Posted to commits@spark.apache.org by do...@apache.org on 2023/03/14 16:07:19 UTC

[spark] branch branch-3.4 updated: [SPARK-42754][SQL][UI] Fix backward compatibility issue in nested SQL execution

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 24cdae8f3dc [SPARK-42754][SQL][UI] Fix backward compatibility issue in nested SQL execution
24cdae8f3dc is described below

commit 24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b
Author: Linhong Liu <li...@databricks.com>
AuthorDate: Tue Mar 14 09:06:53 2023 -0700

    [SPARK-42754][SQL][UI] Fix backward compatibility issue in nested SQL execution
    
    ### What changes were proposed in this pull request?
    https://github.com/apache/spark/pull/39268 / [SPARK-41752](https://issues.apache.org/jira/browse/SPARK-41752) added a new non-optional `rootExecutionId: Long` field to the `SparkListenerSQLExecutionStart` case class.
    
    When JsonProtocol deserializes this event, it uses the "ignore missing properties" Jackson deserialization option, causing the `rootExecutionId` field to be initialized with a default value of 0.
    
    The value 0 is a legitimate execution ID, so in the deserialized event there is no way to distinguish a genuinely absent value from a case where every query has the first query (execution ID 0) as its root.
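    
    As a rough illustration of the failure mode (a hypothetical `OldEvent` stand-in, not part of this patch, assuming jackson-module-scala is on the classpath), Jackson silently fills a missing primitive field with its default value:
    
        import com.fasterxml.jackson.databind.ObjectMapper
        import com.fasterxml.jackson.module.scala.DefaultScalaModule
        
        // Mirrors the pre-fix event shape: a non-optional Long field.
        case class OldEvent(executionId: Long, rootExecutionId: Long)
        
        object MissingFieldRepro {
          def main(args: Array[String]): Unit = {
            val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
            // JSON written by an older Spark release: no "rootExecutionId" field.
            val json = """{"executionId": 7}"""
            val event = mapper.readValue(json, classOf[OldEvent])
            // Prints 0, indistinguishable from a genuine root execution ID of 0.
            println(event.rootExecutionId)
          }
        }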
    
    Thanks to JoshRosen for reporting and investigating this issue.
    
    ### Why are the changes needed?
    Bug fix
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT
    
    Closes #40403 from linhongliu-db/fix-nested-execution.
    
    Authored-by: Linhong Liu <li...@databricks.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 4db8e7b7944302a3929dd6a1197ea1385eecc46a)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../apache/spark/sql/execution/SQLExecution.scala  |  2 +-
 .../spark/sql/execution/ui/AllExecutionsPage.scala |  2 +-
 .../sql/execution/ui/SQLAppStatusListener.scala    |  2 +-
 .../spark/sql/execution/ui/SQLListener.scala       |  5 +-
 .../spark/sql/execution/SQLJsonProtocolSuite.scala | 82 ++++++++++++----------
 .../history/SQLEventFilterBuilderSuite.scala       |  2 +-
 .../history/SQLLiveEntitiesEventFilterSuite.scala  |  8 +--
 .../sql/execution/ui/AllExecutionsPageSuite.scala  | 47 +++++++++++--
 .../execution/ui/MetricsAggregationBenchmark.scala |  2 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala   | 20 +++---
 10 files changed, 111 insertions(+), 61 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 56fc9d946df..eeca1669e74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -106,7 +106,7 @@ object SQLExecution {
         try {
           sc.listenerBus.post(SparkListenerSQLExecutionStart(
             executionId = executionId,
-            rootExecutionId = rootExecutionId,
+            rootExecutionId = Some(rootExecutionId),
             description = desc,
             details = callSite.longForm,
             physicalPlanDescription = queryExecution.explainString(planDescriptionMode),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index cd8f31b3c21..058ecbbb1cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -260,7 +260,7 @@ private[ui] class ExecutionPagedTable(
   private val parameterPath =
     s"$basePath/$subPath/?${getParameterOtherTable(request, executionTag)}"
 
-  private val showSubExecutions = subExecutions.nonEmpty
+  private val showSubExecutions = subExecutions.exists(_._2.nonEmpty)
 
   override def tableId: String = s"$executionTag-table"
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 32b215b1c2e..7b9f877bdef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -356,7 +356,7 @@ class SQLAppStatusListener(
     kvstore.write(graphToStore)
 
     val exec = getOrCreateExecution(executionId)
-    exec.rootExecutionId = rootExecutionId
+    exec.rootExecutionId = rootExecutionId.getOrElse(executionId)
     exec.description = description
     exec.details = details
     exec.physicalPlanDescription = physicalPlanDescription
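
The fallback above restores the pre-nesting behavior for old event logs: an
event that carries no root simply becomes its own root. A minimal sketch with
hypothetical values:

    val executionId = 5L
    val rootExecutionId: Option[Long] = None        // field absent in an old log
    assert(rootExecutionId.getOrElse(executionId) == 5L)  // self-rooted, never 0
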
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index b931b4fcde1..d4c8f600a4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -44,7 +44,10 @@ case class SparkListenerSQLAdaptiveSQLMetricUpdates(
 case class SparkListenerSQLExecutionStart(
     executionId: Long,
     // if the execution is a root, then rootExecutionId == executionId
-    rootExecutionId: Long,
+    // if the event is parsed from an event log generated by a Spark version
+    // that does not support nested execution, then rootExecutionId = None
+    @JsonDeserialize(contentAs = classOf[java.lang.Long])
+    rootExecutionId: Option[Long],
     description: String,
     details: String,
     physicalPlanDescription: String,
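
A minimal sketch of how the patched field shape behaves (a hypothetical
`NewEvent` stand-in, assuming jackson-module-scala is on the classpath):
`contentAs` pins the element type inside the `Option`, which type erasure
would otherwise leave to Jackson's guess.

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.databind.annotation.JsonDeserialize
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    case class NewEvent(
        executionId: Long,
        // Force java.lang.Long inside the Option; erasure hides Long at runtime.
        @JsonDeserialize(contentAs = classOf[java.lang.Long])
        rootExecutionId: Option[Long])

    object OptionFieldSketch {
      def main(args: Array[String]): Unit = {
        val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
        // Field absent (old event log) -> None, no longer confused with ID 0.
        println(mapper.readValue("""{"executionId": 7}""",
          classOf[NewEvent]).rootExecutionId)
        // Field present (new event log) -> Some(1).
        println(mapper.readValue("""{"executionId": 7, "rootExecutionId": 1}""",
          classOf[NewEvent]).rootExecutionId)
      }
    }
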
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
index e9d98ee9715..49cca666d1d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
@@ -30,43 +30,53 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
 
   test("SparkPlanGraph backward compatibility: metadata") {
     Seq(true, false).foreach { newExecutionStartEvent =>
-      val event = if (newExecutionStartEvent) {
-        "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart"
-      } else {
-        "org.apache.spark.sql.execution.OldVersionSQLExecutionStart"
-      }
-      val SQLExecutionStartJsonString =
-        s"""
-          |{
-          |  "Event":"$event",
-          |  "executionId":0,
-          |  "description":"test desc",
-          |  "details":"test detail",
-          |  "physicalPlanDescription":"test plan",
-          |  "sparkPlanInfo": {
-          |    "nodeName":"TestNode",
-          |    "simpleString":"test string",
-          |    "children":[],
-          |    "metadata":{},
-          |    "metrics":[]
-          |  },
-          |  "time":0,
-          |  "modifiedConfigs": {
-          |    "k1":"v1"
-          |  }
-          |}
-      """.stripMargin
+      Seq(true, false).foreach { newExecutionStartJson =>
+        val event = if (newExecutionStartEvent) {
+          "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart"
+        } else {
+          "org.apache.spark.sql.execution.OldVersionSQLExecutionStart"
+        }
+
+        val SQLExecutionStartJsonString =
+          s"""
+             |{
+             |  "Event":"$event",
+             |  ${if (newExecutionStartJson) """"rootExecutionId": "1",""" else ""}
+             |  "executionId":0,
+             |  "description":"test desc",
+             |  "details":"test detail",
+             |  "physicalPlanDescription":"test plan",
+             |  "sparkPlanInfo": {
+             |    "nodeName":"TestNode",
+             |    "simpleString":"test string",
+             |    "children":[],
+             |    "metadata":{},
+             |    "metrics":[]
+             |  },
+             |  "time":0,
+             |  "modifiedConfigs": {
+             |    "k1":"v1"
+             |  }
+             |}
+          """.stripMargin
 
-      val reconstructedEvent = JsonProtocol.sparkEventFromJson(SQLExecutionStartJsonString)
-      if (newExecutionStartEvent) {
-        val expectedEvent = SparkListenerSQLExecutionStart(0, 0, "test desc", "test detail",
-          "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0,
-          Map("k1" -> "v1"))
-        assert(reconstructedEvent == expectedEvent)
-      } else {
-        val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail",
-          "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0)
-        assert(reconstructedEvent == expectedOldEvent)
+        val reconstructedEvent = JsonProtocol.sparkEventFromJson(SQLExecutionStartJsonString)
+        if (newExecutionStartEvent) {
+          val expectedEvent = if (newExecutionStartJson) {
+            SparkListenerSQLExecutionStart(0, Some(1), "test desc", "test detail",
+              "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0,
+              Map("k1" -> "v1"))
+          } else {
+            SparkListenerSQLExecutionStart(0, None, "test desc", "test detail",
+              "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0,
+              Map("k1" -> "v1"))
+          }
+          assert(reconstructedEvent == expectedEvent)
+        } else {
+          val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail",
+            "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0)
+          assert(reconstructedEvent == expectedOldEvent)
+        }
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
index 42b27bd9f28..87ac58dbc3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
@@ -57,7 +57,7 @@ class SQLEventFilterBuilderSuite extends SparkFunSuite {
     }
 
     // Start SQL Execution
-    listener.onOtherEvent(SparkListenerSQLExecutionStart(1, 1, "desc1", "details1", "plan",
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(1, Some(1), "desc1", "details1", "plan",
       new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, Map.empty))
 
     time += 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
index f1b77e502df..f9eea3816fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
@@ -41,8 +41,8 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {
     val acceptFn = filter.acceptFn().lift
 
     // Verifying with finished SQL execution 1
-    assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, 1, "description1", "details1",
-      "plan", null, 0, Map.empty)))
+    assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, Some(1),
+      "description1", "details1", "plan", null, 0, Map.empty)))
     assert(Some(false) === acceptFn(SparkListenerSQLExecutionEnd(1, 0)))
     assert(Some(false) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(1, "plan", null)))
     assert(Some(false) === acceptFn(SparkListenerDriverAccumUpdates(1, Seq.empty)))
@@ -88,8 +88,8 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {
     }
 
     // Verifying with live SQL execution 2
-    assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, 2, "description2", "details2",
-      "plan", null, 0, Map.empty)))
+    assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, Some(2),
+      "description2", "details2", "plan", null, 0, Map.empty)))
     assert(Some(true) === acceptFn(SparkListenerSQLExecutionEnd(2, 0)))
     assert(Some(true) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(2, "plan", null)))
     assert(Some(true) === acceptFn(SparkListenerDriverAccumUpdates(2, Seq.empty)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
index 7af58867f33..d1cd32f3621 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
@@ -86,7 +86,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
       val df = createTestDataFrame
       listener.onOtherEvent(SparkListenerSQLExecutionStart(
         0,
-        0,
+        Some(0),
         "test",
         "test",
         df.queryExecution.toString,
@@ -142,7 +142,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       0,
-      0,
+      Some(0),
       "test",
       "test",
       df.queryExecution.toString,
@@ -150,7 +150,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
       System.currentTimeMillis()))
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       1,
-      0,
+      Some(0),
       "test",
       "test",
       df.queryExecution.toString,
@@ -159,7 +159,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
     // sub execution has a missing root execution
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       2,
-      100,
+      Some(100),
       "test",
       "test",
       df.queryExecution.toString,
@@ -171,6 +171,43 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
     assert(html.contains("id=2"))
   }
 
+  test("SPARK-42754: group sub executions - backward compatibility") {
+    val statusStore = createStatusStore
+    val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
+    val request = mock(classOf[HttpServletRequest])
+
+    val sparkConf = new SparkConf(false).set(UI_SQL_GROUP_SUB_EXECUTION_ENABLED, true)
+    when(tab.conf).thenReturn(sparkConf)
+    when(tab.sqlStore).thenReturn(statusStore)
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+
+    val listener = statusStore.listener.get
+    val page = new AllExecutionsPage(tab)
+    val df = createTestDataFrame
+    // testing compatibility with old event logs, for which rootExecutionId = None
+    // because the field is missing when generated by a Spark version that does
+    // not support nested execution
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      0,
+      None,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      1,
+      None,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+    assert(!html.contains("sub execution ids") && !html.contains("sub-execution-list"))
+  }
+
   protected def createStatusStore: SQLAppStatusStore
 
   private def createTestDataFrame: DataFrame = {
@@ -196,7 +233,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
       val df = createTestDataFrame
       listener.onOtherEvent(SparkListenerSQLExecutionStart(
         executionId,
-        executionId,
+        Some(executionId),
         "test",
         "test",
         df.queryExecution.toString,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
index 3b9efb18057..252bcea8b8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
@@ -75,7 +75,7 @@ object MetricsAggregationBenchmark extends BenchmarkBase {
     val executionId = idgen.incrementAndGet()
     val executionStart = SparkListenerSQLExecutionStart(
       executionId,
-      executionId,
+      Some(executionId),
       getClass().getName(),
       getClass().getName(),
       getClass().getName(),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 81c745029fd..fdc633f3556 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -191,7 +191,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
 
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
-      executionId,
+      Some(executionId),
       "test",
       "test",
       df.queryExecution.toString,
@@ -382,7 +382,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
-      executionId,
+      Some(executionId),
       "test",
       "test",
       df.queryExecution.toString,
@@ -413,7 +413,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
-      executionId,
+      Some(executionId),
       "test",
       "test",
       df.queryExecution.toString,
@@ -455,7 +455,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
-      executionId,
+      Some(executionId),
       "test",
       "test",
       df.queryExecution.toString,
@@ -486,7 +486,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
-      executionId,
+      Some(executionId),
       "test",
       "test",
       df.queryExecution.toString,
@@ -518,7 +518,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
-      executionId,
+      Some(executionId),
       "test",
       "test",
       df.queryExecution.toString,
@@ -659,7 +659,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     time += 1
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       1,
-      1,
+      Some(1),
       "test",
       "test",
       df.queryExecution.toString,
@@ -669,7 +669,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     time += 1
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       2,
-      2,
+      Some(2),
       "test",
       "test",
       df.queryExecution.toString,
@@ -687,7 +687,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     time += 1
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       3,
-      3,
+      Some(3),
       "test",
       "test",
       df.queryExecution.toString,
@@ -724,7 +724,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
 
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
-      executionId,
+      Some(executionId),
       "test",
       "test",
       df.queryExecution.toString,

