Posted to commits@spark.apache.org by we...@apache.org on 2020/04/27 13:40:37 UTC
[spark] branch branch-3.0 updated: [SPARK-31529][SQL][3.0] Remove extra whitespaces in formatted explain
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 3b30066 [SPARK-31529][SQL][3.0] Remove extra whitespaces in formatted explain
3b30066 is described below
commit 3b30066e7a2cb27e59e53419226e64f6a90201d6
Author: yi.wu <yi...@databricks.com>
AuthorDate: Mon Apr 27 13:39:36 2020 +0000
[SPARK-31529][SQL][3.0] Remove extra whitespaces in formatted explain
### What changes were proposed in this pull request?
Remove all the extra whitespaces in the formatted explain.
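Two things produce the stray whitespace removed here: a space interpolated before an empty codegen-id string in the operator header, and the way the per-operator strings are terminated before `stripMargin`. The latter can be seen in isolation (a minimal standalone Scala sketch, not Spark code):

    // Old style: the closing """ sits on an indented line without '|',
    // so stripMargin keeps those spaces as a trailing whitespace-only line.
    val withTrailing =
      s"""
         |(2) Filter
         |Input [2]: [key#x, val#x]
         """.stripMargin

    // New style: a final '|' right before the closing """ ends the string
    // with a bare newline instead.
    val clean =
      s"""
         |(2) Filter
         |Input [2]: [key#x, val#x]
         |""".stripMargin

    assert(withTrailing.endsWith(" "))   // ends in the spaces of the indent
    assert(clean.endsWith("]\n"))        // ends cleanly, no trailing spaces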
### Why are the changes needed?
The number of extra whitespaces in the formatted explain output differs between master and branch-3.0. As a result, whenever we backport formatted-explain-related tests from master to branch-3.0, they fail on branch-3.0. Besides, trailing whitespaces are generally disallowed in Spark, so we should remove them wherever we can.
### Does this PR introduce any user-facing change?
No, formatted explain is newly added in Spark 3.0.
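For readers unfamiliar with the feature, formatted explain is requested through the string explain mode added in 3.0. A minimal invocation, assuming a `SparkSession` named `spark` and a hypothetical registered table `t`, looks like:

    // Dataset API form: prints the numbered, operator-by-operator plan
    // whose trailing whitespace this patch trims.
    spark.sql("SELECT key, max(val) FROM t GROUP BY key").explain("formatted")

    // SQL form, which is what the explain.sql golden tests below exercise.
    spark.sql("EXPLAIN FORMATTED SELECT key, max(val) FROM t GROUP BY key")
      .show(truncate = false)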
### How was this patch tested?
Updated SQL query tests.
Closes #28364 from Ngone51/bp-spark31529.
Authored-by: yi.wu <yi...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/plans/QueryPlan.scala | 19 +-
.../spark/sql/execution/DataSourceScanExec.scala | 8 +-
.../apache/spark/sql/execution/ExplainUtils.scala | 10 +-
.../org/apache/spark/sql/execution/SparkPlan.scala | 27 +-
.../execution/aggregate/BaseAggregateExec.scala | 4 +-
.../sql/execution/basicPhysicalOperators.scala | 8 +-
.../spark/sql/execution/exchange/Exchange.scala | 5 +-
.../sql/execution/joins/CartesianProductExec.scala | 2 +-
.../spark/sql/execution/joins/HashJoin.scala | 4 +-
.../sql/execution/joins/SortMergeJoinExec.scala | 4 +-
.../sql-tests/results/explain-aqe.sql.out | 372 ++++++++++-----------
.../resources/sql-tests/results/explain.sql.out | 340 +++++++++----------
.../scala/org/apache/spark/sql/ExplainSuite.scala | 10 +-
13 files changed, 401 insertions(+), 412 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 13e5b12..7133fb2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -195,24 +195,27 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
}
def verboseStringWithOperatorId(): String = {
- val codegenIdStr =
- getTagValue(QueryPlan.CODEGEN_ID_TAG).map(id => s"[codegen id : $id]").getOrElse("")
- val operatorId = getTagValue(QueryPlan.OP_ID_TAG).map(id => s"$id").getOrElse("unknown")
- val baseStr = s"($operatorId) $nodeName $codegenIdStr"
val argumentString = argString(SQLConf.get.maxToStringFields)
if (argumentString.nonEmpty) {
s"""
- |$baseStr
+ |$formattedNodeName
|Arguments: $argumentString
- """.stripMargin
+ |""".stripMargin
} else {
s"""
- |$baseStr
- """.stripMargin
+ |$formattedNodeName
+ |""".stripMargin
}
}
+ protected def formattedNodeName: String = {
+ val opId = getTagValue(QueryPlan.OP_ID_TAG).map(id => s"$id").getOrElse("unknown")
+ val codegenId =
+ getTagValue(QueryPlan.CODEGEN_ID_TAG).map(id => s" [codegen id : $id]").getOrElse("")
+ s"($opId) $nodeName$codegenId"
+ }
+
/**
* All the top-level subqueries of the current plan node. Nested subqueries are not included.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index bd0e1d0..0be76ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -75,10 +75,10 @@ trait DataSourceScanExec extends LeafExecNode {
}
s"""
- |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
+ |$formattedNodeName
|${ExplainUtils.generateFieldString("Output", output)}
|${metadataStr.mkString("\n")}
- """.stripMargin
+ |""".stripMargin
}
/**
@@ -377,10 +377,10 @@ case class FileSourceScanExec(
}
s"""
- |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
+ |$formattedNodeName
|${ExplainUtils.generateFieldString("Output", output)}
|${metadataStr.mkString("\n")}
- """.stripMargin
+ |""".stripMargin
}
lazy val inputRDD: RDD[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
index aec1c93..b54bd6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
@@ -233,20 +233,12 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
/**
* Returns the operator identifier for the supplied plan by retrieving the
- * `operationId` tag value.`
+ * `operationId` tag value.
*/
def getOpId(plan: QueryPlan[_]): String = {
plan.getTagValue(QueryPlan.OP_ID_TAG).map(v => s"$v").getOrElse("unknown")
}
- /**
- * Returns the operator identifier for the supplied plan by retrieving the
- * `codegenId` tag value.`
- */
- def getCodegenId(plan: QueryPlan[_]): String = {
- plan.getTagValue(QueryPlan.CODEGEN_ID_TAG).map(v => s"[codegen id : $v]").getOrElse("")
- }
-
def removeTags(plan: QueryPlan[_]): Unit = {
def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
p.unsetTagValue(QueryPlan.OP_ID_TAG)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index e1a6495..ead8c00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -514,20 +514,19 @@ trait LeafExecNode extends SparkPlan {
override def producedAttributes: AttributeSet = outputSet
override def verboseStringWithOperatorId(): String = {
val argumentString = argString(SQLConf.get.maxToStringFields)
- val baseStr = s"(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}"
val outputStr = s"${ExplainUtils.generateFieldString("Output", output)}"
if (argumentString.nonEmpty) {
s"""
- |$baseStr
+ |$formattedNodeName
|$outputStr
|Arguments: $argumentString
- """.stripMargin
+ |""".stripMargin
} else {
s"""
- |$baseStr
+ |$formattedNodeName
|$outputStr
- """.stripMargin
+ |""".stripMargin
}
}
}
@@ -545,20 +544,19 @@ trait UnaryExecNode extends SparkPlan {
override final def children: Seq[SparkPlan] = child :: Nil
override def verboseStringWithOperatorId(): String = {
val argumentString = argString(SQLConf.get.maxToStringFields)
- val baseStr = s"(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}"
val inputStr = s"${ExplainUtils.generateFieldString("Input", child.output)}"
if (argumentString.nonEmpty) {
s"""
- |$baseStr
+ |$formattedNodeName
|$inputStr
|Arguments: $argumentString
- """.stripMargin
+ |""".stripMargin
} else {
s"""
- |$baseStr
+ |$formattedNodeName
|$inputStr
- """.stripMargin
+ |""".stripMargin
}
}
}
@@ -570,23 +568,22 @@ trait BinaryExecNode extends SparkPlan {
override final def children: Seq[SparkPlan] = Seq(left, right)
override def verboseStringWithOperatorId(): String = {
val argumentString = argString(SQLConf.get.maxToStringFields)
- val baseStr = s"(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}"
val leftOutputStr = s"${ExplainUtils.generateFieldString("Left output", left.output)}"
val rightOutputStr = s"${ExplainUtils.generateFieldString("Right output", right.output)}"
if (argumentString.nonEmpty) {
s"""
- |$baseStr
+ |$formattedNodeName
|$leftOutputStr
|$rightOutputStr
|Arguments: $argumentString
- """.stripMargin
+ |""".stripMargin
} else {
s"""
- |$baseStr
+ |$formattedNodeName
|$leftOutputStr
|$rightOutputStr
- """.stripMargin
+ |""".stripMargin
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
index 19d7263..f506bdd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala
@@ -32,12 +32,12 @@ trait BaseAggregateExec extends UnaryExecNode {
override def verboseStringWithOperatorId(): String = {
s"""
- |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
+ |$formattedNodeName
|${ExplainUtils.generateFieldString("Input", child.output)}
|${ExplainUtils.generateFieldString("Keys", groupingExpressions)}
|${ExplainUtils.generateFieldString("Functions", aggregateExpressions)}
|${ExplainUtils.generateFieldString("Aggregate Attributes", aggregateAttributes)}
|${ExplainUtils.generateFieldString("Results", resultExpressions)}
- """.stripMargin
+ |""".stripMargin
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index c3e259d..036b029 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -86,10 +86,10 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
override def verboseStringWithOperatorId(): String = {
s"""
- |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
+ |$formattedNodeName
|${ExplainUtils.generateFieldString("Output", projectList)}
|${ExplainUtils.generateFieldString("Input", child.output)}
- """.stripMargin
+ |""".stripMargin
}
}
@@ -243,10 +243,10 @@ case class FilterExec(condition: Expression, child: SparkPlan)
override def verboseStringWithOperatorId(): String = {
s"""
- |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
+ |$formattedNodeName
|${ExplainUtils.generateFieldString("Input", child.output)}
|Condition : ${condition}
- """.stripMargin
+ |""".stripMargin
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index dda9a63..c406287 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -88,12 +88,11 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
}
override def verboseStringWithOperatorId(): String = {
- val cdgen = ExplainUtils.getCodegenId(this)
val reuse_op_str = ExplainUtils.getOpId(child)
s"""
- |(${ExplainUtils.getOpId(this)}) $nodeName ${cdgen} [Reuses operator id: $reuse_op_str]
+ |$formattedNodeName [Reuses operator id: $reuse_op_str]
|${ExplainUtils.generateFieldString("Output", output)}
- """.stripMargin
+ |""".stripMargin
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index 7e2f487..91016a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -72,7 +72,7 @@ case class CartesianProductExec(
} else "None"
s"""
- |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
+ |$formattedNodeName
|${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
""".stripMargin
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 99cf602..305741e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -48,11 +48,11 @@ trait HashJoin {
} else "None"
s"""
- |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
+ |$formattedNodeName
|${ExplainUtils.generateFieldString("Left keys", leftKeys)}
|${ExplainUtils.generateFieldString("Right keys", rightKeys)}
|${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
- """.stripMargin
+ |""".stripMargin
}
override def output: Seq[Attribute] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 7a08dd1..5b5904f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -62,11 +62,11 @@ case class SortMergeJoinExec(
s"${condition.get}"
} else "None"
s"""
- |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
+ |$formattedNodeName
|${ExplainUtils.generateFieldString("Left keys", leftKeys)}
|${ExplainUtils.generateFieldString("Right keys", rightKeys)}
|${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
- """.stripMargin
+ |""".stripMargin
}
override def output: Seq[Attribute] = {
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index f00dd81..3675786 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -64,48 +64,48 @@ AdaptiveSparkPlan (9)
+- Scan parquet default.explain_temp1 (1)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
ReadSchema: struct<key:int,val:int>
-
-(2) Filter
+
+(2) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 0))
-
-(3) Project
+
+(3) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(4) HashAggregate
+
+(4) HashAggregate
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_max(val#x)]
Aggregate Attributes [1]: [max#x]
Results [2]: [key#x, max#x]
-
-(5) Exchange
+
+(5) Exchange
Input [2]: [key#x, max#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-
-(6) HashAggregate
+
+(6) HashAggregate
Input [2]: [key#x, max#x]
Keys [1]: [key#x]
Functions [1]: [max(val#x)]
Aggregate Attributes [1]: [max(val#x)#x]
Results [2]: [key#x, max(val#x)#x AS max(val)#x]
-
-(7) Exchange
+
+(7) Exchange
Input [2]: [key#x, max(val)#x]
Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x]
-
-(8) Sort
+
+(8) Sort
Input [2]: [key#x, max(val)#x]
Arguments: [key#x ASC NULLS FIRST], true, 0
-
-(9) AdaptiveSparkPlan
+
+(9) AdaptiveSparkPlan
Output [2]: [key#x, max(val)#x]
Arguments: isFinalPlan=false
@@ -132,48 +132,48 @@ AdaptiveSparkPlan (9)
+- Scan parquet default.explain_temp1 (1)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
ReadSchema: struct<key:int,val:int>
-
-(2) Filter
+
+(2) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 0))
-
-(3) Project
+
+(3) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(4) HashAggregate
+
+(4) HashAggregate
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_max(val#x)]
Aggregate Attributes [1]: [max#x]
Results [2]: [key#x, max#x]
-
-(5) Exchange
+
+(5) Exchange
Input [2]: [key#x, max#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-
-(6) HashAggregate
+
+(6) HashAggregate
Input [2]: [key#x, max#x]
Keys [1]: [key#x]
Functions [1]: [max(val#x)]
Aggregate Attributes [1]: [max(val#x)#x]
Results [3]: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x]
-
-(7) Filter
+
+(7) Filter
Input [3]: [key#x, max(val)#x, max(val#x)#x]
Condition : (isnotnull(max(val#x)#x) AND (max(val#x)#x > 0))
-
-(8) Project
+
+(8) Project
Output [2]: [key#x, max(val)#x]
Input [3]: [key#x, max(val)#x, max(val#x)#x]
-
-(9) AdaptiveSparkPlan
+
+(9) AdaptiveSparkPlan
Output [2]: [key#x, max(val)#x]
Arguments: isFinalPlan=false
@@ -200,57 +200,57 @@ AdaptiveSparkPlan (11)
+- Scan parquet default.explain_temp1 (4)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
ReadSchema: struct<key:int,val:int>
-
-(2) Filter
+
+(2) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 0))
-
-(3) Project
+
+(3) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(4) Scan parquet default.explain_temp1
+
+(4) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
ReadSchema: struct<key:int,val:int>
-
-(5) Filter
+
+(5) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 0))
-
-(6) Project
+
+(6) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(7) Union
-
-(8) HashAggregate
+
+(7) Union
+
+(8) HashAggregate
Input [2]: [key#x, val#x]
Keys [2]: [key#x, val#x]
Functions: []
Aggregate Attributes: []
Results [2]: [key#x, val#x]
-
-(9) Exchange
+
+(9) Exchange
Input [2]: [key#x, val#x]
Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x]
-
-(10) HashAggregate
+
+(10) HashAggregate
Input [2]: [key#x, val#x]
Keys [2]: [key#x, val#x]
Functions: []
Aggregate Attributes: []
Results [2]: [key#x, val#x]
-
-(11) AdaptiveSparkPlan
+
+(11) AdaptiveSparkPlan
Output [2]: [key#x, val#x]
Arguments: isFinalPlan=false
@@ -276,46 +276,46 @@ AdaptiveSparkPlan (9)
+- Scan parquet default.explain_temp2 (4)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int,val:int>
-
-(2) Filter
+
+(2) Filter
Input [2]: [key#x, val#x]
Condition : isnotnull(key#x)
-
-(3) Project
+
+(3) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(4) Scan parquet default.explain_temp2
+
+(4) Scan parquet default.explain_temp2
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp2]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int,val:int>
-
-(5) Filter
+
+(5) Filter
Input [2]: [key#x, val#x]
Condition : isnotnull(key#x)
-
-(6) Project
+
+(6) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(7) BroadcastExchange
+
+(7) BroadcastExchange
Input [2]: [key#x, val#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x]
-
-(8) BroadcastHashJoin
+
+(8) BroadcastHashJoin
Left keys [1]: [key#x]
Right keys [1]: [key#x]
Join condition: None
-
-(9) AdaptiveSparkPlan
+
+(9) AdaptiveSparkPlan
Output [4]: [key#x, val#x, key#x, val#x]
Arguments: isFinalPlan=false
@@ -339,37 +339,37 @@ AdaptiveSparkPlan (7)
+- Scan parquet default.explain_temp2 (2)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<key:int,val:int>
-
-(2) Scan parquet default.explain_temp2
+
+(2) Scan parquet default.explain_temp2
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp2]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int,val:int>
-
-(3) Filter
+
+(3) Filter
Input [2]: [key#x, val#x]
Condition : isnotnull(key#x)
-
-(4) Project
+
+(4) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(5) BroadcastExchange
+
+(5) BroadcastExchange
Input [2]: [key#x, val#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x]
-
-(6) BroadcastHashJoin
+
+(6) BroadcastHashJoin
Left keys [1]: [key#x]
Right keys [1]: [key#x]
Join condition: None
-
-(7) AdaptiveSparkPlan
+
+(7) AdaptiveSparkPlan
Output [4]: [key#x, val#x, key#x, val#x]
Arguments: isFinalPlan=false
@@ -395,22 +395,22 @@ AdaptiveSparkPlan (4)
+- Scan parquet default.explain_temp1 (1)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)]
ReadSchema: struct<key:int,val:int>
-
-(2) Filter
+
+(2) Filter
Input [2]: [key#x, val#x]
Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery subquery#x, [id=#x])) AND (val#x > 3))
-
-(3) Project
+
+(3) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(4) AdaptiveSparkPlan
+
+(4) AdaptiveSparkPlan
Output [2]: [key#x, val#x]
Arguments: isFinalPlan=false
@@ -435,17 +435,17 @@ AdaptiveSparkPlan (3)
+- Scan parquet default.explain_temp1 (1)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<key:int,val:int>
-
-(2) Filter
+
+(2) Filter
Input [2]: [key#x, val#x]
Condition : ((key#x = Subquery subquery#x, [id=#x]) OR (cast(key#x as double) = Subquery subquery#x, [id=#x]))
-
-(3) AdaptiveSparkPlan
+
+(3) AdaptiveSparkPlan
Output [2]: [key#x, val#x]
Arguments: isFinalPlan=false
@@ -463,17 +463,17 @@ AdaptiveSparkPlan (3)
+- Scan parquet default.explain_temp1 (1)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output: []
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<>
-
-(2) Project
+
+(2) Project
Output [1]: [(Subquery subquery#x, [id=#x] + Subquery subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x]
Input: []
-
-(3) AdaptiveSparkPlan
+
+(3) AdaptiveSparkPlan
Output [1]: [(scalarsubquery() + scalarsubquery())#x]
Arguments: isFinalPlan=false
@@ -501,46 +501,46 @@ AdaptiveSparkPlan (9)
+- Scan parquet default.explain_temp1 (4)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
ReadSchema: struct<key:int,val:int>
-
-(2) Filter
+
+(2) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 10))
-
-(3) Project
+
+(3) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(4) Scan parquet default.explain_temp1
+
+(4) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
ReadSchema: struct<key:int,val:int>
-
-(5) Filter
+
+(5) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 10))
-
-(6) Project
+
+(6) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(7) BroadcastExchange
+
+(7) BroadcastExchange
Input [2]: [key#x, val#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x]
-
-(8) BroadcastHashJoin
+
+(8) BroadcastHashJoin
Left keys [1]: [key#x]
Right keys [1]: [key#x]
Join condition: None
-
-(9) AdaptiveSparkPlan
+
+(9) AdaptiveSparkPlan
Output [4]: [key#x, val#x, key#x, val#x]
Arguments: isFinalPlan=false
@@ -575,82 +575,82 @@ AdaptiveSparkPlan (15)
+- Scan parquet default.explain_temp1 (7)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
ReadSchema: struct<key:int,val:int>
-
-(2) Filter
+
+(2) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 10))
-
-(3) Project
+
+(3) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(4) HashAggregate
+
+(4) HashAggregate
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_max(val#x)]
Aggregate Attributes [1]: [max#x]
Results [2]: [key#x, max#x]
-
-(5) Exchange
+
+(5) Exchange
Input [2]: [key#x, max#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-
-(6) HashAggregate
+
+(6) HashAggregate
Input [2]: [key#x, max#x]
Keys [1]: [key#x]
Functions [1]: [max(val#x)]
Aggregate Attributes [1]: [max(val#x)#x]
Results [2]: [key#x, max(val#x)#x AS max(val)#x]
-
-(7) Scan parquet default.explain_temp1
+
+(7) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
ReadSchema: struct<key:int,val:int>
-
-(8) Filter
+
+(8) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 10))
-
-(9) Project
+
+(9) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(10) HashAggregate
+
+(10) HashAggregate
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_max(val#x)]
Aggregate Attributes [1]: [max#x]
Results [2]: [key#x, max#x]
-
-(11) Exchange
+
+(11) Exchange
Input [2]: [key#x, max#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-
-(12) HashAggregate
+
+(12) HashAggregate
Input [2]: [key#x, max#x]
Keys [1]: [key#x]
Functions [1]: [max(val#x)]
Aggregate Attributes [1]: [max(val#x)#x]
Results [2]: [key#x, max(val#x)#x AS max(val)#x]
-
-(13) BroadcastExchange
+
+(13) BroadcastExchange
Input [2]: [key#x, max(val)#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x]
-
-(14) BroadcastHashJoin
+
+(14) BroadcastHashJoin
Left keys [1]: [key#x]
Right keys [1]: [key#x]
Join condition: None
-
-(15) AdaptiveSparkPlan
+
+(15) AdaptiveSparkPlan
Output [4]: [key#x, max(val)#x, key#x, max(val)#x]
Arguments: isFinalPlan=false
@@ -669,16 +669,16 @@ Execute CreateViewCommand (1)
+- UnresolvedRelation (3)
-(1) Execute CreateViewCommand
+(1) Execute CreateViewCommand
Output: []
-
-(2) CreateViewCommand
+
+(2) CreateViewCommand
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
-
-(3) UnresolvedRelation
+
+(3) UnresolvedRelation
Arguments: [explain_temp1]
-
-(4) Project
+
+(4) Project
Arguments: ['key, 'val]
@@ -699,31 +699,31 @@ AdaptiveSparkPlan (5)
+- Scan parquet default.explain_temp1 (1)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<key:int,val:int>
-
-(2) HashAggregate
+
+(2) HashAggregate
Input [2]: [key#x, val#x]
Keys: []
Functions [3]: [partial_count(val#x), partial_sum(cast(key#x as bigint)), partial_count(key#x) FILTER (WHERE (val#x > 1))]
Aggregate Attributes [3]: [count#xL, sum#xL, count#xL]
Results [3]: [count#xL, sum#xL, count#xL]
-
-(3) Exchange
+
+(3) Exchange
Input [3]: [count#xL, sum#xL, count#xL]
Arguments: SinglePartition, true, [id=#x]
-
-(4) HashAggregate
+
+(4) HashAggregate
Input [3]: [count#xL, sum#xL, count#xL]
Keys: []
Functions [3]: [count(val#x), sum(cast(key#x as bigint)), count(key#x)]
Aggregate Attributes [3]: [count(val#x)#xL, sum(cast(key#x as bigint))#xL, count(key#x)#xL]
Results [2]: [(count(val#x)#xL + sum(cast(key#x as bigint))#xL) AS TOTAL#xL, count(key#x)#xL AS count(key) FILTER (WHERE (val > 1))#xL]
-
-(5) AdaptiveSparkPlan
+
+(5) AdaptiveSparkPlan
Output [2]: [TOTAL#xL, count(key) FILTER (WHERE (val > 1))#xL]
Arguments: isFinalPlan=false
@@ -744,31 +744,31 @@ AdaptiveSparkPlan (5)
+- Scan parquet default.explain_temp4 (1)
-(1) Scan parquet default.explain_temp4
+(1) Scan parquet default.explain_temp4
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp4]
ReadSchema: struct<key:int,val:string>
-
-(2) ObjectHashAggregate
+
+(2) ObjectHashAggregate
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_collect_set(val#x, 0, 0)]
Aggregate Attributes [1]: [buf#x]
Results [2]: [key#x, buf#x]
-
-(3) Exchange
+
+(3) Exchange
Input [2]: [key#x, buf#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-
-(4) ObjectHashAggregate
+
+(4) ObjectHashAggregate
Input [2]: [key#x, buf#x]
Keys [1]: [key#x]
Functions [1]: [collect_set(val#x, 0, 0)]
Aggregate Attributes [1]: [collect_set(val#x, 0, 0)#x]
Results [2]: [key#x, sort_array(collect_set(val#x, 0, 0)#x, true)[0] AS sort_array(collect_set(val), true)[0]#x]
-
-(5) AdaptiveSparkPlan
+
+(5) AdaptiveSparkPlan
Output [2]: [key#x, sort_array(collect_set(val), true)[0]#x]
Arguments: isFinalPlan=false
@@ -791,39 +791,39 @@ AdaptiveSparkPlan (7)
+- Scan parquet default.explain_temp4 (1)
-(1) Scan parquet default.explain_temp4
+(1) Scan parquet default.explain_temp4
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp4]
ReadSchema: struct<key:int,val:string>
-
-(2) Sort
+
+(2) Sort
Input [2]: [key#x, val#x]
Arguments: [key#x ASC NULLS FIRST], false, 0
-
-(3) SortAggregate
+
+(3) SortAggregate
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_min(val#x)]
Aggregate Attributes [1]: [min#x]
Results [2]: [key#x, min#x]
-
-(4) Exchange
+
+(4) Exchange
Input [2]: [key#x, min#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-
-(5) Sort
+
+(5) Sort
Input [2]: [key#x, min#x]
Arguments: [key#x ASC NULLS FIRST], false, 0
-
-(6) SortAggregate
+
+(6) SortAggregate
Input [2]: [key#x, min#x]
Keys [1]: [key#x]
Functions [1]: [min(val#x)]
Aggregate Attributes [1]: [min(val#x)#x]
Results [2]: [key#x, min(val#x)#x AS min(val)#x]
-
-(7) AdaptiveSparkPlan
+
+(7) AdaptiveSparkPlan
Output [2]: [key#x, min(val)#x]
Arguments: isFinalPlan=false
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index 1a18d56..2b07dac 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -64,46 +64,46 @@ struct<plan:string>
+- Scan parquet default.explain_temp1 (1)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
ReadSchema: struct<key:int,val:int>
-
+
(2) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(3) Filter [codegen id : 1]
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 0))
-
+
(4) Project [codegen id : 1]
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
+
(5) HashAggregate [codegen id : 1]
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_max(val#x)]
Aggregate Attributes [1]: [max#x]
Results [2]: [key#x, max#x]
-
-(6) Exchange
+
+(6) Exchange
Input [2]: [key#x, max#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-
+
(7) HashAggregate [codegen id : 2]
Input [2]: [key#x, max#x]
Keys [1]: [key#x]
Functions [1]: [max(val#x)]
Aggregate Attributes [1]: [max(val#x)#x]
Results [2]: [key#x, max(val#x)#x AS max(val)#x]
-
-(8) Exchange
+
+(8) Exchange
Input [2]: [key#x, max(val)#x]
Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x]
-
+
(9) Sort [codegen id : 3]
Input [2]: [key#x, max(val)#x]
Arguments: [key#x ASC NULLS FIRST], true, 0
@@ -131,46 +131,46 @@ struct<plan:string>
+- Scan parquet default.explain_temp1 (1)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
ReadSchema: struct<key:int,val:int>
-
+
(2) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(3) Filter [codegen id : 1]
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 0))
-
+
(4) Project [codegen id : 1]
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
+
(5) HashAggregate [codegen id : 1]
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_max(val#x)]
Aggregate Attributes [1]: [max#x]
Results [2]: [key#x, max#x]
-
-(6) Exchange
+
+(6) Exchange
Input [2]: [key#x, max#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-
+
(7) HashAggregate [codegen id : 2]
Input [2]: [key#x, max#x]
Keys [1]: [key#x]
Functions [1]: [max(val#x)]
Aggregate Attributes [1]: [max(val#x)#x]
Results [3]: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x]
-
+
(8) Filter [codegen id : 2]
Input [3]: [key#x, max(val)#x, max(val#x)#x]
Condition : (isnotnull(max(val#x)#x) AND (max(val#x)#x > 0))
-
+
(9) Project [codegen id : 2]
Output [2]: [key#x, max(val)#x]
Input [3]: [key#x, max(val)#x, max(val#x)#x]
@@ -199,55 +199,55 @@ struct<plan:string>
+- Scan parquet default.explain_temp1 (5)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
ReadSchema: struct<key:int,val:int>
-
+
(2) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(3) Filter [codegen id : 1]
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 0))
-
+
(4) Project [codegen id : 1]
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(5) Scan parquet default.explain_temp1
+
+(5) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
ReadSchema: struct<key:int,val:int>
-
+
(6) ColumnarToRow [codegen id : 2]
Input [2]: [key#x, val#x]
-
+
(7) Filter [codegen id : 2]
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 0))
-
+
(8) Project [codegen id : 2]
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(9) Union
-
+
+(9) Union
+
(10) HashAggregate [codegen id : 3]
Input [2]: [key#x, val#x]
Keys [2]: [key#x, val#x]
Functions: []
Aggregate Attributes: []
Results [2]: [key#x, val#x]
-
-(11) Exchange
+
+(11) Exchange
Input [2]: [key#x, val#x]
Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x]
-
+
(12) HashAggregate [codegen id : 4]
Input [2]: [key#x, val#x]
Keys [2]: [key#x, val#x]
@@ -278,46 +278,46 @@ struct<plan:string>
+- Scan parquet default.explain_temp2 (5)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int,val:int>
-
+
(2) ColumnarToRow [codegen id : 2]
Input [2]: [key#x, val#x]
-
+
(3) Filter [codegen id : 2]
Input [2]: [key#x, val#x]
Condition : isnotnull(key#x)
-
+
(4) Project [codegen id : 2]
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(5) Scan parquet default.explain_temp2
+
+(5) Scan parquet default.explain_temp2
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp2]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int,val:int>
-
+
(6) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(7) Filter [codegen id : 1]
Input [2]: [key#x, val#x]
Condition : isnotnull(key#x)
-
+
(8) Project [codegen id : 1]
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(9) BroadcastExchange
+
+(9) BroadcastExchange
Input [2]: [key#x, val#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x]
-
+
(10) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [key#x]
Right keys [1]: [key#x]
@@ -344,37 +344,37 @@ struct<plan:string>
+- Scan parquet default.explain_temp2 (3)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<key:int,val:int>
-
+
(2) ColumnarToRow [codegen id : 2]
Input [2]: [key#x, val#x]
-
-(3) Scan parquet default.explain_temp2
+
+(3) Scan parquet default.explain_temp2
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp2]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int,val:int>
-
+
(4) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(5) Filter [codegen id : 1]
Input [2]: [key#x, val#x]
Condition : isnotnull(key#x)
-
+
(6) Project [codegen id : 1]
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(7) BroadcastExchange
+
+(7) BroadcastExchange
Input [2]: [key#x, val#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x]
-
+
(8) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [key#x]
Right keys [1]: [key#x]
@@ -402,24 +402,24 @@ struct<plan:string>
+- Scan parquet default.explain_temp1 (1)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)]
ReadSchema: struct<key:int,val:int>
-
+
(2) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(3) Filter [codegen id : 1]
Input [2]: [key#x, val#x]
Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3))
-
+
(4) Project [codegen id : 1]
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
+
===== Subqueries =====
Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
@@ -432,42 +432,42 @@ Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery
+- Scan parquet default.explain_temp2 (5)
-(5) Scan parquet default.explain_temp2
+(5) Scan parquet default.explain_temp2
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp2]
PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)]
ReadSchema: struct<key:int,val:int>
-
+
(6) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(7) Filter [codegen id : 1]
Input [2]: [key#x, val#x]
Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x = 2))
-
+
(8) Project [codegen id : 1]
Output [1]: [key#x]
Input [2]: [key#x, val#x]
-
+
(9) HashAggregate [codegen id : 1]
Input [1]: [key#x]
Keys: []
Functions [1]: [partial_max(key#x)]
Aggregate Attributes [1]: [max#x]
Results [1]: [max#x]
-
-(10) Exchange
+
+(10) Exchange
Input [1]: [max#x]
Arguments: SinglePartition, true, [id=#x]
-
+
(11) HashAggregate [codegen id : 2]
Input [1]: [max#x]
Keys: []
Functions [1]: [max(key#x)]
Aggregate Attributes [1]: [max(key#x)#x]
Results [1]: [max(key#x)#x AS max(key)#x]
-
+
Subquery:2 Hosting operator id = 7 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
* HashAggregate (18)
+- Exchange (17)
@@ -478,35 +478,35 @@ Subquery:2 Hosting operator id = 7 Hosting Expression = Subquery scalar-subquery
+- Scan parquet default.explain_temp3 (12)
-(12) Scan parquet default.explain_temp3
+(12) Scan parquet default.explain_temp3
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp3]
PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
ReadSchema: struct<key:int,val:int>
-
+
(13) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(14) Filter [codegen id : 1]
Input [2]: [key#x, val#x]
Condition : (isnotnull(val#x) AND (val#x > 0))
-
+
(15) Project [codegen id : 1]
Output [1]: [key#x]
Input [2]: [key#x, val#x]
-
+
(16) HashAggregate [codegen id : 1]
Input [1]: [key#x]
Keys: []
Functions [1]: [partial_max(key#x)]
Aggregate Attributes [1]: [max#x]
Results [1]: [max#x]
-
-(17) Exchange
+
+(17) Exchange
Input [1]: [max#x]
Arguments: SinglePartition, true, [id=#x]
-
+
(18) HashAggregate [codegen id : 2]
Input [1]: [max#x]
Keys: []
@@ -535,19 +535,19 @@ struct<plan:string>
+- Scan parquet default.explain_temp1 (1)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<key:int,val:int>
-
+
(2) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(3) Filter [codegen id : 1]
Input [2]: [key#x, val#x]
Condition : ((key#x = Subquery scalar-subquery#x, [id=#x]) OR (cast(key#x as double) = Subquery scalar-subquery#x, [id=#x]))
-
+
===== Subqueries =====
Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
@@ -560,42 +560,42 @@ Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery
+- Scan parquet default.explain_temp2 (4)
-(4) Scan parquet default.explain_temp2
+(4) Scan parquet default.explain_temp2
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp2]
PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
ReadSchema: struct<key:int,val:int>
-
+
(5) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(6) Filter [codegen id : 1]
Input [2]: [key#x, val#x]
Condition : (isnotnull(val#x) AND (val#x > 0))
-
+
(7) Project [codegen id : 1]
Output [1]: [key#x]
Input [2]: [key#x, val#x]
-
+
(8) HashAggregate [codegen id : 1]
Input [1]: [key#x]
Keys: []
Functions [1]: [partial_max(key#x)]
Aggregate Attributes [1]: [max#x]
Results [1]: [max#x]
-
-(9) Exchange
+
+(9) Exchange
Input [1]: [max#x]
Arguments: SinglePartition, true, [id=#x]
-
+
(10) HashAggregate [codegen id : 2]
Input [1]: [max#x]
Keys: []
Functions [1]: [max(key#x)]
Aggregate Attributes [1]: [max(key#x)#x]
Results [1]: [max(key#x)#x AS max(key)#x]
-
+
Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
* HashAggregate (17)
+- Exchange (16)
@@ -606,35 +606,35 @@ Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery
+- Scan parquet default.explain_temp3 (11)
-(11) Scan parquet default.explain_temp3
+(11) Scan parquet default.explain_temp3
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp3]
PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
ReadSchema: struct<key:int,val:int>
-
+
(12) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(13) Filter [codegen id : 1]
Input [2]: [key#x, val#x]
Condition : (isnotnull(val#x) AND (val#x > 0))
-
+
(14) Project [codegen id : 1]
Output [1]: [key#x]
Input [2]: [key#x, val#x]
-
+
(15) HashAggregate [codegen id : 1]
Input [1]: [key#x]
Keys: []
Functions [1]: [partial_avg(cast(key#x as bigint))]
Aggregate Attributes [2]: [sum#x, count#xL]
Results [2]: [sum#x, count#xL]
-
-(16) Exchange
+
+(16) Exchange
Input [2]: [sum#x, count#xL]
Arguments: SinglePartition, true, [id=#x]
-
+
(17) HashAggregate [codegen id : 2]
Input [2]: [sum#x, count#xL]
Keys: []
@@ -656,19 +656,19 @@ struct<plan:string>
+- Scan parquet default.explain_temp1 (1)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output: []
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<>
-
+
(2) ColumnarToRow [codegen id : 1]
Input: []
-
+
(3) Project [codegen id : 1]
Output [1]: [(Subquery scalar-subquery#x, [id=#x] + ReusedSubquery Subquery scalar-subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x]
Input: []
-
+
===== Subqueries =====
Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
@@ -679,33 +679,33 @@ Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery
+- Scan parquet default.explain_temp1 (4)
-(4) Scan parquet default.explain_temp1
+(4) Scan parquet default.explain_temp1
Output [1]: [key#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<key:int>
-
+
(5) ColumnarToRow [codegen id : 1]
Input [1]: [key#x]
-
+
(6) HashAggregate [codegen id : 1]
Input [1]: [key#x]
Keys: []
Functions [1]: [partial_avg(cast(key#x as bigint))]
Aggregate Attributes [2]: [sum#x, count#xL]
Results [2]: [sum#x, count#xL]
-
-(7) Exchange
+
+(7) Exchange
Input [2]: [sum#x, count#xL]
Arguments: SinglePartition, true, [id=#x]
-
+
(8) HashAggregate [codegen id : 2]
Input [2]: [sum#x, count#xL]
Keys: []
Functions [1]: [avg(cast(key#x as bigint))]
Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]
-
+
Subquery:2 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x]
@@ -733,46 +733,46 @@ struct<plan:string>
+- Scan parquet default.explain_temp1 (5)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
ReadSchema: struct<key:int,val:int>
-
+
(2) ColumnarToRow [codegen id : 2]
Input [2]: [key#x, val#x]
-
+
(3) Filter [codegen id : 2]
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 10))
-
+
(4) Project [codegen id : 2]
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(5) Scan parquet default.explain_temp1
+
+(5) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
ReadSchema: struct<key:int,val:int>
-
+
(6) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(7) Filter [codegen id : 1]
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 10))
-
+
(8) Project [codegen id : 1]
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
-(9) BroadcastExchange
+
+(9) BroadcastExchange
Input [2]: [key#x, val#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x]
-
+
(10) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [key#x]
Right keys [1]: [key#x]
@@ -805,56 +805,56 @@ struct<plan:string>
+- ReusedExchange (8)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
ReadSchema: struct<key:int,val:int>
-
+
(2) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(3) Filter [codegen id : 1]
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 10))
-
+
(4) Project [codegen id : 1]
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-
+
(5) HashAggregate [codegen id : 1]
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_max(val#x)]
Aggregate Attributes [1]: [max#x]
Results [2]: [key#x, max#x]
-
-(6) Exchange
+
+(6) Exchange
Input [2]: [key#x, max#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-
+
(7) HashAggregate [codegen id : 4]
Input [2]: [key#x, max#x]
Keys [1]: [key#x]
Functions [1]: [max(val#x)]
Aggregate Attributes [1]: [max(val#x)#x]
Results [2]: [key#x, max(val#x)#x AS max(val)#x]
-
-(8) ReusedExchange [Reuses operator id: 6]
+
+(8) ReusedExchange [Reuses operator id: 6]
Output [2]: [key#x, max#x]
-
+
(9) HashAggregate [codegen id : 3]
Input [2]: [key#x, max#x]
Keys [1]: [key#x]
Functions [1]: [max(val#x)]
Aggregate Attributes [1]: [max(val#x)#x]
Results [2]: [key#x, max(val#x)#x AS max(val)#x]
-
-(10) BroadcastExchange
+
+(10) BroadcastExchange
Input [2]: [key#x, max(val)#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x]
-
+
(11) BroadcastHashJoin [codegen id : 4]
Left keys [1]: [key#x]
Right keys [1]: [key#x]
@@ -875,16 +875,16 @@ Execute CreateViewCommand (1)
+- UnresolvedRelation (3)
-(1) Execute CreateViewCommand
+(1) Execute CreateViewCommand
Output: []
-
-(2) CreateViewCommand
+
+(2) CreateViewCommand
Arguments: `default`.`explain_view`, SELECT key, val FROM explain_temp1, false, false, PersistedView
-
-(3) UnresolvedRelation
+
+(3) UnresolvedRelation
Arguments: [explain_temp1]
-
-(4) Project
+
+(4) Project
Arguments: ['key, 'val]
@@ -905,26 +905,26 @@ struct<plan:string>
+- Scan parquet default.explain_temp1 (1)
-(1) Scan parquet default.explain_temp1
+(1) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<key:int,val:int>
-
+
(2) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
-(3) HashAggregate
+
+(3) HashAggregate
Input [2]: [key#x, val#x]
Keys: []
Functions [3]: [partial_count(val#x), partial_sum(cast(key#x as bigint)), partial_count(key#x) FILTER (WHERE (val#x > 1))]
Aggregate Attributes [3]: [count#xL, sum#xL, count#xL]
Results [3]: [count#xL, sum#xL, count#xL]
-
-(4) Exchange
+
+(4) Exchange
Input [3]: [count#xL, sum#xL, count#xL]
Arguments: SinglePartition, true, [id=#x]
-
+
(5) HashAggregate [codegen id : 2]
Input [3]: [count#xL, sum#xL, count#xL]
Keys: []
@@ -949,27 +949,27 @@ ObjectHashAggregate (5)
+- Scan parquet default.explain_temp4 (1)
-(1) Scan parquet default.explain_temp4
+(1) Scan parquet default.explain_temp4
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp4]
ReadSchema: struct<key:int,val:string>
-
+
(2) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
-(3) ObjectHashAggregate
+
+(3) ObjectHashAggregate
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_collect_set(val#x, 0, 0)]
Aggregate Attributes [1]: [buf#x]
Results [2]: [key#x, buf#x]
-
-(4) Exchange
+
+(4) Exchange
Input [2]: [key#x, buf#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-
-(5) ObjectHashAggregate
+
+(5) ObjectHashAggregate
Input [2]: [key#x, buf#x]
Keys [1]: [key#x]
Functions [1]: [collect_set(val#x, 0, 0)]
@@ -995,35 +995,35 @@ SortAggregate (7)
+- Scan parquet default.explain_temp4 (1)
-(1) Scan parquet default.explain_temp4
+(1) Scan parquet default.explain_temp4
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp4]
ReadSchema: struct<key:int,val:string>
-
+
(2) ColumnarToRow [codegen id : 1]
Input [2]: [key#x, val#x]
-
+
(3) Sort [codegen id : 1]
Input [2]: [key#x, val#x]
Arguments: [key#x ASC NULLS FIRST], false, 0
-
-(4) SortAggregate
+
+(4) SortAggregate
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_min(val#x)]
Aggregate Attributes [1]: [min#x]
Results [2]: [key#x, min#x]
-
-(5) Exchange
+
+(5) Exchange
Input [2]: [key#x, min#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-
+
(6) Sort [codegen id : 2]
Input [2]: [key#x, min#x]
Arguments: [key#x ASC NULLS FIRST], false, 0
-
-(7) SortAggregate
+
+(7) SortAggregate
Input [2]: [key#x, min#x]
Keys [1]: [key#x]
Functions [1]: [min(val#x)]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index a1b6d71..d41d624 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -354,8 +354,6 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), avg("v2"))
// trigger the final plan for AQE
testDf.collect()
- // whitespace
- val ws = " "
// == Physical Plan ==
// AdaptiveSparkPlan (14)
// +- * HashAggregate (13)
@@ -375,22 +373,22 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
testDf,
FormattedMode,
s"""
- |(6) BroadcastQueryStage$ws
+ |(6) BroadcastQueryStage
|Output [2]: [k#x, v2#x]
|Arguments: 0
|""".stripMargin,
s"""
- |(11) ShuffleQueryStage$ws
+ |(11) ShuffleQueryStage
|Output [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
|Arguments: 1
|""".stripMargin,
s"""
- |(12) CustomShuffleReader$ws
+ |(12) CustomShuffleReader
|Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
|Arguments: coalesced
|""".stripMargin,
s"""
- |(14) AdaptiveSparkPlan$ws
+ |(14) AdaptiveSparkPlan
|Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
|Arguments: isFinalPlan=true
|""".stripMargin