Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/21 09:09:53 UTC

[04/10] git commit: Force use of LZF when spilling data

Force use of LZF when spilling data


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c324ac10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c324ac10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c324ac10

Branch: refs/heads/master
Commit: c324ac10ee208c53dabd54e5c0e1885aea456811
Parents: 1b29914
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 20 19:00:48 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 20 19:00:48 2014 -0800

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala | 42 +++++++++++++++++---
 docs/configuration.md                           |  4 +-
 2 files changed, 39 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c324ac10/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 8df8b4f..792f29d 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,14 +20,15 @@ package org.apache.spark.util.collection
 import java.io._
 import java.util.Comparator
 
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
 import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
-import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
+import org.apache.spark.io.LZFCompressionCodec
+import org.apache.spark.serializer.{KryoDeserializationStream, Serializer}
+import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter}
 
 /**
  * An append-only map that spills sorted content to disk when there is insufficient space for it
@@ -153,9 +154,38 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
 
-    val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _)
+    /* IMPORTANT NOTE: To avoid having to keep large object graphs in memory, this approach
+     * closes and re-opens serialization and compression streams within each file. This makes some
+     * assumptions about the way that serialization and compression streams work, specifically:
+     *
+     * 1) The serializer input streams do not pre-fetch data from the underlying stream.
+     *
+     * 2) Several compression streams can be opened, written to, and flushed on the write path
+     *    while only one compression input stream is created on the read path
+     *
+     * In practice (1) is only true for Java, so we add a special fix below to make it work for
+     * Kryo. (2) is only true for LZF and not Snappy, so we coerce this to use LZF.
+     *
+     * To avoid making these assumptions we should create an intermediate stream that batches
+     * objects and sends an EOF to the higher layer streams to make sure they never prefetch data.
+     * This is a bit tricky because, within each segment, you'd need to track the total number
+     * of bytes written and then re-wind and write it at the beginning of the segment. This will
+     * most likely require using the file channel API.
+     */
+
+    val codec = new LZFCompressionCodec(sparkConf)
+
+    def wrapForCompression(outputStream: OutputStream) = {
+      blockManager.shouldCompress(blockId) match {
+        case true =>
+          codec.compressedOutputStream(outputStream)
+        case false =>
+          outputStream
+      }
+    }
+
     def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
-      compressStream, syncWrites)
+      wrapForCompression, syncWrites)
 
     var writer = getNewWriter
     var objectsWritten = 0
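
----------------------------------------------------------------------
The block comment in this hunk leans on assumption (2): several compression
output streams are opened, written, and flushed against the same spill file,
while the read path wraps that file in a single compression input stream. As a
minimal sketch of why LZF satisfies this (not taken from the patch; the object
and variable names are invented, and a ByteArrayOutputStream stands in for the
spill file), using Spark's own codec API:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.io.LZFCompressionCodec

object ConcatenatedLzfSketch {
  def main(args: Array[String]): Unit = {
    val codec = new LZFCompressionCodec(new SparkConf())
    val underlying = new ByteArrayOutputStream()  // stands in for the spill file

    // Write two independently compressed segments, closing the compression
    // stream between them (ByteArrayOutputStream.close() is a no-op, so the
    // underlying "file" stays writable).
    for (segment <- Seq("first segment|", "second segment")) {
      val out = codec.compressedOutputStream(underlying)
      out.write(segment.getBytes("UTF-8"))
      out.close()
    }

    // Read everything back through one compression input stream. LZF chunks
    // are self-delimiting, so the concatenated segments decode in sequence;
    // Snappy's stream framing does not tolerate this, hence the forced codec.
    val in = codec.compressedInputStream(new ByteArrayInputStream(underlying.toByteArray))
    val decoded = scala.io.Source.fromInputStream(in, "UTF-8").mkString
    println(decoded)  // "first segment|second segment"
  }
}
----------------------------------------------------------------------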

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c324ac10/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 4c2e9cc..be548e3 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -158,7 +158,9 @@ Apart from these, the following properties are also available, and may be useful
   <td>spark.shuffle.spill.compress</td>
   <td>true</td>
   <td>
-    Whether to compress data spilled during shuffles.
+    Whether to compress data spilled during shuffles. If enabled, spill compression
+    always uses the `org.apache.spark.io.LZFCompressionCodec` codec, 
+    regardless of the value of `spark.io.compression.codec`.
   </td>
 </tr>
 <tr>
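
----------------------------------------------------------------------
For context on the documentation change above, a minimal configuration sketch
(the application name and codec choice are illustrative only, not part of the
patch): even if a different codec is configured via spark.io.compression.codec,
spilled shuffle data is compressed with LZF whenever
spark.shuffle.spill.compress is true.

import org.apache.spark.SparkConf

// Illustrative settings only: after this change, the spill path ignores
// spark.io.compression.codec and always uses LZF when spill compression
// is enabled.
val conf = new SparkConf()
  .setAppName("spill-compression-example")
  .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
  .set("spark.shuffle.spill.compress", "true")
----------------------------------------------------------------------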