You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2022/09/01 00:39:10 UTC

[spark] branch master updated: [SPARK-40261][CORE] Exclude DirectTaskResult metadata when calculating result size

This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a4b075f95f [SPARK-40261][CORE] Exclude DirectTaskResult metadata when calculating result size
5a4b075f95f is described below

commit 5a4b075f95f4cb305ba96d6de34d3c004e15f241
Author: Ziqi Liu <zi...@databricks.com>
AuthorDate: Wed Aug 31 17:38:35 2022 -0700

    [SPARK-40261][CORE] Exclude DirectTaskResult metadata when calculating result size
    
    ### What changes were proposed in this pull request?
    When calculating the driver result size, count only the actual result value and exclude other metadata (e.g., accumUpdates) carried in the serialized task result object.
    
    ### Why are the changes needed?
    Metadata should not be counted because it is discarded by the driver immediately after being processed. Counting it can lead to unexpected exceptions when running jobs that have a very large number of tasks but actually return small results.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    Unit test
    
    Closes #37713 from liuzqt/SPARK-40261.
    
    Lead-authored-by: Ziqi Liu <zi...@databricks.com>
    Co-authored-by: liuzqt <zi...@databricks.com>
    Signed-off-by: Josh Rosen <jo...@databricks.com>
---
 .../scala/org/apache/spark/scheduler/TaskResultGetter.scala |  2 +-
 .../org/apache/spark/scheduler/TaskResultGetterSuite.scala  | 13 +++++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 2dabee39131..cfc1f79fab2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -63,7 +63,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
         try {
           val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
             case directResult: DirectTaskResult[_] =>
-              if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
+              if (!taskSetManager.canFetchMoreResults(directResult.valueBytes.limit())) {
                 // kill the task so that it will not become zombie task
                 scheduler.handleFailedTask(taskSetManager, tid, TaskState.KILLED, TaskKilled(
                   "Tasks result size has exceeded maxResultSize"))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index ea44a2d948c..1583d3b96ee 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.TestUtils.JavaSourceFromString
+import org.apache.spark.internal.config.MAX_RESULT_SIZE
 import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 import org.apache.spark.storage.TaskResultBlockId
 import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, ThreadUtils, Utils}
@@ -297,6 +298,18 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
     assert(unknownFailure.findFirstMatchIn(message).isDefined)
   }
 
+  test("SPARK-40261: task result metadata should not be counted into result size") {
+    val conf = new SparkConf().set(MAX_RESULT_SIZE.key, "1M")
+    sc = new SparkContext("local", "test", conf)
+    val rdd = sc.parallelize(1 to 10000, 10000)
+    // This triggers 10k tasks, each returning an empty result. The serialized task results
+    // (including accumUpdates metadata) would total ~10M in this example, but the result
+    // values themselves are tiny (empty arrays).
+    // Even with MAX_RESULT_SIZE set to a small value (1M here), no exception should be thrown
+    // because the actual result is small.
+    assert(rdd.filter(_ < 0).collect().isEmpty)
+  }
+
 }
 
 private class UndeserializableException extends Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org