Posted to reviews@spark.apache.org by ankuriitg <gi...@git.apache.org> on 2018/09/04 20:31:05 UTC

[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the aggregated stage me...

Github user ankuriitg commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22209#discussion_r215056633
  
    --- Diff: core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---
    @@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
         assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
       }
     
    +  test("SPARK-24415: update metrics for tasks that finish late") {
    +    val listener = new AppStatusListener(store, conf, true)
    +
    +    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
    +    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
    +
    +    // Start job
    +    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))
    +
    +    // Start 2 stages
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
    +
    +    // Start 2 Tasks
    +    val tasks = createTasks(2, Array("1"))
    +    tasks.foreach { task =>
    +      listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
    +    }
    +
    +    // Task 1 Finished
    +    time += 1
    +    tasks(0).markFinished(TaskState.FINISHED, time)
    +    listener.onTaskEnd(
    +      SparkListenerTaskEnd(stage1.stageId, stage1.attemptNumber, "taskType", Success, tasks(0), null))
    +
    +    // Stage 1 completes, but with a failure reason set, so it is recorded as FAILED
    +    stage1.failureReason = Some("Failed")
    +    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
    +
    +    // Stop job 1
    +    time += 1
    +    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
    +
    +    // Task 2 Killed
    +    time += 1
    +    tasks(1).markFinished(TaskState.KILLED, time)
    +    listener.onTaskEnd(
    +      SparkListenerTaskEnd(stage1.stageId, stage1.attemptNumber, "taskType",
    +        TaskKilled(reason = "Killed"), tasks(1), null))
    +
    +    // Ensure killed task metrics are updated
    +    val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
    +    val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
    +    assert(failedStages.size == 1)
    +    assert(failedStages.head.numKilledTasks == 1)
    +    assert(failedStages.head.numCompleteTasks == 1)
    +
    +    val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
    +    assert(allJobs.size == 1)
    --- End diff ---
    
    Sorry, I couldn't respond on Friday.
    
    So, the above code has been replaced with "conditionalLiveUpdate(job, now, removeStage)" in my PR. This means that if the taskEnd event is the last event and the stage has already completed, the job will be updated immediately. If it is the last event but the stage has not completed yet, then the subsequent onStageCompleted event will update the job immediately.
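    
    To make that dispatch concrete, here is a minimal, self-contained sketch. Only the conditionalLiveUpdate name and the liveUpdate/maybeUpdate split come from AppStatusListener; the LiveEntity stub, the flush interval, and the helper bodies below are illustrative stand-ins, not the actual Spark code:
    
        object ConditionalUpdateSketch {
          // Stand-in for Spark's LiveEntity; only the write hook matters here.
          trait LiveEntity { def write(now: Long): Unit }
    
          // Assumed minimum gap between background writes (illustrative value).
          private val liveUpdatePeriodNs = 100L * 1000 * 1000
          private var lastWriteTime = 0L
    
          // Write-through: persist immediately so the UI sees the change now.
          private def liveUpdate(entity: LiveEntity, now: Long): Unit = {
            entity.write(now)
            lastWriteTime = now
          }
    
          // Rate-limited: persist only if enough time has passed since the last write.
          private def maybeUpdate(entity: LiveEntity, now: Long): Unit = {
            if (now - lastWriteTime > liveUpdatePeriodNs) liveUpdate(entity, now)
          }
    
          // The dispatch described above: if this taskEnd arrived after the stage
          // already completed (condition = true), update the job instantly;
          // otherwise defer, since the pending onStageCompleted will flush it.
          def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
            if (condition) liveUpdate(entity, now) else maybeUpdate(entity, now)
          }
        }
    
    In the test above, task 2 is killed after stage 1 has already completed, so the condition holds and the job counters are written out immediately rather than waiting for a periodic flush.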

