You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/03/12 00:39:25 UTC

spark git commit: [SPARK-13830] prefer block manager than direct result for large result

Repository: spark
Updated Branches:
  refs/heads/master 66d9d0edf -> 2ef4c5963


[SPARK-13830] prefer block manager than direct result for large result

## What changes were proposed in this pull request?

The current RPC can't handle large blocks very well, it's very slow to fetch 100M block (about 1 minute). Once switch to block manager to fetch that, it took about 10 seconds (still could be improved).

## How was this patch tested?

existing unit tests.

Author: Davies Liu <da...@databricks.com>

Closes #11659 from davies/direct_result.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ef4c596
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ef4c596
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ef4c596

Branch: refs/heads/master
Commit: 2ef4c5963bff3574fe17e669d703b25ddd064e5d
Parents: 66d9d0e
Author: Davies Liu <da...@databricks.com>
Authored: Fri Mar 11 15:39:21 2016 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Fri Mar 11 15:39:21 2016 -0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/executor/Executor.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2ef4c596/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e88d6cd..07e3c12 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -97,9 +97,11 @@ private[spark] class Executor(
   // Set the classloader for serializer
   env.serializer.setDefaultClassLoader(replClassLoader)
 
-  // Max RPC message size. If task result is bigger than this, we use the block manager
+  // Max size of direct result. If task result is bigger than this, we use the block manager
   // to send the result back.
-  private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+  private val maxDirectResultSize = Math.min(
+    conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
+    RpcUtils.maxMessageSizeBytes(conf))
 
   // Limit of bytes for total size of results (default is 1GB)
   private val maxResultSize = Utils.getMaxResultSize(conf)
@@ -279,6 +281,7 @@ private[spark] class Executor(
 
         // Note: accumulator updates must be collected after TaskMetrics is updated
         val accumUpdates = task.collectAccumulatorUpdates()
+        // TODO: do not serialize value twice
         val directResult = new DirectTaskResult(valueBytes, accumUpdates)
         val serializedDirectResult = ser.serialize(directResult)
         val resultSize = serializedDirectResult.limit
@@ -290,7 +293,7 @@ private[spark] class Executor(
               s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
               s"dropping it.")
             ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
-          } else if (resultSize >= maxRpcMessageSize) {
+          } else if (resultSize > maxDirectResultSize) {
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(
               blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org