You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/04/14 09:02:35 UTC
[spark] branch branch-2.3 updated: Revert "[SPARK-23433][SPARK-25250][CORE][BRANCH-2.3] Later created TaskSet should learn about the finished partitions"
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new e3bdb5b Revert "[SPARK-23433][SPARK-25250][CORE][BRANCH-2.3] Later created TaskSet should learn about the finished partitions"
e3bdb5b is described below
commit e3bdb5b0a11f48ee850ff3b034c93fac6be8d2ad
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Sun Apr 14 17:02:02 2019 +0800
Revert "[SPARK-23433][SPARK-25250][CORE][BRANCH-2.3] Later created TaskSet should learn about the finished partitions"
This reverts commit a1ca5663725c278b6e3785042348819a25496fe4.
---
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 36 ++----------------
.../apache/spark/scheduler/TaskSetManager.scala | 19 +++-------
.../spark/scheduler/TaskSchedulerImplSuite.scala | 44 ++++++----------------
3 files changed, 20 insertions(+), 79 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ef3ce87..edf79aa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.AtomicLong
import scala.collection.Set
-import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random
import org.apache.spark._
@@ -93,9 +93,6 @@ private[spark] class TaskSchedulerImpl(
private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
val taskIdToExecutorId = new HashMap[Long, String]
- // Protected by `this`
- private[scheduler] val stageIdToFinishedPartitions = new HashMap[Int, BitSet]
-
@volatile private var hasReceivedTask = false
@volatile private var hasLaunchedTask = false
private val starvationTimer = new Timer(true)
@@ -226,20 +223,7 @@ private[spark] class TaskSchedulerImpl(
private[scheduler] def createTaskSetManager(
taskSet: TaskSet,
maxTaskFailures: Int): TaskSetManager = {
- // only create a BitSet once for a certain stage since we only remove
- // that stage when an active TaskSetManager succeed.
- stageIdToFinishedPartitions.getOrElseUpdate(taskSet.stageId, new BitSet)
- val tsm = new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
- // TaskSet got submitted by DAGScheduler may have some already completed
- // tasks since DAGScheduler does not always know all the tasks that have
- // been completed by other tasksets when completing a stage, so we mark
- // those tasks as finished here to avoid launching duplicate tasks, while
- // holding the TaskSchedulerImpl lock.
- // See SPARK-25250 and `markPartitionCompletedInAllTaskSets()`
- stageIdToFinishedPartitions.get(taskSet.stageId).foreach {
- finishedPartitions => finishedPartitions.foreach(tsm.markPartitionCompleted(_, None))
- }
- tsm
+ new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
}
override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
@@ -712,31 +696,19 @@ private[spark] class TaskSchedulerImpl(
}
/**
- * Marks the task has completed in all TaskSetManagers(active / zombie) for the given stage.
+ * Marks the task has completed in all TaskSetManagers for the given stage.
*
* After stage failure and retry, there may be multiple TaskSetManagers for the stage.
* If an earlier attempt of a stage completes a task, we should ensure that the later attempts
* do not also submit those same tasks. That also means that a task completion from an earlier
* attempt can lead to the entire stage getting marked as successful.
- * And there is also the possibility that the DAGScheduler submits another taskset at the same
- * time as we're marking a task completed here -- that taskset would have a task for a partition
- * that was already completed. We maintain the set of finished partitions in
- * stageIdToFinishedPartitions, protected by this, so we can detect those tasks when the taskset
- * is submitted. See SPARK-25250 for more details.
- *
- * note: this method must be called with a lock on this.
*/
private[scheduler] def markPartitionCompletedInAllTaskSets(
stageId: Int,
partitionId: Int,
taskInfo: TaskInfo) = {
- // if we do not find a BitSet for this stage, which means an active TaskSetManager
- // has already succeeded and removed the stage.
- stageIdToFinishedPartitions.get(stageId).foreach{
- finishedPartitions => finishedPartitions += partitionId
- }
taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
- tsm.markPartitionCompleted(partitionId, Some(taskInfo))
+ tsm.markPartitionCompleted(partitionId, taskInfo)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 68b6d37..df8d914 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -21,7 +21,7 @@ import java.io.NotSerializableException
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
-import scala.collection.mutable.{ArrayBuffer, BitSet, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.math.max
import scala.util.control.NonFatal
@@ -751,11 +751,7 @@ private[spark] class TaskSetManager(
// Mark successful and stop if all the tasks have succeeded.
successful(index) = true
if (tasksSuccessful == numTasks) {
- // clean up finished partitions for the stage when the active TaskSetManager succeed
- if (!isZombie) {
- sched.stageIdToFinishedPartitions -= stageId
- isZombie = true
- }
+ isZombie = true
}
} else {
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
@@ -774,21 +770,16 @@ private[spark] class TaskSetManager(
maybeFinishTaskSet()
}
- private[scheduler] def markPartitionCompleted(
- partitionId: Int,
- taskInfo: Option[TaskInfo]): Unit = {
+ private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
partitionToIndex.get(partitionId).foreach { index =>
if (!successful(index)) {
if (speculationEnabled && !isZombie) {
- taskInfo.foreach { info => successfulTaskDurations.insert(info.duration) }
+ successfulTaskDurations.insert(taskInfo.duration)
}
tasksSuccessful += 1
successful(index) = true
if (tasksSuccessful == numTasks) {
- if (!isZombie) {
- sched.stageIdToFinishedPartitions -= stageId
- isZombie = true
- }
+ isZombie = true
}
maybeFinishTaskSet()
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 6809d91..bc9a39c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -929,7 +929,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
}
- test("SPARK-23433/25250 Completions in zombie tasksets update status of non-zombie taskset") {
+ test("Completions in zombie tasksets update status of non-zombie taskset") {
val taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
val valueSer = SparkEnv.get.serializer.newInstance()
@@ -941,9 +941,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
}
// Submit a task set, have it fail with a fetch failed, and then re-submit the task attempt,
- // two times, so we have three TaskSetManagers(2 zombie, 1 active) for one stage. (For this
- // to really happen, you'd need the previous stage to also get restarted, and then succeed,
- // in between each attempt, but that happens outside what we're mocking here.)
+ // two times, so we have three active task sets for one stage. (For this to really happen,
+ // you'd need the previous stage to also get restarted, and then succeed, in between each
+ // attempt, but that happens outside what we're mocking here.)
val zombieAttempts = (0 until 2).map { stageAttempt =>
val attempt = FakeTask.createTaskSet(10, stageAttemptId = stageAttempt)
taskScheduler.submitTasks(attempt)
@@ -960,33 +960,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(tsm.runningTasks === 9)
tsm
}
- // we've now got 2 zombie attempts, each with 9 tasks still running. And there's no active
- // attempt exists in taskScheduler by now.
-
- // finish partition 1,2 by completing the tasks before a new attempt for the same stage submit.
- // This is possible since the behaviour of submitting new attempt and handling successful task
- // is from two different threads, which are "task-result-getter" and "dag-scheduler-event-loop"
- // separately.
- (0 until 2).foreach { i =>
- completeTaskSuccessfully(zombieAttempts(i), i + 1)
- assert(taskScheduler.stageIdToFinishedPartitions(0).contains(i + 1))
- }
- // Submit the 3rd attempt still with 10 tasks, this happens due to the race between thread
- // "task-result-getter" and "dag-scheduler-event-loop", where a TaskSet gets submitted with
- // already completed tasks. And this time with insufficient resources so not all tasks are
- // active.
+ // we've now got 2 zombie attempts, each with 9 tasks still active. Submit the 3rd attempt for
+ // the stage, but this time with insufficient resources so not all tasks are active.
+
val finalAttempt = FakeTask.createTaskSet(10, stageAttemptId = 2)
taskScheduler.submitTasks(finalAttempt)
val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
- // Though finalTSM gets submitted with 10 tasks, the call to taskScheduler.submitTasks should
- // realize that 2 tasks have already completed, and mark them appropriately, so it won't launch
- // any duplicate tasks later (SPARK-25250).
- (0 until 2).map(_ + 1).foreach { partitionId =>
- val index = finalTsm.partitionToIndex(partitionId)
- assert(finalTsm.successful(index))
- }
-
val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
finalAttempt.tasks(task.index).partitionId
@@ -994,17 +974,16 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(finalTsm.runningTasks === 5)
assert(!finalTsm.isZombie)
- // We continually simulate late completions from our zombie tasksets(but this time, there's one
- // active attempt exists in taskScheduler), corresponding to all the pending partitions in our
- // final attempt. This means we're only waiting on the tasks we've already launched.
+ // We simulate late completions from our zombie tasksets, corresponding to all the pending
+ // partitions in our final attempt. This means we're only waiting on the tasks we've already
+ // launched.
val finalAttemptPendingPartitions = (0 until 10).toSet.diff(finalAttemptLaunchedPartitions)
finalAttemptPendingPartitions.foreach { partition =>
completeTaskSuccessfully(zombieAttempts(0), partition)
- assert(taskScheduler.stageIdToFinishedPartitions(0).contains(partition))
}
// If there is another resource offer, we shouldn't run anything. Though our final attempt
- // used to have pending tasks, now those tasks have been completed by zombie attempts. The
+ // used to have pending tasks, now those tasks have been completed by zombie attempts. The
// remaining tasks to compute are already active in the non-zombie attempt.
assert(
taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)
@@ -1052,6 +1031,5 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// perspective, as the failures weren't from a problem w/ the tasks themselves.
verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
}
- assert(taskScheduler.stageIdToFinishedPartitions.isEmpty)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org