Posted to commits@spark.apache.org by do...@apache.org on 2023/03/14 16:07:19 UTC
[spark] branch branch-3.4 updated: [SPARK-42754][SQL][UI] Fix backward compatibility issue in nested SQL execution
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 24cdae8f3dc [SPARK-42754][SQL][UI] Fix backward compatibility issue in nested SQL execution
24cdae8f3dc is described below
commit 24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b
Author: Linhong Liu <li...@databricks.com>
AuthorDate: Tue Mar 14 09:06:53 2023 -0700
[SPARK-42754][SQL][UI] Fix backward compatibility issue in nested SQL execution
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/39268 / [SPARK-41752](https://issues.apache.org/jira/browse/SPARK-41752) added a new non-optional `rootExecutionId: Long` field to the `SparkListenerSQLExecutionStart` case class.
When `JsonProtocol` deserializes this event, it uses Jackson's "ignore missing properties" option, so a missing `rootExecutionId` field is silently initialized to the primitive default of 0.
Since 0 is a legitimate execution ID, the deserialized event gives us no way to distinguish an absent value from the case where every query genuinely has the first query as its root. This PR therefore changes `rootExecutionId` to `Option[Long]` (deserialized via `@JsonDeserialize(contentAs = classOf[java.lang.Long])`): events from older event logs carry `None`, and the UI listener falls back to treating such an execution as its own root.
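The sketch below (illustrative only, not Spark code; `OldEvent`, `NewEvent`, and `JacksonDefaultDemo` are hypothetical names) reproduces the Jackson behavior behind the bug, assuming `jackson-module-scala` is on the classpath:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Old shape: a required primitive field.
case class OldEvent(executionId: Long, rootExecutionId: Long)

// New shape: an optional field. contentAs is needed because type erasure would
// otherwise leave the boxed element typed as Integer rather than java.lang.Long.
case class NewEvent(
    executionId: Long,
    @JsonDeserialize(contentAs = classOf[java.lang.Long])
    rootExecutionId: Option[Long])

object JacksonDefaultDemo {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val json = """{"executionId": 7}""" // rootExecutionId absent, as in old logs

    // The missing primitive silently becomes 0 -- a valid execution ID.
    println(mapper.readValue(json, classOf[OldEvent]).rootExecutionId) // 0

    // With Option[Long], absence survives deserialization as None.
    println(mapper.readValue(json, classOf[NewEvent]).rootExecutionId) // None
  }
}
```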
Thanks to JoshRosen for reporting and investigating this issue.
### Why are the changes needed?
Bug fix
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
Closes #40403 from linhongliu-db/fix-nested-execution.
Authored-by: Linhong Liu <li...@databricks.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 4db8e7b7944302a3929dd6a1197ea1385eecc46a)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../apache/spark/sql/execution/SQLExecution.scala | 2 +-
.../spark/sql/execution/ui/AllExecutionsPage.scala | 2 +-
.../sql/execution/ui/SQLAppStatusListener.scala | 2 +-
.../spark/sql/execution/ui/SQLListener.scala | 5 +-
.../spark/sql/execution/SQLJsonProtocolSuite.scala | 82 ++++++++++++----------
.../history/SQLEventFilterBuilderSuite.scala | 2 +-
.../history/SQLLiveEntitiesEventFilterSuite.scala | 8 +--
.../sql/execution/ui/AllExecutionsPageSuite.scala | 47 +++++++++++--
.../execution/ui/MetricsAggregationBenchmark.scala | 2 +-
.../execution/ui/SQLAppStatusListenerSuite.scala | 20 +++---
10 files changed, 111 insertions(+), 61 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 56fc9d946df..eeca1669e74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -106,7 +106,7 @@ object SQLExecution {
try {
sc.listenerBus.post(SparkListenerSQLExecutionStart(
executionId = executionId,
- rootExecutionId = rootExecutionId,
+ rootExecutionId = Some(rootExecutionId),
description = desc,
details = callSite.longForm,
physicalPlanDescription = queryExecution.explainString(planDescriptionMode),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index cd8f31b3c21..058ecbbb1cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -260,7 +260,7 @@ private[ui] class ExecutionPagedTable(
private val parameterPath =
s"$basePath/$subPath/?${getParameterOtherTable(request, executionTag)}"
- private val showSubExecutions = subExecutions.nonEmpty
+ private val showSubExecutions = subExecutions.exists(_._2.nonEmpty)
override def tableId: String = s"$executionTag-table"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 32b215b1c2e..7b9f877bdef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -356,7 +356,7 @@ class SQLAppStatusListener(
kvstore.write(graphToStore)
val exec = getOrCreateExecution(executionId)
- exec.rootExecutionId = rootExecutionId
+ exec.rootExecutionId = rootExecutionId.getOrElse(executionId)
exec.description = description
exec.details = details
exec.physicalPlanDescription = physicalPlanDescription
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index b931b4fcde1..d4c8f600a4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -44,7 +44,10 @@ case class SparkListenerSQLAdaptiveSQLMetricUpdates(
case class SparkListenerSQLExecutionStart(
executionId: Long,
// if the execution is a root, then rootExecutionId == executionId
- rootExecutionId: Long,
+ // if the event is parsed from an event log generated by a Spark version that
+ // does not support nested execution, then rootExecutionId = None
+ @JsonDeserialize(contentAs = classOf[java.lang.Long])
+ rootExecutionId: Option[Long],
description: String,
details: String,
physicalPlanDescription: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
index e9d98ee9715..49cca666d1d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
@@ -30,43 +30,53 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
test("SparkPlanGraph backward compatibility: metadata") {
Seq(true, false).foreach { newExecutionStartEvent =>
- val event = if (newExecutionStartEvent) {
- "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart"
- } else {
- "org.apache.spark.sql.execution.OldVersionSQLExecutionStart"
- }
- val SQLExecutionStartJsonString =
- s"""
- |{
- | "Event":"$event",
- | "executionId":0,
- | "description":"test desc",
- | "details":"test detail",
- | "physicalPlanDescription":"test plan",
- | "sparkPlanInfo": {
- | "nodeName":"TestNode",
- | "simpleString":"test string",
- | "children":[],
- | "metadata":{},
- | "metrics":[]
- | },
- | "time":0,
- | "modifiedConfigs": {
- | "k1":"v1"
- | }
- |}
- """.stripMargin
+ Seq(true, false).foreach { newExecutionStartJson =>
+ val event = if (newExecutionStartEvent) {
+ "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart"
+ } else {
+ "org.apache.spark.sql.execution.OldVersionSQLExecutionStart"
+ }
+
+ val SQLExecutionStartJsonString =
+ s"""
+ |{
+ | "Event":"$event",
+ | ${if (newExecutionStartJson) """"rootExecutionId": "1",""" else ""}
+ | "executionId":0,
+ | "description":"test desc",
+ | "details":"test detail",
+ | "physicalPlanDescription":"test plan",
+ | "sparkPlanInfo": {
+ | "nodeName":"TestNode",
+ | "simpleString":"test string",
+ | "children":[],
+ | "metadata":{},
+ | "metrics":[]
+ | },
+ | "time":0,
+ | "modifiedConfigs": {
+ | "k1":"v1"
+ | }
+ |}
+ """.stripMargin
- val reconstructedEvent = JsonProtocol.sparkEventFromJson(SQLExecutionStartJsonString)
- if (newExecutionStartEvent) {
- val expectedEvent = SparkListenerSQLExecutionStart(0, 0, "test desc", "test detail",
- "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0,
- Map("k1" -> "v1"))
- assert(reconstructedEvent == expectedEvent)
- } else {
- val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail",
- "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0)
- assert(reconstructedEvent == expectedOldEvent)
+ val reconstructedEvent = JsonProtocol.sparkEventFromJson(SQLExecutionStartJsonString)
+ if (newExecutionStartEvent) {
+ val expectedEvent = if (newExecutionStartJson) {
+ SparkListenerSQLExecutionStart(0, Some(1), "test desc", "test detail",
+ "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0,
+ Map("k1" -> "v1"))
+ } else {
+ SparkListenerSQLExecutionStart(0, None, "test desc", "test detail",
+ "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0,
+ Map("k1" -> "v1"))
+ }
+ assert(reconstructedEvent == expectedEvent)
+ } else {
+ val expectedOldEvent = OldVersionSQLExecutionStart(0, "test desc", "test detail",
+ "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0)
+ assert(reconstructedEvent == expectedOldEvent)
+ }
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
index 42b27bd9f28..87ac58dbc3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
@@ -57,7 +57,7 @@ class SQLEventFilterBuilderSuite extends SparkFunSuite {
}
// Start SQL Execution
- listener.onOtherEvent(SparkListenerSQLExecutionStart(1, 1, "desc1", "details1", "plan",
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(1, Some(1), "desc1", "details1", "plan",
new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, Map.empty))
time += 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
index f1b77e502df..f9eea3816fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
@@ -41,8 +41,8 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {
val acceptFn = filter.acceptFn().lift
// Verifying with finished SQL execution 1
- assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, 1, "description1", "details1",
- "plan", null, 0, Map.empty)))
+ assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, Some(1),
+ "description1", "details1", "plan", null, 0, Map.empty)))
assert(Some(false) === acceptFn(SparkListenerSQLExecutionEnd(1, 0)))
assert(Some(false) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(1, "plan", null)))
assert(Some(false) === acceptFn(SparkListenerDriverAccumUpdates(1, Seq.empty)))
@@ -88,8 +88,8 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {
}
// Verifying with live SQL execution 2
- assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, 2, "description2", "details2",
- "plan", null, 0, Map.empty)))
+ assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, Some(2),
+ "description2", "details2", "plan", null, 0, Map.empty)))
assert(Some(true) === acceptFn(SparkListenerSQLExecutionEnd(2, 0)))
assert(Some(true) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(2, "plan", null)))
assert(Some(true) === acceptFn(SparkListenerDriverAccumUpdates(2, Seq.empty)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
index 7af58867f33..d1cd32f3621 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
@@ -86,7 +86,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
0,
- 0,
+ Some(0),
"test",
"test",
df.queryExecution.toString,
@@ -142,7 +142,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
0,
- 0,
+ Some(0),
"test",
"test",
df.queryExecution.toString,
@@ -150,7 +150,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
System.currentTimeMillis()))
listener.onOtherEvent(SparkListenerSQLExecutionStart(
1,
- 0,
+ Some(0),
"test",
"test",
df.queryExecution.toString,
@@ -159,7 +159,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
// sub execution has a missing root execution
listener.onOtherEvent(SparkListenerSQLExecutionStart(
2,
- 100,
+ Some(100),
"test",
"test",
df.queryExecution.toString,
@@ -171,6 +171,43 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
assert(html.contains("id=2"))
}
+ test("SPARK-42754: group sub executions - backward compatibility") {
+ val statusStore = createStatusStore
+ val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
+ val request = mock(classOf[HttpServletRequest])
+
+ val sparkConf = new SparkConf(false).set(UI_SQL_GROUP_SUB_EXECUTION_ENABLED, true)
+ when(tab.conf).thenReturn(sparkConf)
+ when(tab.sqlStore).thenReturn(statusStore)
+ when(tab.appName).thenReturn("testing")
+ when(tab.headerTabs).thenReturn(Seq.empty)
+
+ val listener = statusStore.listener.get
+ val page = new AllExecutionsPage(tab)
+ val df = createTestDataFrame
+ // testing compatibility with old event logs for which rootExecutionId = None
+ // because the field is missing from logs generated by a Spark version that
+ // does not support nested execution
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ 0,
+ None,
+ "test",
+ "test",
+ df.queryExecution.toString,
+ SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+ System.currentTimeMillis()))
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ 1,
+ None,
+ "test",
+ "test",
+ df.queryExecution.toString,
+ SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+ System.currentTimeMillis()))
+ val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+ assert(!html.contains("sub execution ids") && !html.contains("sub-execution-list"))
+ }
+
protected def createStatusStore: SQLAppStatusStore
private def createTestDataFrame: DataFrame = {
@@ -196,7 +233,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
- executionId,
+ Some(executionId),
"test",
"test",
df.queryExecution.toString,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
index 3b9efb18057..252bcea8b8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
@@ -75,7 +75,7 @@ object MetricsAggregationBenchmark extends BenchmarkBase {
val executionId = idgen.incrementAndGet()
val executionStart = SparkListenerSQLExecutionStart(
executionId,
- executionId,
+ Some(executionId),
getClass().getName(),
getClass().getName(),
getClass().getName(),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 81c745029fd..fdc633f3556 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -191,7 +191,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
- executionId,
+ Some(executionId),
"test",
"test",
df.queryExecution.toString,
@@ -382,7 +382,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
- executionId,
+ Some(executionId),
"test",
"test",
df.queryExecution.toString,
@@ -413,7 +413,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
- executionId,
+ Some(executionId),
"test",
"test",
df.queryExecution.toString,
@@ -455,7 +455,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
- executionId,
+ Some(executionId),
"test",
"test",
df.queryExecution.toString,
@@ -486,7 +486,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
- executionId,
+ Some(executionId),
"test",
"test",
df.queryExecution.toString,
@@ -518,7 +518,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
- executionId,
+ Some(executionId),
"test",
"test",
df.queryExecution.toString,
@@ -659,7 +659,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
time += 1
listener.onOtherEvent(SparkListenerSQLExecutionStart(
1,
- 1,
+ Some(1),
"test",
"test",
df.queryExecution.toString,
@@ -669,7 +669,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
time += 1
listener.onOtherEvent(SparkListenerSQLExecutionStart(
2,
- 2,
+ Some(2),
"test",
"test",
df.queryExecution.toString,
@@ -687,7 +687,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
time += 1
listener.onOtherEvent(SparkListenerSQLExecutionStart(
3,
- 3,
+ Some(3),
"test",
"test",
df.queryExecution.toString,
@@ -724,7 +724,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
- executionId,
+ Some(executionId),
"test",
"test",
df.queryExecution.toString,