Posted to issues@spark.apache.org by "Bogdan Raducanu (JIRA)" <ji...@apache.org> on 2017/06/07 15:33:18 UTC
[jira] [Commented] (SPARK-20342) DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators
[ https://issues.apache.org/jira/browse/SPARK-20342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16041054#comment-16041054 ]
Bogdan Raducanu commented on SPARK-20342:
-----------------------------------------
This test fails intermittently because of this issue:
{code}
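import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

test("test") {
  // Collects the names of the accumulators reported in each SparkListenerTaskEnd event.
  val foundMetrics = mutable.Set.empty[String]
  spark.sparkContext.addSparkListener(new SparkListener {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      taskEnd.taskInfo.accumulables.foreach { a =>
        if (a.name.isDefined) {
          foundMetrics.add(a.name.get)
        }
      }
    }
  })
  for (iter <- 0 until 100) {
    foundMetrics.clear()
    println(s"iter = $iter")
    spark.range(10).groupBy().agg("id" -> "sum").collect()
    // listenerBus is private[spark], so this test must live in an org.apache.spark package.
    spark.sparkContext.listenerBus.waitUntilEmpty(3000)
    // Fails when the listener reads the TaskInfo before the accumulators are updated.
    assert(foundMetrics.size > 0)
  }
}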
{code}
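The flakiness can be reduced to a deterministic model. This is a hypothetical sketch: `FakeTaskInfo`, `FakeTaskEnd` and `OrderingDemo.runOnce` are made-up stand-ins, not Spark classes. The "scheduler" enqueues the event and only then updates the accumulators; the `deliverBeforeUpdate` flag stands in for the listener-bus thread happening to run before that update. Because the listener reads the same mutable object, it only sees the metric when delivery happens after the update.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; these are NOT Spark classes.
final class FakeTaskInfo {
  // Mutable, like TaskInfo's accumulables: filled in place after the event exists.
  val accumulables = mutable.ArrayBuffer.empty[String]
}
final case class FakeTaskEnd(taskInfo: FakeTaskInfo)

object OrderingDemo {
  /** Models the reported order: post the event, then update accumulators.
    * `deliverBeforeUpdate` simulates the listener thread running before the update.
    * Returns the metric names the listener saw. */
  def runOnce(deliverBeforeUpdate: Boolean): Set[String] = {
    val info = new FakeTaskInfo
    val queue = mutable.Queue.empty[FakeTaskEnd] // stand-in for the listener bus
    val seen = mutable.Set.empty[String]         // what the test's listener collects

    // Stand-in for onTaskEnd: read whatever the shared object holds right now.
    def drain(): Unit =
      while (queue.nonEmpty) seen ++= queue.dequeue().taskInfo.accumulables

    queue.enqueue(FakeTaskEnd(info)) // like listenerBus.post(SparkListenerTaskEnd(...))
    if (deliverBeforeUpdate) drain() // the listener thread wins the race
    info.accumulables += "sum"       // like updateAccumulators(event)
    drain()                          // any remaining events are delivered afterwards
    seen.toSet
  }
}
```

With `deliverBeforeUpdate = true` the listener records nothing, which corresponds to the iteration in which `assert(foundMetrics.size > 0)` fails; with `false` it sees the metric.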
> DAGScheduler sends SparkListenerTaskEnd before updating task's accumulators
> ---------------------------------------------------------------------------
>
> Key: SPARK-20342
> URL: https://issues.apache.org/jira/browse/SPARK-20342
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.2.0
> Reporter: Marcelo Vanzin
>
> Hit this on 2.2, but it has probably been there forever. This is similar in spirit to SPARK-20205.
> The event is sent here, around line 1154 of DAGScheduler.scala:
> {code}
> listenerBus.post(SparkListenerTaskEnd(
>   stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
> {code}
> The accumulators are updated later, around line 1173:
> {code}
> val stage = stageIdToStage(task.stageId)
> event.reason match {
>   case Success =>
>     task match {
>       case rt: ResultTask[_, _] =>
>         // Cast to ResultStage here because it's part of the ResultTask
>         // TODO Refactor this out to a function that accepts a ResultStage
>         val resultStage = stage.asInstanceOf[ResultStage]
>         resultStage.activeJob match {
>           case Some(job) =>
>             if (!job.finished(rt.outputId)) {
>               updateAccumulators(event)
> {code}
> The same applies here: the UI shows the correct information because it points at the mutable {{TaskInfo}} structure, but the event log, for example, may record accumulator values that have not been updated yet.
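The UI/event-log difference comes down to when each consumer reads the mutable task info. A minimal sketch, assuming a simplified model rather than Spark's actual classes (`MutableTaskInfo`, `EventLogVsUi` and the `someMetric` entry are hypothetical): the "event log" copies the accumulables when the event is handled, while the "UI" keeps a reference and reads it later. The real listener bus is asynchronous; the model takes the worst case, where the event is serialized immediately after posting. It also shows the reordering the issue title implies (update accumulators, then post), under which both views agree.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's mutable TaskInfo (not the real class).
final class MutableTaskInfo {
  val accumulables = mutable.ArrayBuffer.empty[(String, Long)]
}

object EventLogVsUi {
  /** Returns (what an event-log writer recorded, what a UI reading the live object sees later). */
  def simulate(updateBeforePost: Boolean): (Seq[(String, Long)], Seq[(String, Long)]) = {
    val info = new MutableTaskInfo
    var logged: Seq[(String, Long)] = Seq.empty
    val uiRef = info // the UI keeps a reference to the same mutable object

    // Stand-in for posting to a listener that snapshots (serializes) the event right away.
    def post(): Unit = { logged = info.accumulables.toList }
    // Stand-in for updateAccumulators(event): mutates the shared object in place.
    def updateAccumulators(): Unit = { info.accumulables += ("someMetric" -> 1L) }

    if (updateBeforePost) { updateAccumulators(); post() } // reordered: log sees the update
    else { post(); updateAccumulators() }                  // reported order: log is stale

    (logged, uiRef.accumulables.toList)
  }
}
```

In the reported order the snapshot is empty while the live reference shows the metric, which matches the symptom described above; reading a snapshot is only safe once all in-place updates have happened.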