You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/24 04:47:32 UTC

git commit: Merge pull request #503 from pwendell/master

Updated Branches:
  refs/heads/branch-0.9 e8d3f2b2f -> e66d4c27c


Merge pull request #503 from pwendell/master

Fix bug on read-side of external sort when using Snappy.

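Previously, `DiskMapIterator` decompressed spilled files through `blockManager.wrapForCompression`, which did not handle this case correctly when Snappy was in use. This patch instead checks `blockManager.shouldCompress(blockId)` and, when compression is enabled, reads the stream with an explicit `LZFCompressionCodec`.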
(cherry picked from commit 3d6e75419330d27435becfdf8cfb0b6d20d56cf8)

Signed-off-by: Patrick Wendell <pw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e66d4c27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e66d4c27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e66d4c27

Branch: refs/heads/branch-0.9
Commit: e66d4c27cadccaa8bb8b2a9ab486889ce2de37d0
Parents: e8d3f2b
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Jan 23 19:47:00 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jan 23 19:47:16 2014 -0800

----------------------------------------------------------------------
 .../spark/util/collection/ExternalAppendOnlyMap.scala     | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e66d4c27/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index fb73636..3d9b09e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -358,7 +358,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] {
     val fileStream = new FileInputStream(file)
     val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)
-    val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+
+    val shouldCompress = blockManager.shouldCompress(blockId)
+    val compressionCodec = new LZFCompressionCodec(sparkConf)
+    val compressedStream =
+      if (shouldCompress) {
+        compressionCodec.compressedInputStream(bufferedStream)
+      } else {
+        bufferedStream
+      }
     var deserializeStream = ser.deserializeStream(compressedStream)
     var objectsRead = 0