You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2019/07/28 23:36:02 UTC

[spark] branch master updated: [SPARK-28520][SQL] WholeStageCodegen does not work properly for LocalTableScanExec

This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bc5c6a  [SPARK-28520][SQL] WholeStageCodegen does not work properly for LocalTableScanExec
6bc5c6a is described below

commit 6bc5c6a4e7c36361db437313cd950509a1ab6db2
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Mon Jul 29 08:35:25 2019 +0900

    [SPARK-28520][SQL] WholeStageCodegen does not work properly for LocalTableScanExec
    
    Code is not generated for LocalTableScanExec even in situations where it should be.
    
    If a LocalTableScanExec plan has a direct parent plan that supports WholeStageCodegen,
    the LocalTableScanExec plan should also be within that WholeStageCodegen domain.
    Currently, however, no code is generated for LocalTableScanExec and an InputAdapter is inserted instead.
    
    ```
    val df1 = spark.createDataset(1 to 10).toDF
    val df2 = spark.createDataset(1 to 10).toDF
    val df3 = df1.join(df2, df1("value") === df2("value"))
    df3.explain(true)
    
    ...
    
    == Physical Plan ==
    *(1) BroadcastHashJoin [value#1], [value#6], Inner, BuildRight
    :- LocalTableScan [value#1]                                             // LocalTableScanExec is not within a WholeStageCodegen domain
    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
       +- LocalTableScan [value#6]
    ```
    
    ```
    scala> df3.queryExecution.executedPlan.children.head.children.head.getClass
    res4: Class[_ <: org.apache.spark.sql.execution.SparkPlan] = class org.apache.spark.sql.execution.InputAdapter
    ```
    
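    In the current implementation of LocalTableScanExec, codegen is enabled only when `parent` is not null,
    but `parent` is set in `consume`, which is called after `insertInputAdapter`. So when CollapseCodegenStages
    checks `supportCodegen`, `parent` is still null, an InputAdapter is inserted, and it doesn't work as intended.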
    
    After applying this change, we get the following plan, which shows that LocalTableScanExec is within a WholeStageCodegen domain.
    
    ```
    == Physical Plan ==
    *(1) BroadcastHashJoin [value#63], [value#68], Inner, BuildRight
    :- *(1) LocalTableScan [value#63]
    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
       +- LocalTableScan [value#68]
    ```
    
    ## How was this patch tested?
    
    New test cases are added to WholeStageCodegenSuite.
    
    Closes #25260 from sarutak/localtablescan-improvement.
    
    Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
    Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
 .../spark/sql/execution/LocalTableScanExec.scala   |  3 ---
 .../sql/execution/WholeStageCodegenExec.scala      |  4 +++
 .../sql/execution/WholeStageCodegenSuite.scala     | 30 ++++++++++++++++++++++
 3 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 31640db..9e32ecf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -80,8 +80,5 @@ case class LocalTableScanExec(
   // Input is already UnsafeRows.
   override protected val createUnsafeProjection: Boolean = false
 
-  // Do not codegen when there is no parent - to support the fast driver-local collect/take paths.
-  override def supportCodegen: Boolean = (parent != null)
-
   override def inputRDD: RDD[InternalRow] = rdd
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index a0afa9a..d9d9b1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -895,6 +895,10 @@ case class CollapseCodegenStages(
       // domain object can not be written into unsafe row.
       case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
         plan.withNewChildren(plan.children.map(insertWholeStageCodegen(_, isColumnar)))
+      case plan: LocalTableScanExec =>
+        // Do not make LocalTableScanExec the root of WholeStageCodegen
+        // to support the fast driver-local collect/take paths.
+        plan
       case plan: CodegenSupport if supportCodegen(plan) =>
         WholeStageCodegenExec(
           insertInputAdapter(plan, isColumnar))(codegenStageCounter.incrementAndGet())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 483a046..59b9e15 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.expressions.scalalang.typed
@@ -325,4 +326,33 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
       checkAnswer(groupByWithId, Seq(Row(1, 2, 0), Row(1, 2, 0)))
     }
   }
+
+  test("SPARK-28520: WholeStageCodegen does not work properly for LocalTableScanExec") {
+    // Case1: LocalTableScanExec is the root of a query plan tree.
+    // In this case, WholeStageCodegenExec should not be inserted
+    // as the direct parent of LocalTableScanExec.
+    val df = Seq(1, 2, 3).toDF
+    val rootOfExecutedPlan = df.queryExecution.executedPlan
+
+    // Ensure WholeStageCodegenExec is not inserted and
+    // LocalTableScanExec is still the root.
+    assert(rootOfExecutedPlan.isInstanceOf[LocalTableScanExec],
+      "LocalTableScanExec should be still the root.")
+
+    // Case2: The parent of a LocalTableScanExec supports WholeStageCodegen.
+    // In this case, the LocalTableScanExec should be within a WholeStageCodegen domain
+    // and no more InputAdapter is inserted as the direct parent of the LocalTableScanExec.
+    val aggedDF = Seq(1, 2, 3).toDF.groupBy("value").sum()
+    val executedPlan = aggedDF.queryExecution.executedPlan
+
+    // HashAggregateExec supports WholeStageCodegen and it's the parent of
+    // LocalTableScanExec so LocalTableScanExec should be within a WholeStageCodegen domain.
+    assert(
+      executedPlan.find {
+        case WholeStageCodegenExec(
+          HashAggregateExec(_, _, _, _, _, _, _: LocalTableScanExec)) => true
+        case _ => false
+      }.isDefined,
+      "LocalTableScanExec should be within a WholeStageCodegen domain.")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org