You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/03/09 18:56:30 UTC
spark git commit: [SPARK-19793] Use clock.getTimeMillis when mark
task as finished in TaskSetManager.
Repository: spark
Updated Branches:
refs/heads/master b60b9fc10 -> 3232e54f2
[SPARK-19793] Use clock.getTimeMillis when mark task as finished in TaskSetManager.
## What changes were proposed in this pull request?
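TaskSetManager currently uses `System.currentTimeMillis` when marking a task as finished in `handleSuccessfulTask` and `handleFailedTask` (and when marking it as getting its result in `handleTaskGettingResult`), so developers cannot control a task's finish time in unit tests. Worse, in `handleSuccessfulTask` the task's duration is computed as `System.currentTimeMillis` minus `launchTime`, while `launchTime` comes from the injectable `clock`; whenever `clock` is not the system clock (e.g. a `ManualClock` in tests), the resulting duration is wrong. This patch makes all of these call sites use `clock.getTimeMillis()` and removes the `System.currentTimeMillis` defaults from `TaskInfo.markGettingResult` and `TaskInfo.markFinished`.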
## How was this patch tested?
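Existing tests. `TaskInfo.markFinished` now asserts that the finish time is greater than 0 (otherwise `finished` would report false). The affected tests in `TaskSetManagerSuite` were therefore updated to advance their `ManualClock` by 1 ms before tasks finish. `StagePageSuite` now passes an explicit timestamp to `markFinished`.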
Author: jinxing <ji...@126.com>
Closes #17133 from jinxing64/SPARK-19793.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3232e54f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3232e54f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3232e54f
Branch: refs/heads/master
Commit: 3232e54f2fcb8d2072cba4bc763ef29d5d8d325f
Parents: b60b9fc
Author: jinxing <ji...@126.com>
Authored: Thu Mar 9 10:56:19 2017 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Mar 9 10:56:19 2017 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/spark/scheduler/TaskInfo.scala | 6 ++++--
.../scala/org/apache/spark/scheduler/TaskSetManager.scala | 6 +++---
.../org/apache/spark/scheduler/TaskSetManagerSuite.scala | 10 +++++++++-
.../test/scala/org/apache/spark/ui/StagePageSuite.scala | 2 +-
4 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3232e54f/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 5968013..9843eab 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -70,11 +70,13 @@ class TaskInfo(
var killed = false
- private[spark] def markGettingResult(time: Long = System.currentTimeMillis) {
+ private[spark] def markGettingResult(time: Long) {
gettingResultTime = time
}
- private[spark] def markFinished(state: TaskState, time: Long = System.currentTimeMillis) {
+ private[spark] def markFinished(state: TaskState, time: Long) {
+ // finishTime should be set larger than 0, otherwise "finished" below will return false.
+ assert(time > 0)
finishTime = time
if (state == TaskState.FAILED) {
failed = true
http://git-wip-us.apache.org/repos/asf/spark/blob/3232e54f/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 19ebaf8..11633be 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -667,7 +667,7 @@ private[spark] class TaskSetManager(
*/
def handleTaskGettingResult(tid: Long): Unit = {
val info = taskInfos(tid)
- info.markGettingResult()
+ info.markGettingResult(clock.getTimeMillis())
sched.dagScheduler.taskGettingResult(info)
}
@@ -695,7 +695,7 @@ private[spark] class TaskSetManager(
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
- info.markFinished(TaskState.FINISHED)
+ info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
removeRunningTask(tid)
// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
@@ -739,7 +739,7 @@ private[spark] class TaskSetManager(
return
}
removeRunningTask(tid)
- info.markFinished(state)
+ info.markFinished(state, clock.getTimeMillis())
val index = info.index
copiesRunning(index) -= 1
var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty
http://git-wip-us.apache.org/repos/asf/spark/blob/3232e54f/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 2c2cda9..f36bcd8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -192,6 +192,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
+ clock.advance(1)
// Tell it the task has finished
manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
assert(sched.endedTasks(0) === Success)
@@ -377,6 +378,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
+ clock.advance(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -394,6 +396,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val clock = new ManualClock
+ clock.advance(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
@@ -427,6 +430,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// affinity to exec1 on host1 - which we will fail.
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
val clock = new ManualClock
+ clock.advance(1)
// We don't directly use the application blacklist, but its presence triggers blacklisting
// within the taskset.
val mockListenerBus = mock(classOf[LiveListenerBus])
@@ -551,7 +555,9 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
Seq(TaskLocation("host1", "execB")),
Seq(TaskLocation("host2", "execC")),
Seq())
- val manager = new TaskSetManager(sched, taskSet, 1, clock = new ManualClock)
+ val clock = new ManualClock()
+ clock.advance(1)
+ val manager = new TaskSetManager(sched, taskSet, 1, clock = clock)
sched.addExecutor("execA", "host1")
manager.executorAdded()
sched.addExecutor("execC", "host2")
@@ -904,6 +910,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(task.executorId === k)
}
assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+ clock.advance(1)
// Complete the 3 tasks and leave 1 task in running
for (id <- Set(0, 1, 2)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
@@ -961,6 +968,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
tasks += task
}
assert(sched.startedTasks.toSet === (0 until 5).toSet)
+ clock.advance(1)
// Complete 3 tasks and leave 2 tasks in running
for (id <- Set(0, 1, 2)) {
manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
http://git-wip-us.apache.org/repos/asf/spark/blob/3232e54f/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 11482d1..38030e0 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -77,7 +77,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
- taskInfo.markFinished(TaskState.FINISHED)
+ taskInfo.markFinished(TaskState.FINISHED, System.currentTimeMillis())
val taskMetrics = TaskMetrics.empty
taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
jobListener.onTaskEnd(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org