You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/12/21 11:40:13 UTC
[spark] branch branch-3.0 updated: [SPARK-33853][SQL] EXPLAIN
CODEGEN and BenchmarkQueryTest don't show subquery code
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 78dbb4a [SPARK-33853][SQL] EXPLAIN CODEGEN and BenchmarkQueryTest don't show subquery code
78dbb4a is described below
commit 78dbb4a672ddac3d77fa40a4c7fa8c70abf0f26d
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Mon Dec 21 03:29:00 2020 -0800
[SPARK-33853][SQL] EXPLAIN CODEGEN and BenchmarkQueryTest don't show subquery code
### What changes were proposed in this pull request?
This PR fixes an issue that `EXPLAIN CODEGEN` and `BenchmarkQueryTest` don't show the corresponding code for subqueries.
The following example is about `EXPLAIN CODEGEN`.
```
spark.conf.set("spark.sql.adaptive.enabled", "false")
val df = spark.range(1, 100)
df.createTempView("df")
spark.sql("SELECT (SELECT min(id) AS v FROM df)").explain("CODEGEN")
scala> spark.sql("SELECT (SELECT min(id) AS v FROM df)").explain("CODEGEN")
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:55; maxConstantPoolSize:97(0.15% used); numInnerClasses:0) ==
*(1) Project [Subquery scalar-subquery#3, [id=#24] AS scalarsubquery()#5L]
: +- Subquery scalar-subquery#3, [id=#24]
: +- *(2) HashAggregate(keys=[], functions=[min(id#0L)], output=[v#2L])
: +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#20]
: +- *(1) HashAggregate(keys=[], functions=[partial_min(id#0L)], output=[min#8L])
: +- *(1) Range (1, 100, step=1, splits=12)
+- *(1) Scan OneRowRelation[]
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private scala.collection.Iterator rdd_input_0;
/* 010 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] project_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 011 */
/* 012 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 013 */ this.references = references;
/* 014 */ }
/* 015 */
/* 016 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 017 */ partitionIndex = index;
/* 018 */ this.inputs = inputs;
/* 019 */ rdd_input_0 = inputs[0];
/* 020 */ project_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 021 */
/* 022 */ }
/* 023 */
/* 024 */ private void project_doConsume_0() throws java.io.IOException {
/* 025 */ // common sub-expressions
/* 026 */
/* 027 */ project_mutableStateArray_0[0].reset();
/* 028 */
/* 029 */ if (false) {
/* 030 */ project_mutableStateArray_0[0].setNullAt(0);
/* 031 */ } else {
/* 032 */ project_mutableStateArray_0[0].write(0, 1L);
/* 033 */ }
/* 034 */ append((project_mutableStateArray_0[0].getRow()));
/* 035 */
/* 036 */ }
/* 037 */
/* 038 */ protected void processNext() throws java.io.IOException {
/* 039 */ while ( rdd_input_0.hasNext()) {
/* 040 */ InternalRow rdd_row_0 = (InternalRow) rdd_input_0.next();
/* 041 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 042 */ project_doConsume_0();
/* 043 */ if (shouldStop()) return;
/* 044 */ }
/* 045 */ }
/* 046 */
/* 047 */ }
```
After this change, the corresponding code for subqueries are shown.
```
Found 3 WholeStageCodegen subtrees.
== Subtree 1 / 3 (maxMethodCodeSize:282; maxConstantPoolSize:206(0.31% used); numInnerClasses:0) ==
*(1) HashAggregate(keys=[], functions=[partial_min(id#0L)], output=[min#8L])
+- *(1) Range (1, 100, step=1, splits=12)
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean agg_initAgg_0;
/* 010 */ private boolean agg_bufIsNull_0;
/* 011 */ private long agg_bufValue_0;
/* 012 */ private boolean range_initRange_0;
/* 013 */ private long range_nextIndex_0;
/* 014 */ private TaskContext range_taskContext_0;
/* 015 */ private InputMetrics range_inputMetrics_0;
/* 016 */ private long range_batchEnd_0;
/* 017 */ private long range_numElementsTodo_0;
/* 018 */ private boolean agg_agg_isNull_2_0;
/* 019 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[3];
/* 020 */
/* 021 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 022 */ this.references = references;
/* 023 */ }
/* 024 */
/* 025 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 026 */ partitionIndex = index;
/* 027 */ this.inputs = inputs;
/* 028 */
/* 029 */ range_taskContext_0 = TaskContext.get();
/* 030 */ range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 031 */ range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 032 */ range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 033 */ range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 034 */
/* 035 */ }
/* 036 */
/* 037 */ private void agg_doAggregateWithoutKey_0() throws java.io.IOException {
/* 038 */ // initialize aggregation buffer
/* 039 */ agg_bufIsNull_0 = true;
/* 040 */ agg_bufValue_0 = -1L;
/* 041 */
/* 042 */ // initialize Range
/* 043 */ if (!range_initRange_0) {
/* 044 */ range_initRange_0 = true;
/* 045 */ initRange(partitionIndex);
/* 046 */ }
/* 047 */
/* 048 */ while (true) {
/* 049 */ if (range_nextIndex_0 == range_batchEnd_0) {
/* 050 */ long range_nextBatchTodo_0;
/* 051 */ if (range_numElementsTodo_0 > 1000L) {
/* 052 */ range_nextBatchTodo_0 = 1000L;
/* 053 */ range_numElementsTodo_0 -= 1000L;
/* 054 */ } else {
/* 055 */ range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 056 */ range_numElementsTodo_0 = 0;
/* 057 */ if (range_nextBatchTodo_0 == 0) break;
/* 058 */ }
/* 059 */ range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 060 */ }
/* 061 */
/* 062 */ int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 063 */ for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 064 */ long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 065 */
/* 066 */ agg_doConsume_0(range_value_0);
/* 067 */
/* 068 */ // shouldStop check is eliminated
/* 069 */ }
/* 070 */ range_nextIndex_0 = range_batchEnd_0;
/* 071 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
/* 072 */ range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 073 */ range_taskContext_0.killTaskIfInterrupted();
/* 074 */ }
/* 075 */
/* 076 */ }
/* 077 */
/* 078 */ private void initRange(int idx) {
/* 079 */ java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 080 */ java.math.BigInteger numSlice = java.math.BigInteger.valueOf(12L);
/* 081 */ java.math.BigInteger numElement = java.math.BigInteger.valueOf(99L);
/* 082 */ java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 083 */ java.math.BigInteger start = java.math.BigInteger.valueOf(1L);
/* 084 */ long partitionEnd;
/* 085 */
/* 086 */ java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 087 */ if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 088 */ range_nextIndex_0 = Long.MAX_VALUE;
/* 089 */ } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 090 */ range_nextIndex_0 = Long.MIN_VALUE;
/* 091 */ } else {
/* 092 */ range_nextIndex_0 = st.longValue();
/* 093 */ }
/* 094 */ range_batchEnd_0 = range_nextIndex_0;
/* 095 */
/* 096 */ java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 097 */ .multiply(step).add(start);
/* 098 */ if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 099 */ partitionEnd = Long.MAX_VALUE;
/* 100 */ } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 101 */ partitionEnd = Long.MIN_VALUE;
/* 102 */ } else {
/* 103 */ partitionEnd = end.longValue();
/* 104 */ }
/* 105 */
/* 106 */ java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 107 */ java.math.BigInteger.valueOf(range_nextIndex_0));
/* 108 */ range_numElementsTodo_0 = startToEnd.divide(step).longValue();
/* 109 */ if (range_numElementsTodo_0 < 0) {
/* 110 */ range_numElementsTodo_0 = 0;
/* 111 */ } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 112 */ range_numElementsTodo_0++;
/* 113 */ }
/* 114 */ }
/* 115 */
/* 116 */ private void agg_doConsume_0(long agg_expr_0_0) throws java.io.IOException {
/* 117 */ // do aggregate
/* 118 */ // common sub-expressions
/* 119 */
/* 120 */ // evaluate aggregate functions and update aggregation buffers
/* 121 */
/* 122 */ agg_agg_isNull_2_0 = true;
/* 123 */ long agg_value_2 = -1L;
/* 124 */
/* 125 */ if (!agg_bufIsNull_0 && (agg_agg_isNull_2_0 ||
/* 126 */ agg_value_2 > agg_bufValue_0)) {
/* 127 */ agg_agg_isNull_2_0 = false;
/* 128 */ agg_value_2 = agg_bufValue_0;
/* 129 */ }
/* 130 */
/* 131 */ if (!false && (agg_agg_isNull_2_0 ||
/* 132 */ agg_value_2 > agg_expr_0_0)) {
/* 133 */ agg_agg_isNull_2_0 = false;
/* 134 */ agg_value_2 = agg_expr_0_0;
/* 135 */ }
/* 136 */
/* 137 */ agg_bufIsNull_0 = agg_agg_isNull_2_0;
/* 138 */ agg_bufValue_0 = agg_value_2;
/* 139 */
/* 140 */ }
/* 141 */
/* 142 */ protected void processNext() throws java.io.IOException {
/* 143 */ while (!agg_initAgg_0) {
/* 144 */ agg_initAgg_0 = true;
/* 145 */ long agg_beforeAgg_0 = System.nanoTime();
/* 146 */ agg_doAggregateWithoutKey_0();
/* 147 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* aggTime */).add((System.nanoTime() - agg_beforeAgg_0) / 1000000);
/* 148 */
/* 149 */ // output the result
/* 150 */
/* 151 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numOutputRows */).add(1);
/* 152 */ range_mutableStateArray_0[2].reset();
/* 153 */
/* 154 */ range_mutableStateArray_0[2].zeroOutNullBytes();
/* 155 */
/* 156 */ if (agg_bufIsNull_0) {
/* 157 */ range_mutableStateArray_0[2].setNullAt(0);
/* 158 */ } else {
/* 159 */ range_mutableStateArray_0[2].write(0, agg_bufValue_0);
/* 160 */ }
/* 161 */ append((range_mutableStateArray_0[2].getRow()));
/* 162 */ }
/* 163 */ }
/* 164 */
/* 165 */ }
```
### Why are the changes needed?
For better debuggability.
### Does this PR introduce _any_ user-facing change?
Yes. After this change, users can see subquery code by `EXPLAIN CODEGEN`.
### How was this patch tested?
New test.
Closes #30859 from sarutak/explain-codegen-subqueries.
Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit f4e1069bb835e3e132f7758e5842af79f26cd162)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../org/apache/spark/sql/execution/debug/package.scala | 15 ++++++++++-----
.../scala/org/apache/spark/sql/BenchmarkQueryTest.scala | 14 ++++++++++----
.../test/scala/org/apache/spark/sql/ExplainSuite.scala | 16 ++++++++++++++++
3 files changed, 36 insertions(+), 9 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 6c40104..3cbebca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -107,12 +107,17 @@ package object debug {
*/
def codegenStringSeq(plan: SparkPlan): Seq[(String, String, ByteCodeStats)] = {
val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]()
- plan transform {
- case s: WholeStageCodegenExec =>
- codegenSubtrees += s
- s
- case s => s
+
+ def findSubtrees(plan: SparkPlan): Unit = {
+ plan foreach {
+ case s: WholeStageCodegenExec =>
+ codegenSubtrees += s
+ case s =>
+ s.subqueries.foreach(findSubtrees)
+ }
}
+
+ findSubtrees(plan)
codegenSubtrees.toSeq.sortBy(_.codegenStageId).map { subtree =>
val (_, source) = subtree.doCodeGen()
val codeStats = try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
index 07afd41..174e734 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala
@@ -50,11 +50,17 @@ abstract class BenchmarkQueryTest extends QueryTest with SharedSparkSession {
protected def checkGeneratedCode(plan: SparkPlan, checkMethodCodeSize: Boolean = true): Unit = {
val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]()
- plan foreach {
- case s: WholeStageCodegenExec =>
- codegenSubtrees += s
- case _ =>
+
+ def findSubtrees(plan: SparkPlan): Unit = {
+ plan foreach {
+ case s: WholeStageCodegenExec =>
+ codegenSubtrees += s
+ case s =>
+ s.subqueries.foreach(findSubtrees)
+ }
}
+
+ findSubtrees(plan)
codegenSubtrees.toSeq.foreach { subtree =>
val code = subtree.doCodeGen()._2
val (_, ByteCodeStats(maxMethodCodeSize, _, _)) = try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index d41d624..158d939 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -228,6 +228,22 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
}
}
+ test("SPARK-33853: explain codegen - check presence of subquery") {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") {
+ withTempView("df") {
+ val df1 = spark.range(1, 100)
+ df1.createTempView("df")
+
+ val sqlText = "EXPLAIN CODEGEN SELECT (SELECT min(id) FROM df)"
+ val expectedText = "Found 3 WholeStageCodegen subtrees."
+
+ withNormalizedExplain(sqlText) { normalizedOutput =>
+ assert(normalizedOutput.contains(expectedText))
+ }
+ }
+ }
+ }
+
test("explain formatted - check presence of subquery in case of DPP") {
withTable("df1", "df2") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org