Posted to commits@spark.apache.org by ka...@apache.org on 2015/04/23 22:20:06 UTC

spark git commit: [SPARK-7058] Include RDD deserialization time in "task deserialization time" metric

Repository: spark
Updated Branches:
  refs/heads/master c1213e6a9 -> 6afde2c78


[SPARK-7058] Include RDD deserialization time in "task deserialization time" metric

The web UI's "task deserialization time" metric is slightly misleading because it does not capture the time taken to deserialize the broadcasted RDD; that time was instead silently counted as task run time.
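
To see the effect concretely, here is a minimal self-contained sketch with hypothetical timings (5 ms to deserialize the Task object, 20 ms for Task.run() to deserialize the RDD and closure, 100 ms of actual work); the names and numbers are illustrative, not measurements:

  object MetricBeforeAfter {
    def main(args: Array[String]): Unit = {
      val taskObjectDeser = 5L   // ms, measured between deserializeStartTime and taskStart
      val rddDeser        = 20L  // ms, spent inside Task.run(), invisible to the old metric
      val work            = 100L // ms of actual computation

      // Before this patch: RDD deserialization was silently folded into run time.
      val oldDeserMetric = taskObjectDeser             // 5 ms
      val oldRunMetric   = rddDeser + work             // 120 ms

      // After this patch: both phases count as deserialization; run time is pure work.
      val newDeserMetric = taskObjectDeser + rddDeser  // 25 ms
      val newRunMetric   = work                        // 100 ms

      println(s"old: deser=$oldDeserMetric ms, run=$oldRunMetric ms")
      println(s"new: deser=$newDeserMetric ms, run=$newRunMetric ms")
    }
  }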

Author: Josh Rosen <jo...@databricks.com>

Closes #5635 from JoshRosen/SPARK-7058 and squashes the following commits:

ed90f75 [Josh Rosen] Update UI tooltip
a3743b4 [Josh Rosen] Update comments.
4f52910 [Josh Rosen] Roll back whitespace change
e9cf9f4 [Josh Rosen] Remove unused variable
9f32e55 [Josh Rosen] Expose executorDeserializeTime on Task instead of pushing runtime calculation into Task.
21f5b47 [Josh Rosen] Don't double-count the broadcast deserialization time in task runtime
1752f0e [Josh Rosen] [SPARK-7058] Incorporate RDD deserialization time in task deserialization time metric


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6afde2c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6afde2c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6afde2c7

Branch: refs/heads/master
Commit: 6afde2c7810c363083d0a699b1de02b54c13e6a9
Parents: c1213e6
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Apr 23 13:19:03 2015 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Thu Apr 23 13:19:03 2015 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 8 ++++++--
 .../main/scala/org/apache/spark/scheduler/ResultTask.scala   | 2 ++
 .../scala/org/apache/spark/scheduler/ShuffleMapTask.scala    | 2 ++
 core/src/main/scala/org/apache/spark/scheduler/Task.scala    | 7 +++++++
 core/src/main/scala/org/apache/spark/ui/ToolTips.scala       | 4 +++-
 5 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6afde2c7/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5fc04df..f57e215 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -220,8 +220,12 @@ private[spark] class Executor(
         val afterSerialization = System.currentTimeMillis()
 
         for (m <- task.metrics) {
-          m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
-          m.setExecutorRunTime(taskFinish - taskStart)
+          // Deserialization happens in two parts: first, we deserialize a Task object, which
+          // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
+          m.setExecutorDeserializeTime(
+            (taskStart - deserializeStartTime) + task.executorDeserializeTime)
+          // We need to subtract Task.run()'s deserialization time to avoid double-counting
+          m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
           m.setJvmGCTime(computeTotalGcTime() - startGCTime)
           m.setResultSerializationTime(afterSerialization - beforeSerialization)
         }
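
A quick sanity check on the arithmetic in this hunk: the two metrics should partition the wall-clock interval from deserializeStartTime to taskFinish, so nothing is double-counted. A standalone sketch with hypothetical timestamps:

  object NoDoubleCounting {
    def main(args: Array[String]): Unit = {
      // Hypothetical timestamps, all in ms.
      val deserializeStartTime = 1000L
      val taskStart            = 1005L  // Task object deserialized after 5 ms
      val taskFinish           = 1125L  // run() took 120 ms, 20 of which were RDD deser
      val taskExecutorDeserializeTime = 20L  // set inside runTask(), as in the hunks below

      val executorDeserializeTime =
        (taskStart - deserializeStartTime) + taskExecutorDeserializeTime  // 5 + 20 = 25 ms
      val executorRunTime =
        (taskFinish - taskStart) - taskExecutorDeserializeTime            // 120 - 20 = 100 ms

      // The metrics sum to the total elapsed time: 25 + 100 == 1125 - 1000.
      assert(executorDeserializeTime + executorRunTime == taskFinish - deserializeStartTime)
    }
  }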

http://git-wip-us.apache.org/repos/asf/spark/blob/6afde2c7/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index e074ce6..c9a1241 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -53,9 +53,11 @@ private[spark] class ResultTask[T, U](
 
   override def runTask(context: TaskContext): U = {
     // Deserialize the RDD and the func using the broadcast variables.
+    val deserializeStartTime = System.currentTimeMillis()
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
 
     metrics = Some(context.taskMetrics)
     func(context, rdd.iterator(partition, context))

http://git-wip-us.apache.org/repos/asf/spark/blob/6afde2c7/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 6c7d000..bd3dd23 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -56,9 +56,11 @@ private[spark] class ShuffleMapTask(
 
   override def runTask(context: TaskContext): MapStatus = {
     // Deserialize the RDD using the broadcast variable.
+    val deserializeStartTime = System.currentTimeMillis()
     val ser = SparkEnv.get.closureSerializer.newInstance()
     val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
+    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
 
     metrics = Some(context.taskMetrics)
     var writer: ShuffleWriter[Any, Any] = null

http://git-wip-us.apache.org/repos/asf/spark/blob/6afde2c7/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 8b59286..b09b19e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -87,12 +87,19 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   // initialized when kill() is invoked.
   @volatile @transient private var _killed = false
 
+  protected var _executorDeserializeTime: Long = 0
+
   /**
    * Whether the task has been killed.
    */
   def killed: Boolean = _killed
 
   /**
+   * Returns the amount of time spent deserializing the RDD and function to be run.
+   */
+  def executorDeserializeTime: Long = _executorDeserializeTime
+
+  /**
    * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
    * code and user code to properly handle the flag. This function should be idempotent so it can
    * be called multiple times.
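
The runTask() changes in ResultTask and ShuffleMapTask above both follow the same pattern: bracket the broadcast deserialization with timestamps and store the delta in _executorDeserializeTime, which the Executor then reads via the accessor added here. Distilled into a standalone sketch (TimedTask, ExampleTask, and loadPayload are illustrative names, not Spark code):

  abstract class TimedTask[T] {
    protected var _executorDeserializeTime: Long = 0
    def executorDeserializeTime: Long = _executorDeserializeTime
    def runTask(): T
  }

  class ExampleTask extends TimedTask[Int] {
    override def runTask(): Int = {
      val deserializeStartTime = System.currentTimeMillis()
      val payload = loadPayload()  // stands in for ser.deserialize(ByteBuffer.wrap(...))
      _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
      payload.sum                  // stands in for the actual task body
    }
    private def loadPayload(): Seq[Int] = Seq(1, 2, 3)
  }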

http://git-wip-us.apache.org/repos/asf/spark/blob/6afde2c7/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index cae6870..24f3236 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -24,7 +24,9 @@ private[spark] object ToolTips {
        scheduler delay is large, consider decreasing the size of tasks or decreasing the size
        of task results."""
 
-  val TASK_DESERIALIZATION_TIME = "Time spent deserializing the task closure on the executor."
+  val TASK_DESERIALIZATION_TIME =
+    """Time spent deserializing the task closure on the executor, including the time to read the
+       broadcasted task."""
 
   val SHUFFLE_READ_BLOCKED_TIME =
     "Time that the task spent blocked waiting for shuffle data to be read from remote machines."

