Posted to commits@spark.apache.org by ir...@apache.org on 2018/02/21 03:01:54 UTC

spark git commit: [SPARK-23053][CORE][BRANCH-2.1] taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

Repository: spark
Updated Branches:
  refs/heads/branch-2.1 1df8020e1 -> 24fe6eb0f


[SPARK-23053][CORE][BRANCH-2.1] taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

## What changes were proposed in this pull request?
This PR backports [#20244](https://github.com/apache/spark/pull/20244).

Consider concurrent jobs that use the same RDD, which is marked for checkpointing. If one job finishes and starts RDD.doCheckpoint while another job is being submitted, submitStage and submitMissingTasks are called for the second job. [submitMissingTasks](https://github.com/apache/spark/blob/branch-2.1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L932) serializes taskBinaryBytes and calculates the task partitions, and both are affected by the checkpoint status. If the former is computed before doCheckpoint finishes while the latter is computed after it finishes, then when a task runs and rdd.compute is called, an RDD with a particular partition type, such as [UnionRDD](https://github.com/apache/spark/blob/branch-2.1/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala), which casts its partitions to that type, throws a ClassCastException because the part parameter is actually a CheckpointRDDPartition.
This race is possible because rdd.doCheckpoint runs in the same thread that called sc.runJob, while the task serialization runs in the DAGScheduler's event loop.
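
For illustration, a minimal driver-side sketch of the kind of workload that can hit this race (a hypothetical repro, not taken from the PR; the app name, checkpoint directory, and thread-pool sizing are arbitrary):

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.{SparkConf, SparkContext}

object CheckpointRaceDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("checkpoint-race-demo").setMaster("local[4]"))
    sc.setCheckpointDir("/tmp/checkpoint-race-demo")

    // A UnionRDD: its compute() casts each partition to UnionPartition, which
    // is what fails with a ClassCastException when the task binary was
    // serialized pre-checkpoint but handed a CheckpointRDDPartition.
    val rdd = sc.parallelize(1 to 100, 4).union(sc.parallelize(101 to 200, 4))
    rdd.checkpoint()

    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

    // Whichever job finishes first triggers rdd.doCheckpoint() in its calling
    // thread, while the other job's submitMissingTasks runs concurrently in
    // the DAGScheduler event loop -- the interleaving described above.
    val jobs = Seq(Future(rdd.count()), Future(rdd.collect().length.toLong))
    jobs.foreach(Await.ready(_, Duration.Inf))
    sc.stop()
  }
}
```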

## How was this patch tested?
Existing tests.

Author: huangtengfei <hu...@huangtengfeideMacBook-Pro.local>

Closes #20635 from ivoson/branch-2.1-23053.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24fe6eb0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24fe6eb0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24fe6eb0

Branch: refs/heads/branch-2.1
Commit: 24fe6eb0f6e2fe31d1c93fc65bc294ebb94e2dcf
Parents: 1df8020
Author: huangtengfei <hu...@huangtengfeideMacBook-Pro.local>
Authored: Tue Feb 20 21:01:45 2018 -0600
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Tue Feb 20 21:01:45 2018 -0600

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 27 +++++++++++++-------
 1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/24fe6eb0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 01a95c0..9d46d69 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -982,15 +982,24 @@ class DAGScheduler(
     // might modify state of objects referenced in their closures. This is necessary in Hadoop
     // where the JobConf/Configuration object is not thread-safe.
     var taskBinary: Broadcast[Array[Byte]] = null
+    var partitions: Array[Partition] = null
     try {
       // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
       // For ResultTask, serialize and broadcast (rdd, func).
-      val taskBinaryBytes: Array[Byte] = stage match {
-        case stage: ShuffleMapStage =>
-          JavaUtils.bufferToArray(
-            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
-        case stage: ResultStage =>
-          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
+      var taskBinaryBytes: Array[Byte] = null
+      // taskBinaryBytes and partitions are both affected by the checkpoint status. We need
+      // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
+      // consistent view of both variables.
+      RDDCheckpointData.synchronized {
+        taskBinaryBytes = stage match {
+          case stage: ShuffleMapStage =>
+            JavaUtils.bufferToArray(
+              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
+          case stage: ResultStage =>
+            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
+        }
+
+        partitions = stage.rdd.partitions
       }
 
       taskBinary = sc.broadcast(taskBinaryBytes)
@@ -1013,7 +1022,7 @@ class DAGScheduler(
         case stage: ShuffleMapStage =>
           partitionsToCompute.map { id =>
             val locs = taskIdToLocations(id)
-            val part = stage.rdd.partitions(id)
+            val part = partitions(id)
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
               taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
               Option(sc.applicationId), sc.applicationAttemptId)
@@ -1022,7 +1031,7 @@ class DAGScheduler(
         case stage: ResultStage =>
           partitionsToCompute.map { id =>
             val p: Int = stage.partitions(id)
-            val part = stage.rdd.partitions(p)
+            val part = partitions(p)
             val locs = taskIdToLocations(id)
             new ResultTask(stage.id, stage.latestInfo.attemptId,
               taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
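
For context, the class-level lock on RDDCheckpointData is the right monitor because the checkpoint state transition itself already happens under that same lock. Roughly (an abridged paraphrase of RDDCheckpointData.checkpoint() in branch-2.1, not a verbatim quote):

```scala
// Abridged sketch of RDDCheckpointData.checkpoint(): the flip to Checkpointed
// and rdd.markCheckpointed() -- which truncates the lineage and swaps in the
// checkpoint RDD's partitions -- run inside RDDCheckpointData.synchronized.
// Since submitMissingTasks now reads taskBinaryBytes and partitions under the
// same lock, it observes either the pre- or post-checkpoint state, never a mix.
final def checkpoint(): Unit = {
  RDDCheckpointData.synchronized {
    if (cpState == Initialized) cpState = CheckpointingInProgress else return
  }

  val newRDD = doCheckpoint()  // materialize the checkpoint data

  RDDCheckpointData.synchronized {
    cpRDD = Some(newRDD)
    cpState = Checkpointed
    rdd.markCheckpointed()     // from here on, partitions come from the checkpoint RDD
  }
}
```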

