Posted to reviews@spark.apache.org by jiangxb1987 <gi...@git.apache.org> on 2017/07/05 15:31:27 UTC

[GitHub] spark pull request #18393: [SPARK-20342][core] Update task accumulators befo...

Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18393#discussion_r125676544
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1133,33 +1152,27 @@ class DAGScheduler(
           event.taskInfo.attemptNumber, // this is a task attempt number
           event.reason)
     
    -    // Reconstruct task metrics. Note: this may be null if the task has failed.
    -    val taskMetrics: TaskMetrics =
    -      if (event.accumUpdates.nonEmpty) {
    -        try {
    -          TaskMetrics.fromAccumulators(event.accumUpdates)
    -        } catch {
    -          case NonFatal(e) =>
    -            logError(s"Error when attempting to reconstruct metrics for task $taskId", e)
    -            null
    -        }
    -      } else {
    -        null
    -      }
    -
    -    // The stage may have already finished when we get this event -- eg. maybe it was a
    -    // speculative task. It is important that we send the TaskEnd event in any case, so listeners
    -    // are properly notified and can chose to handle it. For instance, some listeners are
    -    // doing their own accounting and if they don't get the task end event they think
    -    // tasks are still running when they really aren't.
    -    listenerBus.post(SparkListenerTaskEnd(
    -       stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
    -
         if (!stageIdToStage.contains(task.stageId)) {
    +      // The stage may have already finished when we get this event -- eg. maybe it was a
    +      // speculative task. It is important that we send the TaskEnd event in any case, so listeners
    +      // are properly notified and can chose to handle it. For instance, some listeners are
    +      // doing their own accounting and if they don't get the task end event they think
    +      // tasks are still running when they really aren't.
    +      postTaskEnd(event)
    +
           // Skip all the actions if the stage has been cancelled.
           return
         }
     
    +    // Make sure the task's accumulators are updated before any other processing happens, so that
    +    // we can post a task end event before any jobs or stages are updated. The accumulators are
    +    // only updated in certain cases.
    +    event.reason match {
    +      case Success | _: ExceptionFailure => updateAccumulators(event)
    --- End diff --
    
    Does this mean `resultStage.activeJob` should not be empty by this time?
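    
    For readers following the thread, a minimal, self-contained sketch of the ordering the diff appears to introduce, and of why the `activeJob` question matters. Only `postTaskEnd`, `updateAccumulators`, and the `Success | ExceptionFailure` match come from the diff above; every other type and name here is a simplified stand-in, not the actual DAGScheduler code:
    
        // Illustrative sketch only -- simplified stand-ins, not the real Spark classes.
        sealed trait TaskEndReason
        case object Success extends TaskEndReason
        final case class ExceptionFailure(msg: String) extends TaskEndReason
    
        final case class CompletionEvent(taskId: Long, reason: TaskEndReason, accumUpdates: Seq[Long])
    
        final class ResultStage {
          // activeJob is cleared once the job finishes; a late (e.g. speculative)
          // task can therefore still arrive while it is None.
          var activeJob: Option[String] = Some("job-0")
        }
    
        object TaskCompletionSketch {
          def updateAccumulators(event: CompletionEvent): Unit =
            println(s"merging ${event.accumUpdates.size} accumulator updates for task ${event.taskId}")
    
          def postTaskEnd(event: CompletionEvent): Unit =
            println(s"posting SparkListenerTaskEnd for task ${event.taskId}")
    
          def handleTaskCompletion(event: CompletionEvent, resultStage: ResultStage): Unit = {
            // 1. Accumulators first, but only for reasons that carry valid updates.
            event.reason match {
              case Success | _: ExceptionFailure => updateAccumulators(event)
              case _ => // other failure reasons carry no usable accumulator updates
            }
            // 2. Then the listener event, so listeners observe up-to-date accumulator values.
            postTaskEnd(event)
            // 3. Only now is job/stage bookkeeping touched; for a ResultTask this path
            //    still has to handle activeJob == None (job finished, straggler result).
            resultStage.activeJob match {
              case Some(job) => println(s"marking partition finished for $job")
              case None      => println("ignoring result; its job has already finished")
            }
          }
    
          def main(args: Array[String]): Unit =
            handleTaskCompletion(CompletionEvent(1L, Success, Seq(42L)), new ResultStage)
        }
    
    The sketch assumes step 3 still tolerates an empty `activeJob`; whether the new ordering makes that case impossible is exactly what the question above is asking.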


