Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/22 01:24:18 UTC

spark git commit: [SPARK-3386] Share and reuse SerializerInstances in shuffle paths

Repository: spark
Updated Branches:
  refs/heads/master 7662ec23b -> f83c0f112


[SPARK-3386] Share and reuse SerializerInstances in shuffle paths

This patch modifies several shuffle-related code paths to share and reuse `SerializerInstance`s instead of creating new ones. Some serializers, such as `KryoSerializer` or `SqlSerializer`, can be fairly expensive to create or may consume moderate amounts of memory, so it's probably best to avoid unnecessary serializer creation in hot code paths.
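As a minimal sketch of the pattern (illustrative code, not from the patch; the method names and `blocks` are made up), the cost difference is between calling `newInstance()` per element and calling it once:

    import org.apache.spark.serializer.{Serializer, SerializerInstance}

    // Before: a fresh (potentially expensive) instance per element in the hot path.
    def serializeEachBefore(serializer: Serializer, blocks: Seq[Array[Byte]]): Unit = {
      blocks.foreach { block =>
        serializer.newInstance().serialize(block)  // e.g. Kryo setup cost paid every time
      }
    }

    // After: pay the creation cost once and reuse the instance.
    def serializeEachAfter(serializer: Serializer, blocks: Seq[Array[Byte]]): Unit = {
      val ser: SerializerInstance = serializer.newInstance()
      blocks.foreach(block => ser.serialize(block))
    }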

The key change in this patch is modifying `getDiskWriter()` / `DiskBlockObjectWriter` to accept `SerializerInstance`s instead of `Serializer`s (which are factories for instances). This allows the disk writer's creator to decide whether the serializer instance can be shared or reused.
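A hedged caller-side sketch of the new contract (the helper and its parameters are illustrative; only `getDiskWriter`'s signature comes from the diff below): the code that opens a batch of writers owns the sharing decision, and sharing is safe here because a single thread writes through them:

    import java.io.File
    import org.apache.spark.executor.ShuffleWriteMetrics
    import org.apache.spark.serializer.Serializer
    import org.apache.spark.storage.{BlockId, BlockManager, BlockObjectWriter}

    // Create one instance up front and hand it to every writer we open.
    def openWriters(
        blockManager: BlockManager,
        serializer: Serializer,
        blocks: Seq[(BlockId, File)],
        bufferSize: Int,
        writeMetrics: ShuffleWriteMetrics): Seq[BlockObjectWriter] = {
      val serInstance = serializer.newInstance()  // shared by all writers below
      blocks.map { case (blockId, file) =>
        blockManager.getDiskWriter(blockId, file, serInstance, bufferSize, writeMetrics)
      }
    }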

The rest of the patch modifies several write and read paths to use shared serializers. One big win is in `ShuffleBlockFetcherIterator`, where we used to create a new serializer per received block. Similarly, the shuffle write path used to create a new serializer per file, even though in many cases only a single thread writes to a given file at a time.
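The fetch-path fix reduces to the following shape (a sketch mirroring the `ShuffleBlockFetcherIterator` hunk below; the class and method names here are made up, and decompression is elided):

    import java.io.InputStream
    import org.apache.spark.serializer.Serializer

    class FetchDeserializer(serializer: Serializer) {
      // One instance for the lifetime of the iterator, instead of one per block.
      private[this] val serializerInstance = serializer.newInstance()

      // Previously the equivalent of serializer.newInstance() ran inside this
      // method, i.e. once per fetched block.
      def deserializeBlock(is: InputStream): Iterator[Any] =
        serializerInstance.deserializeStream(is).asIterator
    }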

I also made a small serializer-reuse optimization in `CoarseGrainedExecutorBackend`, since it was a simple and obvious improvement.

Author: Josh Rosen <jo...@databricks.com>

Closes #5606 from JoshRosen/SPARK-3386 and squashes the following commits:

f661ce7 [Josh Rosen] Remove thread local; add comment instead
64f8398 [Josh Rosen] Use ThreadLocal for serializer instance in CoarseGrainedExecutorBackend
aeb680e [Josh Rosen] [SPARK-3386] Reuse SerializerInstance in shuffle code paths
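
For reference, a self-contained sketch of the `ThreadLocal` approach that 64f8398 tried and f661ce7 then removed in favor of a comment (the wrapper class is illustrative, not from the patch):

    import org.apache.spark.serializer.{Serializer, SerializerInstance}

    // One SerializerInstance per thread. The merged patch does NOT need this:
    // the backend's RPC endpoint handles messages on a single thread, so it
    // keeps one shared instance plus a comment noting that assumption.
    class PerThreadSerializer(factory: Serializer) {
      private[this] val local = new ThreadLocal[SerializerInstance] {
        override def initialValue(): SerializerInstance = factory.newInstance()
      }
      def instance: SerializerInstance = local.get()
    }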


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f83c0f11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f83c0f11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f83c0f11

Branch: refs/heads/master
Commit: f83c0f112d04173f4fc2c5eaf0f9cb11d9180077
Parents: 7662ec2
Author: Josh Rosen <jo...@databricks.com>
Authored: Tue Apr 21 16:24:15 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Apr 21 16:24:15 2015 -0700

----------------------------------------------------------------------
 .../spark/executor/CoarseGrainedExecutorBackend.scala |  6 +++++-
 .../spark/shuffle/FileShuffleBlockManager.scala       |  6 ++++--
 .../scala/org/apache/spark/storage/BlockManager.scala |  8 ++++----
 .../org/apache/spark/storage/BlockObjectWriter.scala  |  6 +++---
 .../spark/storage/ShuffleBlockFetcherIterator.scala   |  6 ++++--
 .../spark/util/collection/ExternalAppendOnlyMap.scala |  6 ++----
 .../apache/spark/util/collection/ExternalSorter.scala | 14 +++++++++-----
 .../apache/spark/storage/BlockObjectWriterSuite.scala |  6 +++---
 8 files changed, 34 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f83c0f11/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 8300f9f..8af46f3 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{SignalLogger, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
@@ -47,6 +48,10 @@ private[spark] class CoarseGrainedExecutorBackend(
   var executor: Executor = null
   @volatile var driver: Option[RpcEndpointRef] = None
 
+  // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
+  // to be changed so that we don't share the serializer instance across threads
+  private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()
+
   override def onStart() {
     import scala.concurrent.ExecutionContext.Implicits.global
     logInfo("Connecting to driver: " + driverUrl)
@@ -83,7 +88,6 @@ private[spark] class CoarseGrainedExecutorBackend(
         logError("Received LaunchTask command but executor was null")
         System.exit(1)
       } else {
-        val ser = env.closureSerializer.newInstance()
         val taskDesc = ser.deserialize[TaskDescription](data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
         executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,

http://git-wip-us.apache.org/repos/asf/spark/blob/f83c0f11/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index 5be3ed7..538e150 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -113,11 +113,12 @@ class FileShuffleBlockManager(conf: SparkConf)
       private var fileGroup: ShuffleFileGroup = null
 
       val openStartTime = System.nanoTime
+      val serializerInstance = serializer.newInstance()
       val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
         fileGroup = getUnusedFileGroup()
         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
+          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializerInstance, bufferSize,
             writeMetrics)
         }
       } else {
@@ -133,7 +134,8 @@ class FileShuffleBlockManager(conf: SparkConf)
               logWarning(s"Failed to remove existing shuffle file $blockFile")
             }
           }
-          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
+          blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
+            writeMetrics)
         }
       }
       // Creating the file to write to and creating a disk writer both involve interacting with

http://git-wip-us.apache.org/repos/asf/spark/blob/f83c0f11/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1aa0ef1..145a9c1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -37,7 +37,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.ExternalShuffleClient
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{SerializerInstance, Serializer}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.util._
@@ -646,13 +646,13 @@ private[spark] class BlockManager(
   def getDiskWriter(
       blockId: BlockId,
       file: File,
-      serializer: Serializer,
+      serializerInstance: SerializerInstance,
       bufferSize: Int,
       writeMetrics: ShuffleWriteMetrics): BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
     val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
-    new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites,
-      writeMetrics)
+    new DiskBlockObjectWriter(blockId, file, serializerInstance, bufferSize, compressStream,
+      syncWrites, writeMetrics)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f83c0f11/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 0dfc91d..1483379 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -21,7 +21,7 @@ import java.io.{BufferedOutputStream, FileOutputStream, File, OutputStream}
 import java.nio.channels.FileChannel
 
 import org.apache.spark.Logging
-import org.apache.spark.serializer.{SerializationStream, Serializer}
+import org.apache.spark.serializer.{SerializerInstance, SerializationStream}
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.util.Utils
 
@@ -71,7 +71,7 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
 private[spark] class DiskBlockObjectWriter(
     blockId: BlockId,
     file: File,
-    serializer: Serializer,
+    serializerInstance: SerializerInstance,
     bufferSize: Int,
     compressStream: OutputStream => OutputStream,
     syncWrites: Boolean,
@@ -134,7 +134,7 @@ private[spark] class DiskBlockObjectWriter(
     ts = new TimeTrackingOutputStream(fos)
     channel = fos.getChannel()
     bs = compressStream(new BufferedOutputStream(ts, bufferSize))
-    objOut = serializer.newInstance().serializeStream(bs)
+    objOut = serializerInstance.serializeStream(bs)
     initialized = true
     this
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f83c0f11/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 8f28ef4..f337952 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, TaskContext}
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
 import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{SerializerInstance, Serializer}
 import org.apache.spark.util.{CompletionIterator, Utils}
 
 /**
@@ -106,6 +106,8 @@ final class ShuffleBlockFetcherIterator(
 
   private[this] val shuffleMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
 
+  private[this] val serializerInstance: SerializerInstance = serializer.newInstance()
+
   /**
    * Whether the iterator is still active. If isZombie is true, the callback interface will no
    * longer place fetched blocks into [[results]].
@@ -299,7 +301,7 @@ final class ShuffleBlockFetcherIterator(
         // the scheduler gets a FetchFailedException.
         Try(buf.createInputStream()).map { is0 =>
           val is = blockManager.wrapForCompression(blockId, is0)
-          val iter = serializer.newInstance().deserializeStream(is).asIterator
+          val iter = serializerInstance.deserializeStream(is).asIterator
           CompletionIterator[Any, Iterator[Any]](iter, {
             // Once the iterator is exhausted, release the buffer and set currentResult to null
             // so we don't release it again in cleanup.

http://git-wip-us.apache.org/repos/asf/spark/blob/f83c0f11/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 9ff4744..30dd7f2 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -151,8 +151,7 @@ class ExternalAppendOnlyMap[K, V, C](
   override protected[this] def spill(collection: SizeTracker): Unit = {
     val (blockId, file) = diskBlockManager.createTempLocalBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
-    var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize,
-      curWriteMetrics)
+    var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
     var objectsWritten = 0
 
     // List of batch sizes (bytes) in the order they are written to disk
@@ -179,8 +178,7 @@ class ExternalAppendOnlyMap[K, V, C](
         if (objectsWritten == serializerBatchSize) {
           flush()
           curWriteMetrics = new ShuffleWriteMetrics()
-          writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize,
-            curWriteMetrics)
+          writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
         }
       }
       if (objectsWritten > 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f83c0f11/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 035f376..79a1a8a 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -272,7 +272,8 @@ private[spark] class ExternalSorter[K, V, C](
     // createTempShuffleBlock here; see SPARK-3426 for more context.
     val (blockId, file) = diskBlockManager.createTempShuffleBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
-    var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
+    var writer = blockManager.getDiskWriter(
+      blockId, file, serInstance, fileBufferSize, curWriteMetrics)
     var objectsWritten = 0   // Objects written since the last flush
 
     // List of batch sizes (bytes) in the order they are written to disk
@@ -308,7 +309,8 @@ private[spark] class ExternalSorter[K, V, C](
         if (objectsWritten == serializerBatchSize) {
           flush()
           curWriteMetrics = new ShuffleWriteMetrics()
-          writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
+          writer = blockManager.getDiskWriter(
+            blockId, file, serInstance, fileBufferSize, curWriteMetrics)
         }
       }
       if (objectsWritten > 0) {
@@ -358,7 +360,9 @@ private[spark] class ExternalSorter[K, V, C](
         // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
         // createTempShuffleBlock here; see SPARK-3426 for more context.
         val (blockId, file) = diskBlockManager.createTempShuffleBlock()
-        blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
+        val writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize,
+          curWriteMetrics)
+        writer.open()
       }
       // Creating the file to write to and creating a disk writer both involve interacting with
       // the disk, and can take a long time in aggregate when we open many files, so should be
@@ -749,8 +753,8 @@ private[spark] class ExternalSorter[K, V, C](
       // partition and just write everything directly.
       for ((id, elements) <- this.partitionedIterator) {
         if (elements.hasNext) {
-          val writer = blockManager.getDiskWriter(
-            blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get)
+          val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
+            context.taskMetrics.shuffleWriteMetrics.get)
           for (elem <- elements) {
             writer.write(elem)
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/f83c0f11/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
index 78bbc4e..003a728 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala
@@ -30,7 +30,7 @@ class BlockObjectWriterSuite extends FunSuite {
     val file = new File(Utils.createTempDir(), "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
     val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
+      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
 
     writer.write(Long.box(20))
     // Record metrics update on every write
@@ -52,7 +52,7 @@ class BlockObjectWriterSuite extends FunSuite {
     val file = new File(Utils.createTempDir(), "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
     val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
+      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
 
     writer.write(Long.box(20))
     // Record metrics update on every write
@@ -75,7 +75,7 @@ class BlockObjectWriterSuite extends FunSuite {
     val file = new File(Utils.createTempDir(), "somefile")
     val writeMetrics = new ShuffleWriteMetrics()
     val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file,
-      new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics)
+      new JavaSerializer(new SparkConf()).newInstance(), 1024, os => os, true, writeMetrics)
 
     writer.open()
     writer.close()

