You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2019/07/28 23:36:02 UTC
[spark] branch master updated: [SPARK-28520][SQL] WholeStageCodegen
does not work property for LocalTableScanExec
This is an automated email from the ASF dual-hosted git repository.
yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6bc5c6a [SPARK-28520][SQL] WholeStageCodegen does not work property for LocalTableScanExec
6bc5c6a is described below
commit 6bc5c6a4e7c36361db437313cd950509a1ab6db2
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Mon Jul 29 08:35:25 2019 +0900
[SPARK-28520][SQL] WholeStageCodegen does not work property for LocalTableScanExec
Code is not generated for LocalTableScanExec even in situations where it should be.
If a LocalTableScanExec plan has a direct parent plan that supports WholeStageCodegen,
the LocalTableScanExec plan should also be within a WholeStageCodegen domain.
Currently, however, no code is generated for LocalTableScanExec, and an InputAdapter is inserted instead.
```
val df1 = spark.createDataset(1 to 10).toDF
val df2 = spark.createDataset(1 to 10).toDF
val df3 = df1.join(df2, df1("value") === df2("value"))
df3.explain(true)
...
== Physical Plan ==
*(1) BroadcastHashJoin [value#1], [value#6], Inner, BuildRight
:- LocalTableScan [value#1] // LocalTableScanExec is not within a WholeStageCodegen domain
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [value#6]
```
```
scala> df3.queryExecution.executedPlan.children.head.children.head.getClass
res4: Class[_ <: org.apache.spark.sql.execution.SparkPlan] = class org.apache.spark.sql.execution.InputAdapter
```
In the current implementation of LocalTableScanExec, codegen is enabled only when `parent` is not null,
but `parent` is set in `consume`, which is called after `insertInputAdapter`, so it doesn't work as intended.
After applying this change, we get the following plan, which shows that LocalTableScanExec is within a WholeStageCodegen domain.
```
== Physical Plan ==
*(1) BroadcastHashJoin [value#63], [value#68], Inner, BuildRight
:- *(1) LocalTableScan [value#63]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [value#68]
```
## How was this patch tested?
New test cases are added to WholeStageCodegenSuite.
Closes #25260 from sarutak/localtablescan-improvement.
Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
.../spark/sql/execution/LocalTableScanExec.scala | 3 ---
.../sql/execution/WholeStageCodegenExec.scala | 4 +++
.../sql/execution/WholeStageCodegenSuite.scala | 30 ++++++++++++++++++++++
3 files changed, 34 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 31640db..9e32ecf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -80,8 +80,5 @@ case class LocalTableScanExec(
// Input is already UnsafeRows.
override protected val createUnsafeProjection: Boolean = false
- // Do not codegen when there is no parent - to support the fast driver-local collect/take paths.
- override def supportCodegen: Boolean = (parent != null)
-
override def inputRDD: RDD[InternalRow] = rdd
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index a0afa9a..d9d9b1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -895,6 +895,10 @@ case class CollapseCodegenStages(
// domain object can not be written into unsafe row.
case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
plan.withNewChildren(plan.children.map(insertWholeStageCodegen(_, isColumnar)))
+ case plan: LocalTableScanExec =>
+ // Do not make LocalTableScanExec the root of WholeStageCodegen
+ // to support the fast driver-local collect/take paths.
+ plan
case plan: CodegenSupport if supportCodegen(plan) =>
WholeStageCodegenExec(
insertInputAdapter(plan, isColumnar))(codegenStageCounter.incrementAndGet())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 483a046..59b9e15 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.expressions.scalalang.typed
@@ -325,4 +326,33 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
checkAnswer(groupByWithId, Seq(Row(1, 2, 0), Row(1, 2, 0)))
}
}
+
+ test("SPARK-28520: WholeStageCodegen does not work properly for LocalTableScanExec") {
+ // Case1: LocalTableScanExec is the root of a query plan tree.
+ // In this case, WholeStageCodegenExec should not be inserted
+ // as the direct parent of LocalTableScanExec.
+ val df = Seq(1, 2, 3).toDF
+ val rootOfExecutedPlan = df.queryExecution.executedPlan
+
+ // Ensure WholeStageCodegenExec is not inserted and
+ // LocalTableScanExec is still the root.
+ assert(rootOfExecutedPlan.isInstanceOf[LocalTableScanExec],
+ "LocalTableScanExec should be still the root.")
+
+ // Case2: The parent of a LocalTableScanExec supports WholeStageCodegen.
+ // In this case, the LocalTableScanExec should be within a WholeStageCodegen domain
+ // and no more InputAdapter is inserted as the direct parent of the LocalTableScanExec.
+ val aggedDF = Seq(1, 2, 3).toDF.groupBy("value").sum()
+ val executedPlan = aggedDF.queryExecution.executedPlan
+
+ // HashAggregateExec supports WholeStageCodegen and it's the parent of
+ // LocalTableScanExec so LocalTableScanExec should be within a WholeStageCodegen domain.
+ assert(
+ executedPlan.find {
+ case WholeStageCodegenExec(
+ HashAggregateExec(_, _, _, _, _, _, _: LocalTableScanExec)) => true
+ case _ => false
+ }.isDefined,
+ "LocalTableScanExec should be within a WholeStageCodegen domain.")
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org