You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/10/12 14:50:57 UTC
[spark] branch master updated: [SPARK-33016][SQL] Potential
SQLMetrics missed which might cause WEB UI display issue while AQE is on
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b27a287 [SPARK-33016][SQL] Potential SQLMetrics missed which might cause WEB UI display issue while AQE is on
b27a287 is described below
commit b27a287ff293c02dcad0c45cca71a5244664d7f5
Author: xuewei.linxuewei <xu...@alibaba-inc.com>
AuthorDate: Mon Oct 12 14:48:40 2020 +0000
[SPARK-33016][SQL] Potential SQLMetrics missed which might cause WEB UI display issue while AQE is on
### What changes were proposed in this pull request?
With the following scenario when AQE is on, SQLMetrics could be incorrect.
1. Stage A and B are created, and UI updated thru event onAdaptiveExecutionUpdate.
2. Stage A and B are running. Subquery in stage A keep updating metrics thru event onAdaptiveSQLMetricUpdate.
3. Stage B completes, while stage A's subquery is still running, updating metrics.
4. Completion of stage B triggers new stage creation and UI update thru event onAdaptiveExecutionUpdate again (just like step 1).
So we decided to make a trade-off: keep the (possibly duplicate) historical SQLMetrics instead of deleting them when AQE updates the plan with a newPlan.
### Why are the changes needed?
Make SQLMetrics behavior 100% correct.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Updated SQLAppStatusListenerSuite.
Closes #29965 from leanken/leanken-SPARK-33016.
Authored-by: xuewei.linxuewei <xu...@alibaba-inc.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/execution/ui/SQLAppStatusListener.scala | 4 ++--
.../sql/execution/ui/SQLAppStatusListenerSuite.scala | 16 ++++++++--------
2 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 175340d..963aec7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -341,7 +341,7 @@ class SQLAppStatusListener(
val exec = getOrCreateExecution(executionId)
exec.physicalPlanDescription = physicalPlanDescription
- exec.metrics = sqlPlanMetrics
+ exec.metrics ++= sqlPlanMetrics
update(exec)
}
@@ -349,7 +349,7 @@ class SQLAppStatusListener(
val SparkListenerSQLAdaptiveSQLMetricUpdates(executionId, sqlPlanMetrics) = event
val exec = getOrCreateExecution(executionId)
- exec.metrics = exec.metrics ++ sqlPlanMetrics
+ exec.metrics ++= sqlPlanMetrics
update(exec)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index f49a3a3..00f2371 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -680,7 +680,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
assert(sparkPlanInfo.nodeName === "WholeStageCodegen (2)")
}
- test("SPARK-32615: SQLMetrics validation after sparkPlanInfo updated in AQE") {
+ test("SPARK-32615,SPARK-33016: SQLMetrics validation after sparkPlanInfo updated in AQE") {
val statusStore = createStatusStore()
val listener = statusStore.listener.get
@@ -755,7 +755,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
.allNodes.flatMap(_.metrics.map(_.accumulatorId))
// Assume that AQE update sparkPlanInfo with newPlan
- // ExecutionMetrics will be replaced using newPlan's SQLMetrics
+ // ExecutionMetrics will be appended using newPlan's SQLMetrics
listener.onOtherEvent(SparkListenerSQLAdaptiveExecutionUpdate(
executionId,
"test",
@@ -770,8 +770,8 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
listener.onTaskStart(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0)))
- // live metrics will be override, and ExecutionMetrics should be empty as the newPlan updated.
- assert(statusStore.executionMetrics(executionId).isEmpty)
+ // historical metrics will be kept despite the newPlan update.
+ assert(statusStore.executionMetrics(executionId).size == 2)
// update new metrics with Id 4 & 5, since 3 is timing metrics,
// timing metrics has a complicated string presentation so we don't test it here.
@@ -780,9 +780,9 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
(0L, 1, 0, createAccumulatorInfos(newMetricsValueMap))
)))
- assert(statusStore.executionMetrics(executionId).size == 2)
+ assert(statusStore.executionMetrics(executionId).size == 4)
statusStore.executionMetrics(executionId).foreach { m =>
- assert(m._2 == "500")
+ assert(m._2 == "100" || m._2 == "500")
}
listener.onTaskEnd(SparkListenerTaskEnd(
@@ -802,10 +802,10 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
JobSucceeded
))
- // aggregateMetrics should ignore metrics from job 0
+ // aggregateMetrics should contain all metrics from job 0 and job 1
val aggregateMetrics = listener.liveExecutionMetrics(executionId)
if (aggregateMetrics.isDefined) {
- oldAccumulatorIds.foreach(id => assert(!aggregateMetrics.get.contains(id)))
+ assert(aggregateMetrics.get.keySet.size == 4)
}
listener.onOtherEvent(SparkListenerSQLExecutionEnd(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org