You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/10/12 14:50:57 UTC

[spark] branch master updated: [SPARK-33016][SQL] Potential SQLMetrics missed which might cause WEB UI display issue while AQE is on

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b27a287  [SPARK-33016][SQL] Potential SQLMetrics missed which might cause WEB UI display issue while AQE is on
b27a287 is described below

commit b27a287ff293c02dcad0c45cca71a5244664d7f5
Author: xuewei.linxuewei <xu...@alibaba-inc.com>
AuthorDate: Mon Oct 12 14:48:40 2020 +0000

    [SPARK-33016][SQL] Potential SQLMetrics missed which might cause WEB UI display issue while AQE is on
    
    ### What changes were proposed in this pull request?
    
    With the following scenario when AQE is on, SQLMetrics could be incorrect.
    
    1. Stage A and B are created, and UI updated thru event onAdaptiveExecutionUpdate.
    2. Stage A and B are running. Subquery in stage A keep updating metrics thru event onAdaptiveSQLMetricUpdate.
    3. Stage B completes, while stage A's subquery is still running, updating metrics.
    4. Completion of stage B triggers new stage creation and UI update thru event onAdaptiveExecutionUpdate again (just like step 1). This update replaces the execution's metric list with the new plan's metrics, dropping the subquery metrics that stage A is still updating, so subsequent metric updates for stage A's subquery can no longer be displayed.
    
    So we decided to make a trade-off: keep the (possibly duplicate) old SQLMetrics instead of deleting them when AQE updates the execution with a new plan.
    
    ### Why are the changes needed?
    
    Make SQLMetrics behavior 100% correct.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Updated SQLAppStatusListenerSuite.
    
    Closes #29965 from leanken/leanken-SPARK-33016.
    
    Authored-by: xuewei.linxuewei <xu...@alibaba-inc.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/ui/SQLAppStatusListener.scala    |  4 ++--
 .../sql/execution/ui/SQLAppStatusListenerSuite.scala     | 16 ++++++++--------
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 175340d..963aec7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -341,7 +341,7 @@ class SQLAppStatusListener(
 
     val exec = getOrCreateExecution(executionId)
     exec.physicalPlanDescription = physicalPlanDescription
-    exec.metrics = sqlPlanMetrics
+    exec.metrics ++= sqlPlanMetrics
     update(exec)
   }
 
@@ -349,7 +349,7 @@ class SQLAppStatusListener(
     val SparkListenerSQLAdaptiveSQLMetricUpdates(executionId, sqlPlanMetrics) = event
 
     val exec = getOrCreateExecution(executionId)
-    exec.metrics = exec.metrics ++ sqlPlanMetrics
+    exec.metrics ++= sqlPlanMetrics
     update(exec)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index f49a3a3..00f2371 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -680,7 +680,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
     assert(sparkPlanInfo.nodeName === "WholeStageCodegen (2)")
   }
 
-  test("SPARK-32615: SQLMetrics validation after sparkPlanInfo updated in AQE") {
+  test("SPARK-32615,SPARK-33016: SQLMetrics validation after sparkPlanInfo updated in AQE") {
     val statusStore = createStatusStore()
     val listener = statusStore.listener.get
 
@@ -755,7 +755,7 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
         .allNodes.flatMap(_.metrics.map(_.accumulatorId))
 
     // Assume that AQE update sparkPlanInfo with newPlan
-    // ExecutionMetrics will be replaced using newPlan's SQLMetrics
+    // ExecutionMetrics will be appended using newPlan's SQLMetrics
     listener.onOtherEvent(SparkListenerSQLAdaptiveExecutionUpdate(
       executionId,
       "test",
@@ -770,8 +770,8 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
     listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     listener.onTaskStart(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0)))
 
-    // live metrics will be override, and ExecutionMetrics should be empty as the newPlan updated.
-    assert(statusStore.executionMetrics(executionId).isEmpty)
+    // historical metrics will be kept despite of the newPlan updated.
+    assert(statusStore.executionMetrics(executionId).size == 2)
 
     // update new metrics with Id 4 & 5, since 3 is timing metrics,
     // timing metrics has a complicated string presentation so we don't test it here.
@@ -780,9 +780,9 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
       (0L, 1, 0, createAccumulatorInfos(newMetricsValueMap))
     )))
 
-    assert(statusStore.executionMetrics(executionId).size == 2)
+    assert(statusStore.executionMetrics(executionId).size == 4)
     statusStore.executionMetrics(executionId).foreach { m =>
-      assert(m._2 == "500")
+      assert(m._2 == "100" || m._2 == "500")
     }
 
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -802,10 +802,10 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
       JobSucceeded
     ))
 
-    // aggregateMetrics should ignore metrics from job 0
+    // aggregateMetrics should contains all metrics from job 0 and job 1
     val aggregateMetrics = listener.liveExecutionMetrics(executionId)
     if (aggregateMetrics.isDefined) {
-      oldAccumulatorIds.foreach(id => assert(!aggregateMetrics.get.contains(id)))
+      assert(aggregateMetrics.get.keySet.size == 4)
     }
 
     listener.onOtherEvent(SparkListenerSQLExecutionEnd(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org