You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by squito <gi...@git.apache.org> on 2018/04/23 20:33:22 UTC

[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

GitHub user squito opened a pull request:

    https://github.com/apache/spark/pull/21131

    [SPARK-23433][CORE] Late zombie task completions update all tasksets

    Fetch failure lead to multiple tasksets which are active for a given
    stage.  A late completion from an earlier attempt of the stage
    should update the most recent attempt for the stage, so it does 
    not try to submit another task for the same partition, and so that
     it knows when it is completed and when it should be marked as
    a "zombie".
    
    Added a regression test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/squito/spark SPARK-23433

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21131
    
----
commit 0720a7cd6826614e516c3d3a51bd4519259cbe3b
Author: Imran Rashid <ir...@...>
Date:   2018-02-21T20:21:14Z

    [SPARK-23433][CORE] Late zombie task completions update all tasksets
    
    After a fetch failure and stage retry, we may have multiple tasksets
    which are active for a given stage.  A late completion from an earlier
    attempt of the stage should update the most recent attempt for the
    stage, so it does not try to submit another task for the same partition,
    and so that it knows when it is completed.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by attilapiros <gi...@git.apache.org>.
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183775077
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    --- End diff --
    
    This condition is not needed as the stageAttempt iterates on Range(0, 1).  


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2632/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89792/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183690133
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    +      finalAttempt.tasks(task.index).partitionId
    +    }.toSet
    +    assert(finalTsm.runningTasks === 5)
    +    assert(!finalTsm.isZombie)
    +
    +    // We simulate late completions from our zombie tasksets, corresponding to all the pending
    +    // partitions in our final attempt.  This means we're only waiting on the tasks we've already
    +    // launched.
    +    val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
    +    finalAttemptPendingPartitions.foreach { partition =>
    +      completeTaskSuccessfully(zombieAttempts(0), partition)
    +    }
    +
    +    // If there is another resource offer, we shouldn't run anything.  Though our final attempt
    +    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
    +    // remaining tasks to compute are already active in the non-zombie attempt.
    +    assert(
    +      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
    +
    +    val allTaskSets = zombieAttempts ++ Seq(finalTsm)
    +    val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions)
    +
    +    // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
    +    // marked as zombie.
    +    // for each of the remaining tasks, find the tasksets with an active copy of the task, and
    +    // finish the task.
    +    remainingTasks.foreach { partition =>
    +      val tsm = if (partition == 0) {
    +        // we failed this task on both zombie attempts, this one is only present in the latest
    +        // taskset
    +        finalTsm
    +      } else {
    +        // should be active in every taskset.  We choose a zombie taskset just to make sure that
    +        // we transition the active taskset correctly even if the final completion comes
    +        // from a zombie.
    +        zombieAttempts(partition % 2)
    +      }
    +      completeTaskSuccessfully(tsm, partition)
    +    }
    +
    +    assert(finalTsm.isZombie)
    +
    +    // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
    +    verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())
    +
    +    // finally, lets complete all the tasks.  We simulate failures in attempt 1, but everything
    +    // else succeeds, to make sure we get the right updates to the blacklist in all cases.
    +    (zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
    +      val stageAttempt = tsm.taskSet.stageAttemptId
    +      tsm.runningTasksSet.foreach { index =>
    +        if (stageAttempt == 1) {
    +          tsm.handleFailedTask(tsm.taskInfos(index).taskId, TaskState.FAILED, TaskResultLost)
    +        } else {
    +          val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +          tsm.handleSuccessfulTask(tsm.taskInfos(index).taskId, result)
    +        }
    +      }
    +
    +      // we update the blacklist for the stage attempts with all successful tasks.  Even though
    +      // some tasksets had failures, we still consider them all successful from a blacklisting
    +      // perspective, as the failures weren't from a problem w/ the tasks themselves.
    +      verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
    --- End diff --
    
    What is `meq()` ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183789814
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    --- End diff --
    
    > because they won't be able to get their shuffle input, same as the original fetch failure
    
    why? In `DAGScheduler`, we only unregister one MapStatus of parent stage, so other running tasks within the failed (child) stage (caused by a fetch fail task)  may still get MapOutputs from `MapOutputTrackerMaster`, and fetch data from other `Executor`s. So, they can success normally. 
    Do I miss something?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    **[Test build #89739 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89739/testReport)** for PR 21131 at commit [`0720a7c`](https://github.com/apache/spark/commit/0720a7cd6826614e516c3d3a51bd4519259cbe3b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by attilapiros <gi...@git.apache.org>.
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183783006
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    +      finalAttempt.tasks(task.index).partitionId
    +    }.toSet
    +    assert(finalTsm.runningTasks === 5)
    +    assert(!finalTsm.isZombie)
    +
    +    // We simulate late completions from our zombie tasksets, corresponding to all the pending
    +    // partitions in our final attempt.  This means we're only waiting on the tasks we've already
    +    // launched.
    +    val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
    +    finalAttemptPendingPartitions.foreach { partition =>
    +      completeTaskSuccessfully(zombieAttempts(0), partition)
    +    }
    +
    +    // If there is another resource offer, we shouldn't run anything.  Though our final attempt
    +    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
    +    // remaining tasks to compute are already active in the non-zombie attempt.
    +    assert(
    +      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
    +
    +    val allTaskSets = zombieAttempts ++ Seq(finalTsm)
    +    val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions)
    --- End diff --
    
    As I see remainingTasks is always the same as finalAttemptLaunchedPartitions. I am wondering whether it is more readable to use finalAttemptLaunchedPartitions here for initialisation. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183775539
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    +      finalAttempt.tasks(task.index).partitionId
    +    }.toSet
    +    assert(finalTsm.runningTasks === 5)
    +    assert(!finalTsm.isZombie)
    +
    +    // We simulate late completions from our zombie tasksets, corresponding to all the pending
    +    // partitions in our final attempt.  This means we're only waiting on the tasks we've already
    +    // launched.
    +    val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
    +    finalAttemptPendingPartitions.foreach { partition =>
    +      completeTaskSuccessfully(zombieAttempts(0), partition)
    +    }
    +
    +    // If there is another resource offer, we shouldn't run anything.  Though our final attempt
    +    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
    +    // remaining tasks to compute are already active in the non-zombie attempt.
    +    assert(
    +      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
    +
    +    val allTaskSets = zombieAttempts ++ Seq(finalTsm)
    +    val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions)
    +
    +    // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
    +    // marked as zombie.
    +    // for each of the remaining tasks, find the tasksets with an active copy of the task, and
    +    // finish the task.
    +    remainingTasks.foreach { partition =>
    +      val tsm = if (partition == 0) {
    +        // we failed this task on both zombie attempts, this one is only present in the latest
    +        // taskset
    +        finalTsm
    +      } else {
    +        // should be active in every taskset.  We choose a zombie taskset just to make sure that
    +        // we transition the active taskset correctly even if the final completion comes
    +        // from a zombie.
    +        zombieAttempts(partition % 2)
    --- End diff --
    
    not a nitpick at all, thanks for catching this!  I'll update


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89790/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    **[Test build #89790 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89790/testReport)** for PR 21131 at commit [`707307f`](https://github.com/apache/spark/commit/707307fd8e468dce82d45a713ce11c9ce3f96d45).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by attilapiros <gi...@git.apache.org>.
Github user attilapiros commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183774355
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    +      finalAttempt.tasks(task.index).partitionId
    +    }.toSet
    +    assert(finalTsm.runningTasks === 5)
    +    assert(!finalTsm.isZombie)
    +
    +    // We simulate late completions from our zombie tasksets, corresponding to all the pending
    +    // partitions in our final attempt.  This means we're only waiting on the tasks we've already
    +    // launched.
    +    val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
    +    finalAttemptPendingPartitions.foreach { partition =>
    +      completeTaskSuccessfully(zombieAttempts(0), partition)
    +    }
    +
    +    // If there is another resource offer, we shouldn't run anything.  Though our final attempt
    +    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
    +    // remaining tasks to compute are already active in the non-zombie attempt.
    +    assert(
    +      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
    +
    +    val allTaskSets = zombieAttempts ++ Seq(finalTsm)
    +    val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions)
    +
    +    // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
    +    // marked as zombie.
    +    // for each of the remaining tasks, find the tasksets with an active copy of the task, and
    +    // finish the task.
    +    remainingTasks.foreach { partition =>
    +      val tsm = if (partition == 0) {
    +        // we failed this task on both zombie attempts, this one is only present in the latest
    +        // taskset
    +        finalTsm
    +      } else {
    +        // should be active in every taskset.  We choose a zombie taskset just to make sure that
    +        // we transition the active taskset correctly even if the final completion comes
    +        // from a zombie.
    +        zombieAttempts(partition % 2)
    +      }
    +      completeTaskSuccessfully(tsm, partition)
    +    }
    +
    +    assert(finalTsm.isZombie)
    +
    +    // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
    +    verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())
    +
    +    // finally, lets complete all the tasks.  We simulate failures in attempt 1, but everything
    +    // else succeeds, to make sure we get the right updates to the blacklist in all cases.
    +    (zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
    --- End diff --
    
    Here you can reuse the val "allTaskSets".


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    hmm, will we have a problem for shuffle here? Assuming a shuffle stage has 2 tasksets, one is zombie, one is normal. Both of them have running tasks.
    
    if a task in zombie taskset finishes, it sends a task completion event to dag scheduler, which will be ignored later as the stage attempt id is not the latest. However, when the corresponding task in the normal taskset finishes, it will not send event to dag scheduler because this task is already marked as finished in this taskset. Then the shuffle stage never finishes.
    
    cc @JoshRosen  @vanzin  @zsxwing 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2634/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183781695
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    --- End diff --
    
    we've previously debated about what to do with the tasks still running in a zombie attempt, and there hasn't been any definitive conclusion.  I'm just trying to do a correctness fix here.  Briefly, in general there is an expectation that those tasks are unlikely to succeed (because they won't be able to get their shuffle input, same as the original fetch failure), so we don't want to delay starting a new attempt of that task.  And perhaps we should even actively kill those tasks (you'll see comments about that in various places).  But if they do succeed, we need to handle them correctly.  Note that even if we did try to actively kill them, you'd still need to handle a late-completion, as killing would only be "best-effort".


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183776603
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    +      finalAttempt.tasks(task.index).partitionId
    +    }.toSet
    +    assert(finalTsm.runningTasks === 5)
    +    assert(!finalTsm.isZombie)
    +
    +    // We simulate late completions from our zombie tasksets, corresponding to all the pending
    +    // partitions in our final attempt.  This means we're only waiting on the tasks we've already
    +    // launched.
    +    val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
    +    finalAttemptPendingPartitions.foreach { partition =>
    +      completeTaskSuccessfully(zombieAttempts(0), partition)
    +    }
    +
    +    // If there is another resource offer, we shouldn't run anything.  Though our final attempt
    +    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
    +    // remaining tasks to compute are already active in the non-zombie attempt.
    +    assert(
    +      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
    +
    +    val allTaskSets = zombieAttempts ++ Seq(finalTsm)
    +    val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions)
    +
    +    // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
    +    // marked as zombie.
    +    // for each of the remaining tasks, find the tasksets with an active copy of the task, and
    +    // finish the task.
    +    remainingTasks.foreach { partition =>
    +      val tsm = if (partition == 0) {
    +        // we failed this task on both zombie attempts, this one is only present in the latest
    +        // taskset
    +        finalTsm
    +      } else {
    +        // should be active in every taskset.  We choose a zombie taskset just to make sure that
    +        // we transition the active taskset correctly even if the final completion comes
    +        // from a zombie.
    +        zombieAttempts(partition % 2)
    +      }
    +      completeTaskSuccessfully(tsm, partition)
    +    }
    +
    +    assert(finalTsm.isZombie)
    +
    +    // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
    +    verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())
    +
    +    // finally, lets complete all the tasks.  We simulate failures in attempt 1, but everything
    +    // else succeeds, to make sure we get the right updates to the blacklist in all cases.
    +    (zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
    +      val stageAttempt = tsm.taskSet.stageAttemptId
    +      tsm.runningTasksSet.foreach { index =>
    +        if (stageAttempt == 1) {
    +          tsm.handleFailedTask(tsm.taskInfos(index).taskId, TaskState.FAILED, TaskResultLost)
    +        } else {
    +          val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +          tsm.handleSuccessfulTask(tsm.taskInfos(index).taskId, result)
    +        }
    +      }
    +
    +      // we update the blacklist for the stage attempts with all successful tasks.  Even though
    +      // some tasksets had failures, we still consider them all successful from a blacklisting
    +      // perspective, as the failures weren't from a problem w/ the tasks themselves.
    +      verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
    --- End diff --
    
    this is mockito's `eq` matcher which is renamed to avoid clashing with scala's `eq`, this is a standard rename we use in the codebase:
    
    https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala#L24



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183701269
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    --- End diff --
    
    Yet, launched tasks has nothing to do with other running tasks in other `TaskSet`s. But, is it possible to take those running tasks into consideration when launch a new task (in source code) ? For example,  launching FetchFailed task or tasks who do not have a running copy across `TaskSet`s firstly ?
    
    (But, it seems we will always have running copies in other `TaskSet`s for our  final `TaskSet`, except FetchFailed task, right? It's more like we are not talking about resubmitting a stage, but resubmitting tasks who do not have running copies across previous `TaskSet`s.)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89739/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183793745
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    --- End diff --
    
    the assumption is that a fetchfailure means that *all* data on that host is unavailable.  As shuffles are all-to-all, its very likely that every task is going to need some piece of data from that host.  Its possible that they already grabbed all the data they need, before the problem occurred with the host, we don't know.  Also, there is no "partial progress" for a task -- tasks don't know how to grab all the shuffle output they can, then just wait until the missing bit becomes available again.  They fail as soon as the data they need is unavailable (with some retries, but there is no "pause" nor a check for data on another source). 
    
    Also the dagscheduler is a little confusing on this -- it does the unregister in two parts (I have no idea why anymore, to be honest):
    
    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1391
    
    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1406 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183766277
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    +      finalAttempt.tasks(task.index).partitionId
    +    }.toSet
    +    assert(finalTsm.runningTasks === 5)
    +    assert(!finalTsm.isZombie)
    +
    +    // We simulate late completions from our zombie tasksets, corresponding to all the pending
    +    // partitions in our final attempt.  This means we're only waiting on the tasks we've already
    +    // launched.
    +    val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
    +    finalAttemptPendingPartitions.foreach { partition =>
    +      completeTaskSuccessfully(zombieAttempts(0), partition)
    +    }
    +
    +    // If there is another resource offer, we shouldn't run anything.  Though our final attempt
    +    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
    +    // remaining tasks to compute are already active in the non-zombie attempt.
    +    assert(
    +      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
    +
    +    val allTaskSets = zombieAttempts ++ Seq(finalTsm)
    +    val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions)
    +
    +    // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
    +    // marked as zombie.
    +    // for each of the remaining tasks, find the tasksets with an active copy of the task, and
    +    // finish the task.
    +    remainingTasks.foreach { partition =>
    +      val tsm = if (partition == 0) {
    +        // we failed this task on both zombie attempts, this one is only present in the latest
    +        // taskset
    +        finalTsm
    +      } else {
    +        // should be active in every taskset.  We choose a zombie taskset just to make sure that
    +        // we transition the active taskset correctly even if the final completion comes
    +        // from a zombie.
    +        zombieAttempts(partition % 2)
    --- End diff --
    
    Hmmm... I know it's pretty nitpick but since `remainingTasks` is a set, you can't guarantee the final completion comes from a zombie. It's fine to keep this, or we can finish the partition 0 first instead.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    >>Fetch failure lead to multiple tasksets which are active for a given stage.
    >How can this happen? the TaskSetManager will mark itself as zombie when it receives a fetch failed.
    
    We don't have very precise terminology here -- I'm using "active" to mean a taskset which still has running tasks.  Even when a taskset is zombie, it will have many previously launched tasks still going.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    **[Test build #89739 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89739/testReport)** for PR 21131 at commit [`0720a7c`](https://github.com/apache/spark/commit/0720a7cd6826614e516c3d3a51bd4519259cbe3b).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    **[Test build #89792 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89792/testReport)** for PR 21131 at commit [`168fd46`](https://github.com/apache/spark/commit/168fd46618d9e18eee03be3714a30776e6ae1463).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183779968
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    --- End diff --
    
    good point, fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    hmm, will we have a problem for shuffle here? Assuming a shuffle stage has 2 task sets, one is active, one is zombie. Both of them have running tasks.
    
    If a task from zombie task set finishes, it will send a task completion event to DAG scheduler. The event might be ignored later because the epoch is outdated. When the task in the normal task set finishes, it will not send event to DAG scheduler because this task is already marked as finished in this task set. Then the shuffle stage never finishes.
    
    cc @JoshRosen @zsxwing @vanzin  


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    @markhamstra @zsxwing @jiangxb1987 @Ngone51 would appreciate a review, thanks


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r203435933
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -764,6 +769,19 @@ private[spark] class TaskSetManager(
         maybeFinishTaskSet()
       }
     
    +  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
    +    partitionToIndex.get(partitionId).foreach { index =>
    +      if (!successful(index)) {
    +        tasksSuccessful += 1
    +        successful(index) = true
    +        if (tasksSuccessful == numTasks) {
    +          isZombie = true
    +        }
    +        maybeFinishTaskSet()
    --- End diff --
    
    it's too minor. If we touch this file again, let's remove it. Otherwise maybe not bother about it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r203415036
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -764,6 +769,19 @@ private[spark] class TaskSetManager(
         maybeFinishTaskSet()
       }
     
    +  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
    +    partitionToIndex.get(partitionId).foreach { index =>
    +      if (!successful(index)) {
    +        tasksSuccessful += 1
    +        successful(index) = true
    +        if (tasksSuccessful == numTasks) {
    +          isZombie = true
    +        }
    +        maybeFinishTaskSet()
    --- End diff --
    
    I think you're right, its not needed, its called when the tasks succeed, fail, or are aborted, and when this called while that taskset still has running tasks, then its a no-op, as it would fail the `runningTasks == 0` check inside `maybeFinishTaskSet()`.
    
    do you think its worth removing?  I'm fine either way.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    > Fetch failure lead to multiple tasksets which are active for a given
    stage.
    
    How can this happen? the `TaskSetManager` will mark itself as zombie when it receives a fetch failed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    LGTM, and nice UT.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    ah i see. Does it only apply to the result stage? IIRC shuffle stage tracks shuffle epoch and will ignore the tasks from a killed stage.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r203257926
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -764,6 +769,19 @@ private[spark] class TaskSetManager(
         maybeFinishTaskSet()
       }
     
    +  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
    +    partitionToIndex.get(partitionId).foreach { index =>
    +      if (!successful(index)) {
    +        tasksSuccessful += 1
    +        successful(index) = true
    +        if (tasksSuccessful == numTasks) {
    +          isZombie = true
    +        }
    +        maybeFinishTaskSet()
    --- End diff --
    
    is this line needed? We will call `maybeFinishTaskSet()` at the end of `handleSuccessfulTask`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183797532
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    --- End diff --
    
    The explanation is quite clear and I get understand now. Thank you very mush! @squito 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    The DAGSCheudler is notified about successfully completed tasks, whether or not the `tsm.successful` is already true:
    
    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L745-L768
    
    I don't think there are problems here ... though I agree its confusing and we could have better tests here ...


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    **[Test build #89790 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89790/testReport)** for PR 21131 at commit [`707307f`](https://github.com/apache/spark/commit/707307fd8e468dce82d45a713ce11c9ce3f96d45).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    a late LGTM


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183778637
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    +      finalAttempt.tasks(task.index).partitionId
    +    }.toSet
    +    assert(finalTsm.runningTasks === 5)
    +    assert(!finalTsm.isZombie)
    +
    +    // We simulate late completions from our zombie tasksets, corresponding to all the pending
    +    // partitions in our final attempt.  This means we're only waiting on the tasks we've already
    +    // launched.
    +    val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
    +    finalAttemptPendingPartitions.foreach { partition =>
    +      completeTaskSuccessfully(zombieAttempts(0), partition)
    +    }
    +
    +    // If there is another resource offer, we shouldn't run anything.  Though our final attempt
    +    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
    +    // remaining tasks to compute are already active in the non-zombie attempt.
    +    assert(
    +      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
    +
    +    val allTaskSets = zombieAttempts ++ Seq(finalTsm)
    +    val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions)
    +
    +    // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
    +    // marked as zombie.
    +    // for each of the remaining tasks, find the tasksets with an active copy of the task, and
    +    // finish the task.
    +    remainingTasks.foreach { partition =>
    +      val tsm = if (partition == 0) {
    +        // we failed this task on both zombie attempts, this one is only present in the latest
    +        // taskset
    +        finalTsm
    +      } else {
    +        // should be active in every taskset.  We choose a zombie taskset just to make sure that
    +        // we transition the active taskset correctly even if the final completion comes
    +        // from a zombie.
    +        zombieAttempts(partition % 2)
    +      }
    +      completeTaskSuccessfully(tsm, partition)
    +    }
    +
    +    assert(finalTsm.isZombie)
    +
    +    // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
    +    verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())
    +
    +    // finally, lets complete all the tasks.  We simulate failures in attempt 1, but everything
    +    // else succeeds, to make sure we get the right updates to the blacklist in all cases.
    +    (zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
    --- End diff --
    
    thanks for pointing that out -- though actually I'm going to go the other direction, I realized `allTaskSets` is not necessary at all.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183619704
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
         }
       }
     
    +  /**
    +   * Marks the task has completed in all TaskSetManagers for the given stage.
    +   *
    +   * After stage failure and retry, there may be multiple active TaskSetManagers for the stage.
    +   * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
    +   * do not also submit those same tasks.  That also means that a task completion from an  earlier
    +   * attempt can lead to the entire stage getting marked as successful.
    +   */
    +  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
    +    taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
    --- End diff --
    
    Generally, it seems impossible for a unfinished `TaskSet` to get an empty `Map()` in `taskSetsByStageIdAndAttempt` .  But, if it does, maybe, we can tell the caller the stage has already finished.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183777930
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
         }
       }
     
    +  /**
    +   * Marks the task has completed in all TaskSetManagers for the given stage.
    +   *
    +   * After stage failure and retry, there may be multiple active TaskSetManagers for the stage.
    --- End diff --
    
    yeah the terminology is a bit of mess here ... I dunno if we consistently distinguish the use of "active" for one taskset which is non-zombie vs. all the tasksets which have some tasks that are running (though all-but-one must be zombies).
    @markhamstra @kayousterhout thoughts on naming?
    
    In any case, I think you're right, I will remove "active" here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183790463
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
    @@ -917,4 +917,111 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           taskScheduler.initialize(new FakeSchedulerBackend)
         }
       }
    +
    +  test("Completions in zombie tasksets update status of non-zombie taskset") {
    +    val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
    +    val valueSer = SparkEnv.get.serializer.newInstance()
    +
    +    def completeTaskSuccessfully(tsm: TaskSetManager, partition: Int): Unit = {
    +      val indexInTsm = tsm.partitionToIndex(partition)
    +      val matchingTaskInfo = tsm.taskAttempts.flatten.filter(_.index == indexInTsm).head
    +      val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +      tsm.handleSuccessfulTask(matchingTaskInfo.taskId, result)
    +    }
    +
    +    // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
    +    // two times, so we have three active task sets for one stage.  (For this to really happen,
    +    // you'd need the previous stage to also get restarted, and then succeed, in between each
    +    // attempt, but that happens outside what we're mocking here.)
    +    val zombieAttempts = (0 until 2).map { stageAttempt =>
    +      val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
    +      taskScheduler.submitTasks(attempt)
    +      val tsm = taskScheduler.taskSetManagerForAttempt(0, stageAttempt).get
    +      val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +      taskScheduler.resourceOffers(offers)
    +      assert(tsm.runningTasks === 10)
    +      if (stageAttempt < 2) {
    +        // fail attempt
    +        tsm.handleFailedTask(tsm.taskAttempts.head.head.taskId, TaskState.FAILED,
    +          FetchFailed(null, 0, 0, 0, "fetch failed"))
    +        // the attempt is a zombie, but the tasks are still running (this could be true even if
    +        // we actively killed those tasks, as killing is best-effort)
    +        assert(tsm.isZombie)
    +        assert(tsm.runningTasks === 9)
    +      }
    +      tsm
    +    }
    +
    +    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
    +    // the stage, but this time with insufficient resources so not all tasks are active.
    +
    +    val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
    +    taskScheduler.submitTasks(finalAttempt)
    +    val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
    +    val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
    +    val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
    +      finalAttempt.tasks(task.index).partitionId
    +    }.toSet
    +    assert(finalTsm.runningTasks === 5)
    +    assert(!finalTsm.isZombie)
    +
    +    // We simulate late completions from our zombie tasksets, corresponding to all the pending
    +    // partitions in our final attempt.  This means we're only waiting on the tasks we've already
    +    // launched.
    +    val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
    +    finalAttemptPendingPartitions.foreach { partition =>
    +      completeTaskSuccessfully(zombieAttempts(0), partition)
    +    }
    +
    +    // If there is another resource offer, we shouldn't run anything.  Though our final attempt
    +    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
    +    // remaining tasks to compute are already active in the non-zombie attempt.
    +    assert(
    +      taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
    +
    +    val allTaskSets = zombieAttempts ++ Seq(finalTsm)
    +    val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions)
    +
    +    // finally, if we finish the remaining partitions from a mix of tasksets, all attempts should be
    +    // marked as zombie.
    +    // for each of the remaining tasks, find the tasksets with an active copy of the task, and
    +    // finish the task.
    +    remainingTasks.foreach { partition =>
    +      val tsm = if (partition == 0) {
    +        // we failed this task on both zombie attempts, this one is only present in the latest
    +        // taskset
    +        finalTsm
    +      } else {
    +        // should be active in every taskset.  We choose a zombie taskset just to make sure that
    +        // we transition the active taskset correctly even if the final completion comes
    +        // from a zombie.
    +        zombieAttempts(partition % 2)
    +      }
    +      completeTaskSuccessfully(tsm, partition)
    +    }
    +
    +    assert(finalTsm.isZombie)
    +
    +    // no taskset has completed all of its tasks, so no updates to the blacklist tracker yet
    +    verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(anyInt(), anyInt(), anyObject())
    +
    +    // finally, lets complete all the tasks.  We simulate failures in attempt 1, but everything
    +    // else succeeds, to make sure we get the right updates to the blacklist in all cases.
    +    (zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
    +      val stageAttempt = tsm.taskSet.stageAttemptId
    +      tsm.runningTasksSet.foreach { index =>
    +        if (stageAttempt == 1) {
    +          tsm.handleFailedTask(tsm.taskInfos(index).taskId, TaskState.FAILED, TaskResultLost)
    +        } else {
    +          val result = new DirectTaskResult[Int](valueSer.serialize(1), Seq())
    +          tsm.handleSuccessfulTask(tsm.taskInfos(index).taskId, result)
    +        }
    +      }
    +
    +      // we update the blacklist for the stage attempts with all successful tasks.  Even though
    +      // some tasksets had failures, we still consider them all successful from a blacklisting
    +      // perspective, as the failures weren't from a problem w/ the tasks themselves.
    +      verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
    --- End diff --
    
    Oh, the code is folded, no wonder I didn't find it. Thank you.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    merged to master / 2.3 / 2.2


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by squito <gi...@git.apache.org>.
Github user squito closed the pull request at:

    https://github.com/apache/spark/pull/21131


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2599/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21131: [SPARK-23433][CORE] Late zombie task completions ...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21131#discussion_r183619646
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
    @@ -689,6 +689,20 @@ private[spark] class TaskSchedulerImpl(
         }
       }
     
    +  /**
    +   * Marks the task has completed in all TaskSetManagers for the given stage.
    +   *
    +   * After stage failure and retry, there may be multiple active TaskSetManagers for the stage.
    --- End diff --
    
    IIRC, there's only one active `TaskSetManager` for a given stage, and with some zombie `TaskSetManager`s possibly. Though, there may be some running tasks in zombie `TaskSetManager`s.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    **[Test build #89792 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89792/testReport)** for PR 21131 at commit [`168fd46`](https://github.com/apache/spark/commit/168fd46618d9e18eee03be3714a30776e6ae1463).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21131: [SPARK-23433][CORE] Late zombie task completions update ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21131
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org