You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/09/12 03:53:34 UTC

git commit: [SPARK-3465] fix task metrics aggregation in local mode

Repository: spark
Updated Branches:
  refs/heads/master 33c7a738a -> 42904b8d0


[SPARK-3465] fix task metrics aggregation in local mode

Before overwrite t.taskMetrics, take a deepcopy of it.

Author: Davies Liu <da...@gmail.com>

Closes #2338 from davies/fix_metric and squashes the following commits:

a5cdb63 [Davies Liu] Merge branch 'master' into fix_metric
7c879e0 [Davies Liu] add more comments
754b5b8 [Davies Liu] copy taskMetrics only when isLocal is true
5ca26dc [Davies Liu] fix task metrics aggregation in local mode


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42904b8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42904b8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42904b8d

Branch: refs/heads/master
Commit: 42904b8d013e71d03e301c3da62e33b4cc2eb54e
Parents: 33c7a73
Author: Davies Liu <da...@gmail.com>
Authored: Thu Sep 11 18:53:26 2014 -0700
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Sep 11 18:53:26 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/executor/Executor.scala  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/42904b8d/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index dd903dc..acae448 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -360,7 +360,16 @@ private[spark] class Executor(
             if (!taskRunner.attemptedTask.isEmpty) {
               Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
                 metrics.updateShuffleReadMetrics
-                tasksMetrics += ((taskRunner.taskId, metrics))
+                if (isLocal) {
+                  // JobProgressListener will hold an reference of it during
+                  // onExecutorMetricsUpdate(), then JobProgressListener can not see
+                  // the changes of metrics any more, so make a deep copy of it
+                  val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
+                  tasksMetrics += ((taskRunner.taskId, copiedMetrics))
+                } else {
+                  // It will be copied by serialization
+                  tasksMetrics += ((taskRunner.taskId, metrics))
+                }
               }
             }
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org