You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2020/05/08 17:36:54 UTC

[spark] branch branch-3.0 updated: [SPARK-31658][SQL] Fix SQL UI not showing write commands of AQE plan

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ba43922  [SPARK-31658][SQL] Fix SQL UI not showing write commands of AQE plan
ba43922 is described below

commit ba4392217b461d20bfd10dbc00714dbb7268d71a
Author: manuzhang <ow...@gmail.com>
AuthorDate: Fri May 8 10:24:13 2020 -0700

    [SPARK-31658][SQL] Fix SQL UI not showing write commands of AQE plan
    
    Show write commands on SQL UI of an AQE plan
    
    Currently the root node of an AQE plan is always an `AdaptiveSparkPlan`, which is not true when it's a child of a write command. Hence, the node of the write command, as well as its metrics, are not shown on the SQL UI.
    
    ![image](https://user-images.githubusercontent.com/1191767/81288918-1893f580-9098-11ea-9771-e3d0820ba806.png)
    
    ![image](https://user-images.githubusercontent.com/1191767/81289008-3a8d7800-9098-11ea-93ec-516bbaf25d2d.png)
    
    No user-facing change.
    
    Added a unit test.
    
    Closes #28474 from manuzhang/aqe-ui.
    
    Lead-authored-by: manuzhang <ow...@gmail.com>
    Co-authored-by: Xiao Li <ga...@gmail.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
    (cherry picked from commit 77c690a7252b22c9dd8f3cb7ac32f79fd6845cad)
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  4 +--
 .../adaptive/AdaptiveQueryExecSuite.scala          | 35 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index cd6936b..90d1db9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -526,8 +526,8 @@ case class AdaptiveSparkPlanExec(
     } else {
       context.session.sparkContext.listenerBus.post(SparkListenerSQLAdaptiveExecutionUpdate(
         executionId,
-        SQLExecution.getQueryExecution(executionId).toString,
-        SparkPlanInfo.fromSparkPlan(this)))
+        context.qe.toString,
+        SparkPlanInfo.fromSparkPlan(context.qe.executedPlan)))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index f30d1e9..29b9755 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -805,9 +805,11 @@ class AdaptiveQueryExecSuite
   test("SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of write commands") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
-      val plan = sql("CREATE TABLE t1 AS SELECT 1 col").queryExecution.executedPlan
-      assert(plan.isInstanceOf[DataWritingCommandExec])
-      assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec])
+      withTable("t1") {
+        val plan = sql("CREATE TABLE t1 AS SELECT 1 col").queryExecution.executedPlan
+        assert(plan.isInstanceOf[DataWritingCommandExec])
+        assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec])
+      }
     }
   }
 
@@ -847,4 +849,31 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-31658: SQL UI should show write commands") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      withTable("t1") {
+        var checkDone = false
+        val listener = new SparkListener {
+          override def onOtherEvent(event: SparkListenerEvent): Unit = {
+            event match {
+              case SparkListenerSQLAdaptiveExecutionUpdate(_, _, planInfo) =>
+                assert(planInfo.nodeName == "Execute CreateDataSourceTableAsSelectCommand")
+                checkDone = true
+              case _ => // ignore other events
+            }
+          }
+        }
+        spark.sparkContext.addSparkListener(listener)
+        try {
+          sql("CREATE TABLE t1 AS SELECT 1 col").collect()
+          spark.sparkContext.listenerBus.waitUntilEmpty()
+          assert(checkDone)
+        } finally {
+          spark.sparkContext.removeSparkListener(listener)
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org