Posted to commits@spark.apache.org by we...@apache.org on 2017/07/08 16:25:02 UTC

spark git commit: [SPARK-20342][CORE] Update task accumulators before sending task end event.

Repository: spark
Updated Branches:
  refs/heads/master 9fccc3627 -> 9131bdb7e


[SPARK-20342][CORE] Update task accumulators before sending task end event.

This makes sure that listeners get updated task information; otherwise it's
possible to write incomplete task information into event logs, for example
making the information in a replayed UI inconsistent with the original
application.
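
To illustrate the kind of listener this protects, here is a minimal,
hypothetical example (not part of this patch): it reads per-task accumulator
values out of the task end event, and it only sees correct values if the
scheduler folds the task's accumulator updates into TaskInfo before posting
the event.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener: keeps a running total of a named accumulator.
// Without this fix, taskInfo.accumulables could still be missing the
// just-finished task's updates when onTaskEnd fires.
class AccumTotalListener(accumName: String) extends SparkListener {
  @volatile var total: Long = 0L

  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    event.taskInfo.accumulables
      .filter(_.name.contains(accumName)) // name is an Option[String]
      .flatMap(_.update)                  // per-task delta, if present
      .foreach(v => total += v.toString.toLong)
  }
}

Such a listener would be registered with
sc.addSparkListener(new AccumTotalListener("my-accum")).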

Added a new unit test to try to detect the problem. It's not guaranteed to
fail, since it's a race, but it fails pretty reliably for me without the
scheduler changes.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #18393 from vanzin/SPARK-20342.try2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9131bdb7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9131bdb7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9131bdb7

Branch: refs/heads/master
Commit: 9131bdb7e12bcfb2cb699b3438f554604e28aaa8
Parents: 9fccc36
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Sun Jul 9 00:24:54 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sun Jul 9 00:24:54 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 70 ++++++++++++--------
 .../spark/scheduler/DAGSchedulerSuite.scala     | 32 ++++++++-
 2 files changed, 75 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9131bdb7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3422a5f..89b4cab 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1122,6 +1122,25 @@ class DAGScheduler(
     }
   }
 
+  private def postTaskEnd(event: CompletionEvent): Unit = {
+    val taskMetrics: TaskMetrics =
+      if (event.accumUpdates.nonEmpty) {
+        try {
+          TaskMetrics.fromAccumulators(event.accumUpdates)
+        } catch {
+          case NonFatal(e) =>
+            val taskId = event.taskInfo.taskId
+            logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
+            null
+        }
+      } else {
+        null
+      }
+
+    listenerBus.post(SparkListenerTaskEnd(event.task.stageId, event.task.stageAttemptId,
+      Utils.getFormattedClassName(event.task), event.reason, event.taskInfo, taskMetrics))
+  }
+
   /**
    * Responds to a task finishing. This is called inside the event loop so it assumes that it can
    * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -1138,34 +1157,36 @@ class DAGScheduler(
       event.taskInfo.attemptNumber, // this is a task attempt number
       event.reason)
 
-    // Reconstruct task metrics. Note: this may be null if the task has failed.
-    val taskMetrics: TaskMetrics =
-      if (event.accumUpdates.nonEmpty) {
-        try {
-          TaskMetrics.fromAccumulators(event.accumUpdates)
-        } catch {
-          case NonFatal(e) =>
-            logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
-            null
-        }
-      } else {
-        null
-      }
-
-    // The stage may have already finished when we get this event -- e.g. maybe it was a
-    // speculative task. It is important that we send the TaskEnd event in any case, so listeners
-    // are properly notified and can choose to handle it. For instance, some listeners are
-    // doing their own accounting and if they don't get the task end event they think
-    // tasks are still running when they really aren't.
-    listenerBus.post(SparkListenerTaskEnd(
-       stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
-
     if (!stageIdToStage.contains(task.stageId)) {
+      // The stage may have already finished when we get this event -- e.g. maybe it was a
+      // speculative task. It is important that we send the TaskEnd event in any case, so listeners
+      // are properly notified and can choose to handle it. For instance, some listeners are
+      // doing their own accounting and if they don't get the task end event they think
+      // tasks are still running when they really aren't.
+      postTaskEnd(event)
+
       // Skip all the actions if the stage has been cancelled.
       return
     }
 
     val stage = stageIdToStage(task.stageId)
+
+    // Make sure the task's accumulators are updated before any other processing happens, so that
+    // we can post a task end event before any jobs or stages are updated. The accumulators are
+    // only updated in certain cases.
+    event.reason match {
+      case Success =>
+        stage match {
+          case rs: ResultStage if rs.activeJob.isEmpty =>
+            // Ignore update if task's job has finished.
+          case _ =>
+            updateAccumulators(event)
+        }
+      case _: ExceptionFailure => updateAccumulators(event)
+      case _ =>
+    }
+    postTaskEnd(event)
+
     event.reason match {
       case Success =>
         task match {
@@ -1176,7 +1197,6 @@ class DAGScheduler(
             resultStage.activeJob match {
               case Some(job) =>
                 if (!job.finished(rt.outputId)) {
-                  updateAccumulators(event)
                   job.finished(rt.outputId) = true
                   job.numFinished += 1
                   // If the whole job has finished, remove it
@@ -1203,7 +1223,6 @@ class DAGScheduler(
 
           case smt: ShuffleMapTask =>
             val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
-            updateAccumulators(event)
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
@@ -1374,8 +1393,7 @@ class DAGScheduler(
         // Do nothing here, left up to the TaskScheduler to decide how to handle denied commits
 
       case exceptionFailure: ExceptionFailure =>
-        // Tasks failed with exceptions might still have accumulator updates.
-        updateAccumulators(event)
+        // Nothing left to do here; accumulator updates were already handled above.
 
       case TaskResultLost =>
         // Do nothing here; the TaskScheduler handles these failures and resubmits the task.
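
Taken together, the reordered flow in handleTaskCompletion now looks roughly
like the sketch below. This is a paraphrase of the hunks above, not the
literal code; jobAlreadyFinished stands in for the rs.activeJob.isEmpty check
on ResultStage.

// Paraphrased sketch of the new ordering (not the literal patch):
if (!stageIdToStage.contains(task.stageId)) {
  postTaskEnd(event) // stage already gone (e.g. cancelled): still notify listeners
  return
}
event.reason match { // fold accumulator updates in *before* posting the event
  case Success if !jobAlreadyFinished => updateAccumulators(event)
  case _: ExceptionFailure => updateAccumulators(event) // failed tasks may carry updates
  case _ => // other reasons carry no accumulator updates
}
postTaskEnd(event) // listeners now see up-to-date taskInfo.accumulables
// ... the usual per-reason job/stage bookkeeping continues below ...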

http://git-wip-us.apache.org/repos/asf/spark/blob/9131bdb7/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 453be26..3b5df65 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
@@ -2346,6 +2346,36 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       (Success, 1)))
   }
 
+  test("task end event should have updated accumulators (SPARK-20342)") {
+    val tasks = 10
+
+    val accumId = new AtomicLong()
+    val foundCount = new AtomicLong()
+    val listener = new SparkListener() {
+      override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+        event.taskInfo.accumulables.find(_.id == accumId.get).foreach { _ =>
+          foundCount.incrementAndGet()
+        }
+      }
+    }
+    sc.addSparkListener(listener)
+
+    // Try a few times in a loop to make sure. This is not guaranteed to fail when the bug exists,
+    // but it should at least make the test flaky. If the bug is fixed, this should always pass.
+    (1 to 10).foreach { i =>
+      foundCount.set(0L)
+
+      val accum = sc.longAccumulator(s"accum$i")
+      accumId.set(accum.id)
+
+      sc.parallelize(1 to tasks, tasks).foreach { _ =>
+        accum.add(1L)
+      }
+      sc.listenerBus.waitUntilEmpty(1000)
+      assert(foundCount.get() === tasks)
+    }
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
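
For reference, the new regression test can be run on its own from a Spark
checkout with something like the following (the exact sbt/scalatest
invocation may vary with the build setup):

  build/sbt "core/testOnly *DAGSchedulerSuite -- -z SPARK-20342"

The loop in the test retries ten times because the underlying problem is a
race: a single run may pass even with the bug present, but repeated runs make
a stale read of taskInfo.accumulables very likely to show up as a foundCount
mismatch.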

