You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2018/04/17 13:55:11 UTC
spark git commit: [SPARK-23948] Trigger mapstage's job listener in
submitMissingTasks
Repository: spark
Updated Branches:
refs/heads/master ed4101d29 -> 3990daaf3
[SPARK-23948] Trigger mapstage's job listener in submitMissingTasks
## What changes were proposed in this pull request?
When SparkContext submits a map stage to `DAGScheduler` via `submitMapStage`,
`markMapStageJobAsFinished` is called in only two places (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L933 and https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1314).
But consider the following scenario:
1. stage0 and stage1 are both `ShuffleMapStage`s, and stage1 depends on stage0;
2. We submit stage1 by `submitMapStage`;
3. While stage1 is running, a `FetchFailed` occurs, and stage0 and stage1 are resubmitted as stage0_1 and stage1_1;
4. While stage0_1 is running, speculative tasks from the old stage1 attempt complete successfully, but stage1 is not in `runningStages`. So even though all splits in stage1 have succeeded (including the speculative tasks), the job listener for stage1 is not called;
5. stage0_1 finishes and stage1_1 starts running. In `submitMissingTasks` there are no missing tasks, but in the current code the job listener is not triggered.
We should call the job listener for the map stage in step 5.
## How was this patch tested?
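A new unit test, "Trigger mapstage's job listener in submitMissingTasks", was added to `DAGSchedulerSuite` (the original PR description said "Not added yet.").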
Author: jinxing <ji...@126.com>
Closes #21019 from jinxing64/SPARK-23948.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3990daaf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3990daaf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3990daaf
Branch: refs/heads/master
Commit: 3990daaf3b6ca2c5a9f7790030096262efb12cb2
Parents: ed4101d
Author: jinxing <ji...@126.com>
Authored: Tue Apr 17 08:55:01 2018 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Tue Apr 17 08:55:01 2018 -0500
----------------------------------------------------------------------
.../apache/spark/scheduler/DAGScheduler.scala | 33 +++++++------
.../spark/scheduler/DAGSchedulerSuite.scala | 52 ++++++++++++++++++++
2 files changed, 70 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3990daaf/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8c46a84..78b6b34 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1092,17 +1092,16 @@ class DAGScheduler(
// the stage as completed here in case there are no tasks to run
markStageAsFinished(stage, None)
- val debugString = stage match {
+ stage match {
case stage: ShuffleMapStage =>
- s"Stage ${stage} is actually done; " +
- s"(available: ${stage.isAvailable}," +
- s"available outputs: ${stage.numAvailableOutputs}," +
- s"partitions: ${stage.numPartitions})"
+ logDebug(s"Stage ${stage} is actually done; " +
+ s"(available: ${stage.isAvailable}," +
+ s"available outputs: ${stage.numAvailableOutputs}," +
+ s"partitions: ${stage.numPartitions})")
+ markMapStageJobsAsFinished(stage)
case stage : ResultStage =>
- s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
+ logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
}
- logDebug(debugString)
-
submitWaitingChildStages(stage)
}
}
@@ -1307,13 +1306,7 @@ class DAGScheduler(
shuffleStage.findMissingPartitions().mkString(", "))
submitStage(shuffleStage)
} else {
- // Mark any map-stage jobs waiting on this stage as finished
- if (shuffleStage.mapStageJobs.nonEmpty) {
- val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
- for (job <- shuffleStage.mapStageJobs) {
- markMapStageJobAsFinished(job, stats)
- }
- }
+ markMapStageJobsAsFinished(shuffleStage)
submitWaitingChildStages(shuffleStage)
}
}
@@ -1433,6 +1426,16 @@ class DAGScheduler(
}
}
+ private[scheduler] def markMapStageJobsAsFinished(shuffleStage: ShuffleMapStage): Unit = {
+ // Mark any map-stage jobs waiting on this stage as finished
+ if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
+ val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
+ for (job <- shuffleStage.mapStageJobs) {
+ markMapStageJobAsFinished(job, stats)
+ }
+ }
+ }
+
/**
* Responds to an executor being lost. This is called inside the event loop, so it assumes it can
* modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
http://git-wip-us.apache.org/repos/asf/spark/blob/3990daaf/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d812b5b..8b6ec37 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2146,6 +2146,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assertDataStructuresEmpty()
}
+ test("Trigger mapstage's job listener in submitMissingTasks") {
+ val rdd1 = new MyRDD(sc, 2, Nil)
+ val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
+ val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
+ val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
+
+ val listener1 = new SimpleListener
+ val listener2 = new SimpleListener
+
+ submitMapStage(dep1, listener1)
+ submitMapStage(dep2, listener2)
+
+ // Complete the stage0.
+ assert(taskSets(0).stageId === 0)
+ complete(taskSets(0), Seq(
+ (Success, makeMapStatus("hostA", rdd1.partitions.length)),
+ (Success, makeMapStatus("hostB", rdd1.partitions.length))))
+ assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+ HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+ assert(listener1.results.size === 1)
+
+ // When attempting stage1, trigger a fetch failure.
+ assert(taskSets(1).stageId === 1)
+ complete(taskSets(1), Seq(
+ (Success, makeMapStatus("hostC", rdd2.partitions.length)),
+ (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
+ scheduler.resubmitFailedStages()
+ // Stage1 listener should not have a result yet
+ assert(listener2.results.size === 0)
+
+ // Speculative task succeeded in stage1.
+ runEvent(makeCompletionEvent(
+ taskSets(1).tasks(1),
+ Success,
+ makeMapStatus("hostD", rdd2.partitions.length)))
+ // stage1 listener still should not have a result, though there's no missing partitions
+ // in it. Because stage1 has been failed and is not inside `runningStages` at this moment.
+ assert(listener2.results.size === 0)
+
+ // Stage0 should now be running as task set 2; make its task succeed
+ assert(taskSets(2).stageId === 0)
+ complete(taskSets(2), Seq(
+ (Success, makeMapStatus("hostC", rdd2.partitions.length))))
+ assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+ Set(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+
+ // After stage0 is finished, stage1 will be submitted and found there is no missing
+ // partitions in it. Then listener got triggered.
+ assert(listener2.results.size === 1)
+ assertDataStructuresEmpty()
+ }
+
/**
* In this test, we run a map stage where one of the executors fails but we still receive a
* "zombie" complete message from that executor. We want to make sure the stage is not reported
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org