You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/09/12 03:53:34 UTC
git commit: [SPARK-3465] fix task metrics aggregation in local mode
Repository: spark
Updated Branches:
refs/heads/master 33c7a738a -> 42904b8d0
[SPARK-3465] fix task metrics aggregation in local mode
Before overwriting t.taskMetrics, take a deep copy of it.
Author: Davies Liu <da...@gmail.com>
Closes #2338 from davies/fix_metric and squashes the following commits:
a5cdb63 [Davies Liu] Merge branch 'master' into fix_metric
7c879e0 [Davies Liu] add more comments
754b5b8 [Davies Liu] copy taskMetrics only when isLocal is true
5ca26dc [Davies Liu] fix task metrics aggregation in local mode
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42904b8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42904b8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42904b8d
Branch: refs/heads/master
Commit: 42904b8d013e71d03e301c3da62e33b4cc2eb54e
Parents: 33c7a73
Author: Davies Liu <da...@gmail.com>
Authored: Thu Sep 11 18:53:26 2014 -0700
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Sep 11 18:53:26 2014 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/spark/executor/Executor.scala | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/42904b8d/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index dd903dc..acae448 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -360,7 +360,16 @@ private[spark] class Executor(
if (!taskRunner.attemptedTask.isEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
- tasksMetrics += ((taskRunner.taskId, metrics))
+ if (isLocal) {
+ // JobProgressListener will hold a reference to it during
+ // onExecutorMetricsUpdate(); after that, JobProgressListener cannot see
+ // the changes of metrics any more, so make a deep copy of it
+ val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
+ tasksMetrics += ((taskRunner.taskId, copiedMetrics))
+ } else {
+ // It will be copied by serialization
+ tasksMetrics += ((taskRunner.taskId, metrics))
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org