You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/03/25 12:36:36 UTC
spark git commit: [SPARK-19674][SQL] Ignore driver accumulator updates don't belong to …
Repository: spark
Updated Branches:
refs/heads/branch-2.1 92f0b012d -> d989434e4
[SPARK-19674][SQL] Ignore driver accumulator updates don't belong to …
[SPARK-19674][SQL] Ignore driver accumulator updates don't belong to the execution when merging all accumulator updates
N.B. This is a backport to branch-2.1 of #17009.
## What changes were proposed in this pull request?
In SQLListener.getExecutionMetrics, driver accumulator updates that don't belong to the execution should be ignored when merging all accumulator updates, to prevent a NoSuchElementException.
## How was this patch tested?
Updated unit test.
Author: Carson Wang <carson.wang@intel.com>
Author: Carson Wang <ca...@intel.com>
Closes #17418 from mallman/spark-19674-backport_2.1.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d989434e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d989434e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d989434e
Branch: refs/heads/branch-2.1
Commit: d989434e4abefc1fa8907fb53ccce50b54c53b5c
Parents: 92f0b01
Author: Carson Wang <ca...@intel.com>
Authored: Sat Mar 25 20:36:15 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sat Mar 25 20:36:15 2017 +0800
----------------------------------------------------------------------
.../scala/org/apache/spark/sql/execution/ui/SQLListener.scala | 7 +++++--
.../org/apache/spark/sql/execution/ui/SQLListenerSuite.scala | 5 +++++
2 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d989434e/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 5daf215..12d3bc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -343,10 +343,13 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
(accumulatorUpdate._1, accumulatorUpdate._2)
}
- }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
+ }
val driverUpdates = executionUIData.driverAccumUpdates.toSeq
- mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId =>
+ val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter {
+ case (id, _) => executionUIData.accumulatorMetrics.contains(id)
+ }
+ mergeAccumulatorUpdates(totalUpdates, accumulatorId =>
executionUIData.accumulatorMetrics(accumulatorId).metricType)
case None =>
// This execution has been dropped
http://git-wip-us.apache.org/repos/asf/spark/blob/d989434e/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 7b4ff67..cf86730 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -147,6 +147,11 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ // Driver accumulator updates don't belong to this execution should be filtered and no
+ // exception will be thrown.
+ listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
+ checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
(0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org