You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/04/14 09:01:43 UTC

[spark] branch branch-2.4 updated: Revert "[SPARK-23433][SPARK-25250][CORE] Later created TaskSet should learn about the finished partitions"

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new ede02b6  Revert "[SPARK-23433][SPARK-25250][CORE] Later created TaskSet should learn about the finished partitions"
ede02b6 is described below

commit ede02b692fcf9a7317e7a8e2841b7fe1c901218b
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Sun Apr 14 17:01:02 2019 +0800

    Revert "[SPARK-23433][SPARK-25250][CORE] Later created TaskSet should learn about the finished partitions"
    
    This reverts commit db86ccb11821231d85b727fb889dec1d58b39e4d.
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 36 ++----------------
 .../apache/spark/scheduler/TaskSetManager.scala    | 19 +++-------
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 44 ++++++----------------
 3 files changed, 20 insertions(+), 79 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 382a417..e194b79 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.Set
-import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.Random
 
 import org.apache.spark._
@@ -94,9 +94,6 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
   val taskIdToExecutorId = new HashMap[Long, String]
 
-  // Protected by `this`
-  private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, BitSet]
-
   @volatile private var hasReceivedTask = false
   @volatile private var hasLaunchedTask = false
   private val starvationTimer = new Timer(true)
@@ -245,20 +242,7 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] def createTaskSetManager(
       taskSet: TaskSet,
       maxTaskFailures: Int): TaskSetManager = {
-    // only create a BitSet once for a certain stage since we only remove
-    // that stage when an active TaskSetManager succeed.
-    stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new BitSet)
-    val tsm = new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
-    // TaskSet got submitted by DAGScheduler may have some already completed
-    // tasks since DAGScheduler does not always know all the tasks that have
-    // been completed by other tasksets when completing a stage, so we mark
-    // those tasks as finished here to avoid launching duplicate tasks, while
-    // holding the TaskSchedulerImpl lock.
-    // See SPARK-25250 and `markPartitionCompletedInAllTaskSets()`
-    stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
-      finishedPartitions => finishedPartitions.foreach(tsm.markPartitionCompleted(_, None))
-    }
-    tsm
+    new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
   }
 
   override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
@@ -855,31 +839,19 @@ private[spark] class TaskSchedulerImpl(
   }
 
   /**
-   * Marks the task has completed in all TaskSetManagers(active / zombie) for the given stage.
+   * Marks the task has completed in all TaskSetManagers for the given stage.
    *
    * After stage failure and retry, there may be multiple TaskSetManagers for the stage.
    * If an earlier attempt of a stage completes a task, we should ensure that the later attempts
    * do not also submit those same tasks.  That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
-   * And there is also the possibility that the DAGScheduler submits another taskset at the same
-   * time as we're marking a task completed here -- that taskset would have a task for a partition
-   * that was already completed. We maintain the set of finished partitions in
-   * stageIdToFinishedPartitions, protected by this, so we can detect those tasks when the taskset
-   * is submitted. See SPARK-25250 for more details.
-   *
-   * note: this method must be called with a lock on this.
    */
   private[scheduler] def markPartitionCompletedInAllTaskSets(
       stageId: Int,
       partitionId: Int,
       taskInfo: TaskInfo) = {
-    // if we do not find a BitSet for this stage, which means an active TaskSetManager
-    // has already succeeded and removed the stage.
-    stageIdToFinishedPartitions.get(stageId).foreach{
-      finishedPartitions => finishedPartitions += partitionId
-    }
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId, Some(taskInfo))
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 71940ee..6bf60dd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -21,7 +21,7 @@ import java.io.NotSerializableException
 import java.nio.ByteBuffer
 import java.util.concurrent.ConcurrentLinkedQueue
 
-import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.math.max
 import scala.util.control.NonFatal
 
@@ -777,11 +777,7 @@ private[spark] class TaskSetManager(
       // Mark successful and stop if all the tasks have succeeded.
       successful(index) = true
       if (tasksSuccessful == numTasks) {
-        // clean up finished partitions for the stage when the active TaskSetManager succeed
-        if (!isZombie) {
-          sched.stageIdToFinishedPartitions -= stageId
-          isZombie = true
-        }
+        isZombie = true
       }
     } else {
       logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
@@ -800,21 +796,16 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(
-      partitionId: Int,
-      taskInfo: Option[TaskInfo]): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
         if (speculationEnabled && !isZombie) {
-          taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) }
+          successfulTaskDurations.insert(taskInfo.duration)
         }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {
-          if (!isZombie) {
-            sched.stageIdToFinishedPartitions -= stageId
-            isZombie = true
-          }
+          isZombie = true
         }
         maybeFinishTaskSet()
       }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 006b3e5..8b60f8a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1113,7 +1113,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
   }
 
-  test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") {
+  test("Completions in zombie tasksets update status of non-zombie taskset") {
     val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
     val valueSer = SparkEnv.get.serializer.newInstance()
 
@@ -1125,9 +1125,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }
 
     // Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
-    // two times, so we have three TaskSetManagers(2 zombie, 1 active) for one stage.  (For this
-    // to really happen, you'd need the previous stage to also get restarted, and then succeed,
-    // in between each attempt, but that happens outside what we're mocking here.)
+    // two times, so we have three active task sets for one stage.  (For this to really happen,
+    // you'd need the previous stage to also get restarted, and then succeed, in between each
+    // attempt, but that happens outside what we're mocking here.)
     val zombieAttempts = (0 until 2).map { stageAttempt =>
       val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
       taskScheduler.submitTasks(attempt)
@@ -1144,33 +1144,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       assert(tsm.runningTasks === 9)
       tsm
     }
-    // we've now got 2 zombie attempts, each with 9 tasks still running. And there's no active
-    // attempt exists in taskScheduler by now.
-
-    // finish partition 1,2 by completing the tasks before a new attempt for the same stage submit.
-    // This is possible since the behaviour of submitting new attempt and handling successful task
-    // is from two different threads, which are "task-result-getter" and "dag-scheduler-event-loop"
-    // separately.
-    (0 until 2).foreach { i =>
-      completeTaskSuccessfully(zombieAttempts(i), i + 1)
-      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1))
-    }
 
-    // Submit the 3rd attempt still with 10 tasks, this happens due to the race between thread
-    // "task-result-getter" and "dag-scheduler-event-loop", where a TaskSet gets submitted with
-    // already completed tasks. And this time with insufficient resources so not all tasks are
-    // active.
+    // we've now got 2 zombie attempts, each with 9 tasks still active.  Submit the 3rd attempt for
+    // the stage, but this time with insufficient resources so not all tasks are active.
+
     val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
     taskScheduler.submitTasks(finalAttempt)
     val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
-    // Though finalTSM gets submitted with 10 tasks, the call to taskScheduler.submitTasks should
-    // realize that 2 tasks have already completed, and mark them appropriately, so it won't launch
-    // any duplicate tasks later (SPARK-25250).
-    (0 until 2).map(_ + 1).foreach { partitionId =>
-      val index = finalTsm.partitionToIndex(partitionId)
-      assert(finalTsm.successful(index))
-    }
-
     val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
     val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
       finalAttempt.tasks(task.index).partitionId
@@ -1178,17 +1158,16 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(finalTsm.runningTasks === 5)
     assert(!finalTsm.isZombie)
 
-    // We continually simulate late completions from our zombie tasksets(but this time, there's one
-    // active attempt exists in taskScheduler), corresponding to all the pending partitions in our
-    // final attempt. This means we're only waiting on the tasks we've already launched.
+    // We simulate late completions from our zombie tasksets, corresponding to all the pending
+    // partitions in our final attempt.  This means we're only waiting on the tasks we've already
+    // launched.
     val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
     finalAttemptPendingPartitions.foreach { partition =>
       completeTaskSuccessfully(zombieAttempts(0), partition)
-      assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
     }
 
     // If there is another resource offer, we shouldn't run anything.  Though our final attempt
-    // used to have pending tasks, now those tasks have been completed by zombie attempts. The
+    // used to have pending tasks, now those tasks have been completed by zombie attempts.  The
     // remaining tasks to compute are already active in the non-zombie attempt.
     assert(
       taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
@@ -1236,7 +1215,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       // perspective, as the failures weren't from a problem w/ the tasks themselves.
       verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
     }
-    assert(taskScheduler.stageIdToFinishedPartitions.isEmpty)
   }
 
   test("don't schedule for a barrier taskSet if available slots are less than pending tasks") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org