Posted to issues@spark.apache.org by "Bogdan Raducanu (JIRA)" <ji...@apache.org> on 2017/06/07 15:33:18 UTC

[jira] [Commented] (SPARK-20342) DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators

    [ https://issues.apache.org/jira/browse/SPARK-20342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041054#comment-16041054 ] 

Bogdan Raducanu commented on SPARK-20342:
-----------------------------------------

The following test fails intermittently because of this issue: the {{SparkListenerTaskEnd}} event can reach the listener before {{updateAccumulators}} has run, so {{taskEnd.taskInfo.accumulables}} may still be empty inside {{onTaskEnd}} and the assertion trips:

{code}
// Assumes a SparkSession named `spark`, e.g. from Spark's own test helpers
// such as SharedSQLContext.
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

test("test") {
  val foundMetrics = mutable.Set.empty[String]
  spark.sparkContext.addSparkListener(new SparkListener {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      // Snapshot the accumulator names visible when the event is delivered.
      taskEnd.taskInfo.accumulables.foreach { a =>
        if (a.name.isDefined) {
          foundMetrics.add(a.name.get)
        }
      }
    }
  })
  for (iter <- 0 until 100) {
    foundMetrics.clear()
    println(s"iter = $iter")
    spark.range(10).groupBy().agg("id" -> "sum").collect()
    // Drain the bus so onTaskEnd has run for every task before asserting.
    spark.sparkContext.listenerBus.waitUntilEmpty(3000)
    // Fails whenever the event was posted before updateAccumulators ran.
    assert(foundMetrics.size > 0)
  }
}
{code}
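
A possible listener-side workaround (a sketch only, not a real fix: it still relies on {{updateAccumulators}} running on the scheduler thread shortly after the event is posted) is to keep the mutable {{TaskInfo}} references in {{onTaskEnd}} and read {{accumulables}} only after the listener bus has drained, which is essentially why the UI shows correct values. This assumes the same {{spark}} session and test-suite context as above; {{taskInfos}} and {{names}} are illustrative names:

{code}
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, TaskInfo}

val taskInfos = mutable.Buffer.empty[TaskInfo]
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // Keep the mutable reference instead of snapshotting accumulables here.
    taskInfos.synchronized { taskInfos += taskEnd.taskInfo }
  }
})
spark.range(10).groupBy().agg("id" -> "sum").collect()
spark.sparkContext.listenerBus.waitUntilEmpty(3000)
// By now the scheduler has, in practice, mutated the shared TaskInfo objects.
val names = taskInfos.synchronized {
  taskInfos.flatMap(_.accumulables.flatMap(_.name)).toSet
}
assert(names.nonEmpty)
{code}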

> DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-20342
>                 URL: https://issues.apache.org/jira/browse/SPARK-20342
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Marcelo Vanzin
>
> Hit this on 2.2, but probably has been there forever. This is similar in spirit to SPARK-20205.
> The event is posted here, around L1154 of {{DAGScheduler.scala}}:
> {code}
>     listenerBus.post(SparkListenerTaskEnd(
>       stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
> {code}
> Accumulators are updated later, around L1173:
> {code}
>     val stage = stageIdToStage(task.stageId)
>     event.reason match {
>       case Success =>
>         task match {
>           case rt: ResultTask[_, _] =>
>             // Cast to ResultStage here because it's part of the ResultTask
>             // TODO Refactor this out to a function that accepts a ResultStage
>             val resultStage = stage.asInstanceOf[ResultStage]
>             resultStage.activeJob match {
>               case Some(job) =>
>                 if (!job.finished(rt.outputId)) {
>                   updateAccumulators(event)
> {code}
> The same caveat as in SPARK-20205 applies here: the UI shows correct info because it holds a reference to the mutable {{TaskInfo}} structure, so it sees the update that happens after the event is posted. But a consumer that snapshots the event when it is delivered, such as the event log, may record the wrong information.
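
That last paragraph is the crux: whether a listener sees the right values depends on whether it snapshots the event payload when it is delivered or keeps the shared mutable reference. A minimal, self-contained sketch of that distinction (plain Scala with illustrative stand-in types, not Spark internals):

{code}
import scala.collection.mutable

// Stand-ins for Spark's types, just to show the aliasing.
case class AccumulableInfo(name: Option[String], value: Long)
class TaskInfo { val accumulables = mutable.Buffer.empty[AccumulableInfo] }

object SnapshotVsReference extends App {
  val info = new TaskInfo

  // "listenerBus.post(...)": an event-log-style consumer snapshots now,
  // before the scheduler has filled in the accumulators.
  val snapshot = info.accumulables.toList

  // "updateAccumulators(event)": runs only after the event was posted.
  info.accumulables += AccumulableInfo(Some("sum"), 45L)

  println(s"snapshot at post time: $snapshot")                   // List() -- stale
  println(s"live reference later:  ${info.accumulables.toList}") // sees the value
}
{code}

Which also points at the fix direction: either post the event after {{updateAccumulators}} has run, or post a payload that is already complete.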


