You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2016/10/24 22:34:33 UTC
spark git commit: [SPARK-17894][CORE] Ensure uniqueness of
TaskSetManager name.
Repository: spark
Updated Branches:
refs/heads/master 4ecbe1b92 -> 81d6933e7
[SPARK-17894][CORE] Ensure uniqueness of TaskSetManager name.
`TaskSetManager` should have a unique name to avoid adding duplicates to the parent `Pool` via `SchedulableBuilder`. This problem surfaced in the following discussion: [[PR: Avoid adding duplicate schedulables]](https://github.com/apache/spark/pull/15326)
**Proposal** :
There is a one-to-one relationship between `stageAttemptId` and `TaskSetManager`, so `taskSet.id`, which covers both `stageId` and `stageAttemptId`, can be used to make the `TaskSetManager` name unique instead of just `stageId`.
**Current TaskSetManager Name** :
`var name = "TaskSet_" + taskSet.stageId.toString`
**Sample**: TaskSet_0
**Proposed TaskSetManager Name** :
`val name = "TaskSet_" + taskSet.id` `// taskSet.id = (stageId + "." + stageAttemptId)`
**Sample** : TaskSet_0.0
Added a new unit test.
Author: erenavsarogullari <er...@gmail.com>
Closes #15463 from erenavsarogullari/SPARK-17894.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81d6933e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81d6933e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81d6933e
Branch: refs/heads/master
Commit: 81d6933e75579343b1dd14792c18149e97e92cdd
Parents: 4ecbe1b
Author: Eren Avsarogullari <er...@gmail.com>
Authored: Mon Oct 24 15:33:02 2016 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Mon Oct 24 15:33:54 2016 -0700
----------------------------------------------------------------------
.../apache/spark/scheduler/TaskSetManager.scala | 2 +-
.../org/apache/spark/scheduler/FakeTask.scala | 13 +++++++++----
.../spark/scheduler/TaskSetManagerSuite.scala | 20 +++++++++++++++++++-
3 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/81d6933e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 9491bc7..b766e41 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -79,7 +79,7 @@ private[spark] class TaskSetManager(
var minShare = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
- var name = "TaskSet_" + taskSet.stageId.toString
+ val name = "TaskSet_" + taskSet.id
var parent: Pool = null
var totalResultSize = 0L
var calculatedTasks = 0
http://git-wip-us.apache.org/repos/asf/spark/blob/81d6933e/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index 87600fe..f395fe9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -22,7 +22,7 @@ import org.apache.spark.TaskContext
class FakeTask(
stageId: Int,
partitionId: Int,
- prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, partitionId) {
+ prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, stageAttemptId = 0, partitionId) {
override def runTask(context: TaskContext): Int = 0
override def preferredLocations: Seq[TaskLocation] = prefLocs
}
@@ -33,16 +33,21 @@ object FakeTask {
* locations for each task (given as varargs) if this sequence is not empty.
*/
def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
- createTaskSet(numTasks, 0, prefLocs: _*)
+ createTaskSet(numTasks, stageAttemptId = 0, prefLocs: _*)
}
def createTaskSet(numTasks: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
+ createTaskSet(numTasks, stageId = 0, stageAttemptId, prefLocs: _*)
+ }
+
+ def createTaskSet(numTasks: Int, stageId: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*):
+ TaskSet = {
if (prefLocs.size != 0 && prefLocs.size != numTasks) {
throw new IllegalArgumentException("Wrong number of task locations")
}
val tasks = Array.tabulate[Task[_]](numTasks) { i =>
- new FakeTask(0, i, if (prefLocs.size != 0) prefLocs(i) else Nil)
+ new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil)
}
- new TaskSet(tasks, 0, stageAttemptId, 0, null)
+ new TaskSet(tasks, stageId, stageAttemptId, priority = 0, null)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/81d6933e/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 69edcf3..b49ba08 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -904,7 +904,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
task.index == index && !sched.endedTasks.contains(task.taskId)
}.getOrElse {
throw new RuntimeException(s"couldn't find index $index in " +
- s"tasks: ${tasks.map{t => t.index -> t.taskId}} with endedTasks:" +
+ s"tasks: ${tasks.map { t => t.index -> t.taskId }} with endedTasks:" +
s" ${sched.endedTasks.keys}")
}
}
@@ -974,6 +974,24 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(manager.isZombie)
}
+ test("SPARK-17894: Verify TaskSetManagers for different stage attempts have unique names") {
+ sc = new SparkContext("local", "test")
+ sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, new ManualClock)
+ assert(manager.name === "TaskSet_0.0")
+
+ // Make sure a task set with the same stage ID but different attempt ID has a unique name
+ val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 1)
+ val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, new ManualClock)
+ assert(manager2.name === "TaskSet_0.1")
+
+ // Make sure a task set with the same attempt ID but different stage ID also has a unique name
+ val taskSet3 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, stageAttemptId = 1)
+ val manager3 = new TaskSetManager(sched, taskSet3, MAX_TASK_FAILURES, new ManualClock)
+ assert(manager3.name === "TaskSet_1.1")
+ }
+
private def createTaskResult(
id: Int,
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org