You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/03/09 18:56:30 UTC

spark git commit: [SPARK-19793] Use clock.getTimeMillis when mark task as finished in TaskSetManager.

Repository: spark
Updated Branches:
  refs/heads/master b60b9fc10 -> 3232e54f2


[SPARK-19793] Use clock.getTimeMillis when mark task as finished in TaskSetManager.

## What changes were proposed in this pull request?

TaskSetManager is now using `System.getCurrentTimeMillis` when mark task as finished in `handleSuccessfulTask` and `handleFailedTask`. Thus developer cannot set the tasks finishing time in unit test. When `handleSuccessfulTask`, task's duration = `System.getCurrentTimeMillis` - launchTime(which can be set by `clock`), the result is not correct.

## How was this patch tested?
Existing tests.

Author: jinxing <ji...@126.com>

Closes #17133 from jinxing64/SPARK-19793.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3232e54f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3232e54f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3232e54f

Branch: refs/heads/master
Commit: 3232e54f2fcb8d2072cba4bc763ef29d5d8d325f
Parents: b60b9fc
Author: jinxing <ji...@126.com>
Authored: Thu Mar 9 10:56:19 2017 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Mar 9 10:56:19 2017 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/scheduler/TaskInfo.scala  |  6 ++++--
 .../scala/org/apache/spark/scheduler/TaskSetManager.scala |  6 +++---
 .../org/apache/spark/scheduler/TaskSetManagerSuite.scala  | 10 +++++++++-
 .../test/scala/org/apache/spark/ui/StagePageSuite.scala   |  2 +-
 4 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3232e54f/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 5968013..9843eab 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -70,11 +70,13 @@ class TaskInfo(
 
   var killed = false
 
-  private[spark] def markGettingResult(time: Long = System.currentTimeMillis) {
+  private[spark] def markGettingResult(time: Long) {
     gettingResultTime = time
   }
 
-  private[spark] def markFinished(state: TaskState, time: Long = System.currentTimeMillis) {
+  private[spark] def markFinished(state: TaskState, time: Long) {
+    // finishTime should be set larger than 0, otherwise "finished" below will return false.
+    assert(time > 0)
     finishTime = time
     if (state == TaskState.FAILED) {
       failed = true

http://git-wip-us.apache.org/repos/asf/spark/blob/3232e54f/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 19ebaf8..11633be 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -667,7 +667,7 @@ private[spark] class TaskSetManager(
    */
   def handleTaskGettingResult(tid: Long): Unit = {
     val info = taskInfos(tid)
-    info.markGettingResult()
+    info.markGettingResult(clock.getTimeMillis())
     sched.dagScheduler.taskGettingResult(info)
   }
 
@@ -695,7 +695,7 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
-    info.markFinished(TaskState.FINISHED)
+    info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
     removeRunningTask(tid)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
@@ -739,7 +739,7 @@ private[spark] class TaskSetManager(
       return
     }
     removeRunningTask(tid)
-    info.markFinished(state)
+    info.markFinished(state, clock.getTimeMillis())
     val index = info.index
     copiesRunning(index) -= 1
     var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty

http://git-wip-us.apache.org/repos/asf/spark/blob/3232e54f/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 2c2cda9..f36bcd8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -192,6 +192,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
     assert(taskOption.isDefined)
 
+    clock.advance(1)
     // Tell it the task has finished
     manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
     assert(sched.endedTasks(0) === Success)
@@ -377,6 +378,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
+    clock.advance(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -394,6 +396,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
+    clock.advance(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
@@ -427,6 +430,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // affinity to exec1 on host1 - which we will fail.
     val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
     val clock = new ManualClock
+    clock.advance(1)
     // We don't directly use the application blacklist, but its presence triggers blacklisting
     // within the taskset.
     val mockListenerBus = mock(classOf[LiveListenerBus])
@@ -551,7 +555,9 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(TaskLocation("host1", "execB")),
       Seq(TaskLocation("host2", "execC")),
       Seq())
-    val manager = new TaskSetManager(sched, taskSet, 1, clock = new ManualClock)
+    val clock = new ManualClock()
+    clock.advance(1)
+    val manager = new TaskSetManager(sched, taskSet, 1, clock = clock)
     sched.addExecutor("execA", "host1")
     manager.executorAdded()
     sched.addExecutor("execC", "host2")
@@ -904,6 +910,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       assert(task.executorId === k)
     }
     assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
     // Complete the 3 tasks and leave 1 task in running
     for (id <- Set(0, 1, 2)) {
       manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
@@ -961,6 +968,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       tasks += task
     }
     assert(sched.startedTasks.toSet === (0 until 5).toSet)
+    clock.advance(1)
     // Complete 3 tasks and leave 2 tasks in running
     for (id <- Set(0, 1, 2)) {
       manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))

http://git-wip-us.apache.org/repos/asf/spark/blob/3232e54f/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 11482d1..38030e0 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -77,7 +77,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
         val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
         jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
         jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
-        taskInfo.markFinished(TaskState.FINISHED)
+        taskInfo.markFinished(TaskState.FINISHED, System.currentTimeMillis())
         val taskMetrics = TaskMetrics.empty
         taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
         jobListener.onTaskEnd(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org