You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/24 04:47:32 UTC
git commit: Merge pull request #503 from pwendell/master
Updated Branches:
refs/heads/branch-0.9 e8d3f2b2f -> e66d4c27c
Merge pull request #503 from pwendell/master
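Fix bug on read-side of external sort when using Snappy.
The read side (DiskMapIterator) did not handle this case correctly: it wrapped the on-disk file stream with blockManager.wrapForCompression. This patch fixes it by checking blockManager.shouldCompress and, when compression applies, reading the stream through LZFCompressionCodec explicitly.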
(cherry picked from commit 3d6e75419330d27435becfdf8cfb0b6d20d56cf8)
Signed-off-by: Patrick Wendell <pw...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e66d4c27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e66d4c27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e66d4c27
Branch: refs/heads/branch-0.9
Commit: e66d4c27cadccaa8bb8b2a9ab486889ce2de37d0
Parents: e8d3f2b
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Jan 23 19:47:00 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jan 23 19:47:16 2014 -0800
----------------------------------------------------------------------
.../spark/util/collection/ExternalAppendOnlyMap.scala | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e66d4c27/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index fb73636..3d9b09e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -358,7 +358,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] {
val fileStream = new FileInputStream(file)
val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)
- val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+
+ val shouldCompress = blockManager.shouldCompress(blockId)
+ val compressionCodec = new LZFCompressionCodec(sparkConf)
+ val compressedStream =
+ if (shouldCompress) {
+ compressionCodec.compressedInputStream(bufferedStream)
+ } else {
+ bufferedStream
+ }
var deserializeStream = ser.deserializeStream(compressedStream)
var objectsRead = 0