You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/08 16:25:02 UTC
spark git commit: [SPARK-20342][CORE] Update task accumulators before
sending task end event.
Repository: spark
Updated Branches:
refs/heads/master 9fccc3627 -> 9131bdb7e
[SPARK-20342][CORE] Update task accumulators before sending task end event.
This makes sure that listeners get updated task information; otherwise it's
possible to write incomplete task information into event logs, for example,
making the information in a replayed UI inconsistent with the original
application.
Added a new unit test to try to detect the problem. It is not guaranteed
to fail, since the problem is a race, but it fails pretty reliably for me
without the scheduler changes.
Author: Marcelo Vanzin <va...@cloudera.com>
Closes #18393 from vanzin/SPARK-20342.try2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9131bdb7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9131bdb7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9131bdb7
Branch: refs/heads/master
Commit: 9131bdb7e12bcfb2cb699b3438f554604e28aaa8
Parents: 9fccc36
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Sun Jul 9 00:24:54 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sun Jul 9 00:24:54 2017 +0800
----------------------------------------------------------------------
.../apache/spark/scheduler/DAGScheduler.scala | 70 ++++++++++++--------
.../spark/scheduler/DAGSchedulerSuite.scala | 32 ++++++++-
2 files changed, 75 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9131bdb7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3422a5f..89b4cab 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1122,6 +1122,25 @@ class DAGScheduler(
}
}
+ private def postTaskEnd(event: CompletionEvent): Unit = { // Posts a SparkListenerTaskEnd describing `event`.
+ val taskMetrics: TaskMetrics = // null when no metrics can be rebuilt from the accumulator updates
+ if (event.accumUpdates.nonEmpty) {
+ try {
+ TaskMetrics.fromAccumulators(event.accumUpdates)
+ } catch {
+ case NonFatal(e) =>
+ val taskId = event.taskInfo.taskId
+ logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
+ null // reconstruction failed; the error is logged and listeners receive null metrics
+ }
+ } else {
+ null // the task reported no accumulator updates
+ }
+ // Post unconditionally: stage id/attempt, task type (formatted class name), reason, info, metrics.
+ listenerBus.post(SparkListenerTaskEnd(event.task.stageId, event.task.stageAttemptId,
+ Utils.getFormattedClassName(event.task), event.reason, event.taskInfo, taskMetrics))
+ }
+
/**
* Responds to a task finishing. This is called inside the event loop so it assumes that it can
* modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -1138,34 +1157,36 @@ class DAGScheduler(
event.taskInfo.attemptNumber, // this is a task attempt number
event.reason)
- // Reconstruct task metrics. Note: this may be null if the task has failed.
- val taskMetrics: TaskMetrics =
- if (event.accumUpdates.nonEmpty) {
- try {
- TaskMetrics.fromAccumulators(event.accumUpdates)
- } catch {
- case NonFatal(e) =>
- logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
- null
- }
- } else {
- null
- }
-
- // The stage may have already finished when we get this event -- eg. maybe it was a
- // speculative task. It is important that we send the TaskEnd event in any case, so listeners
- // are properly notified and can chose to handle it. For instance, some listeners are
- // doing their own accounting and if they don't get the task end event they think
- // tasks are still running when they really aren't.
- listenerBus.post(SparkListenerTaskEnd(
- stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
-
if (!stageIdToStage.contains(task.stageId)) {
+ // The stage may have already finished when we get this event -- eg. maybe it was a
+ // speculative task. It is important that we send the TaskEnd event in any case, so listeners
+ // are properly notified and can chose to handle it. For instance, some listeners are
+ // doing their own accounting and if they don't get the task end event they think
+ // tasks are still running when they really aren't.
+ postTaskEnd(event)
+
// Skip all the actions if the stage has been cancelled.
return
}
val stage = stageIdToStage(task.stageId)
+
+ // Make sure the task's accumulators are updated before any other processing happens, so that
+ // we can post a task end event before any jobs or stages are updated. The accumulators are
+ // only updated in certain cases.
+ event.reason match {
+ case Success =>
+ stage match {
+ case rs: ResultStage if rs.activeJob.isEmpty =>
+ // Ignore update if task's job has finished.
+ case _ =>
+ updateAccumulators(event)
+ }
+ case _: ExceptionFailure => updateAccumulators(event)
+ case _ =>
+ }
+ postTaskEnd(event)
+
event.reason match {
case Success =>
task match {
@@ -1176,7 +1197,6 @@ class DAGScheduler(
resultStage.activeJob match {
case Some(job) =>
if (!job.finished(rt.outputId)) {
- updateAccumulators(event)
job.finished(rt.outputId) = true
job.numFinished += 1
// If the whole job has finished, remove it
@@ -1203,7 +1223,6 @@ class DAGScheduler(
case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
- updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
@@ -1374,8 +1393,7 @@ class DAGScheduler(
// Do nothing here, left up to the TaskScheduler to decide how to handle denied commits
case exceptionFailure: ExceptionFailure =>
- // Tasks failed with exceptions might still have accumulator updates.
- updateAccumulators(event)
+ // Nothing left to do, already handled above for accumulator updates.
case TaskResultLost =>
// Do nothing here; the TaskScheduler handles these failures and resubmits the task.
http://git-wip-us.apache.org/repos/asf/spark/blob/9131bdb7/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 453be26..3b5df65 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler
import java.util.Properties
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.annotation.meta.param
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -2346,6 +2346,36 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
(Success, 1)))
}
+ test("task end event should have updated accumulators (SPARK-20342)") {
+ val tasks = 10
+ // Every task end event should already list the iteration's accumulator in taskInfo.accumulables.
+ val accumId = new AtomicLong() // id of the accumulator created in the current iteration
+ val foundCount = new AtomicLong() // task end events whose accumulables contained accumId
+ val listener = new SparkListener() {
+ override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+ event.taskInfo.accumulables.find(_.id == accumId.get).foreach { _ =>
+ foundCount.incrementAndGet()
+ }
+ }
+ }
+ sc.addSparkListener(listener)
+
+ // Try a few times in a loop to make sure. This is not guaranteed to fail when the bug exists,
+ // but it should at least make the test flaky. If the bug is fixed, this should always pass.
+ (1 to 10).foreach { i =>
+ foundCount.set(0L)
+ // A fresh accumulator per iteration, so only this iteration's tasks can match accumId.
+ val accum = sc.longAccumulator(s"accum$i")
+ accumId.set(accum.id)
+ // numSlices == tasks, so the job runs `tasks` tasks that each add to accum.
+ sc.parallelize(1 to tasks, tasks).foreach { _ =>
+ accum.add(1L)
+ }
+ sc.listenerBus.waitUntilEmpty(1000) // let queued events reach the listener before asserting
+ assert(foundCount.get() === tasks)
+ }
+ }
+
/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org