You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/08/26 07:02:04 UTC

[spark] branch branch-3.0 updated: [SPARK-32659][SQL] Fix the data issue when pruning DPP on non-atomic type

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9e8fb48  [SPARK-32659][SQL] Fix the data issue when pruning DPP on non-atomic type
9e8fb48 is described below

commit 9e8fb4874233ba9b933d2a99e73e683c2fbde594
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Wed Aug 26 06:57:43 2020 +0000

    [SPARK-32659][SQL] Fix the data issue when pruning DPP on non-atomic type
    
    ### What changes were proposed in this pull request?
    
    Use `InSet` expression to fix data issue when pruning DPP on non-atomic type. for example:
       ```scala
        spark.range(1000)
        .select(col("id"), col("id").as("k"))
        .write
        .partitionBy("k")
        .format("parquet")
        .mode("overwrite")
        .saveAsTable("df1");
    
       spark.range(100)
       .select(col("id"), col("id").as("k"))
       .write
       .partitionBy("k")
       .format("parquet")
       .mode("overwrite")
       .saveAsTable("df2")
    
       spark.sql("set spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio=2")
       spark.sql("set spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly=false")
       spark.sql("SELECT df1.id, df2.k FROM df1 JOIN df2 ON struct(df1.k) = struct(df2.k) AND df2.id < 2").show
       ```
       It should return two records, but it returns empty.
    
    ### Why are the changes needed?
    
    Fix data issue
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add new unit test.
    
    Closes #29475 from wangyum/SPARK-32659.
    
    Authored-by: Yuming Wang <yu...@ebay.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit a8b568800e64f6a163da28e5e53441f84355df14)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/execution/subquery.scala  | 21 +++------
 .../spark/sql/DynamicPartitionPruningSuite.scala   | 51 +++++++++++++++++++++-
 2 files changed, 57 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index c2270c5..9d15c76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -114,9 +114,10 @@ case class InSubqueryExec(
     child: Expression,
     plan: BaseSubqueryExec,
     exprId: ExprId,
-    private var resultBroadcast: Broadcast[Array[Any]] = null) extends ExecSubqueryExpression {
+    private var resultBroadcast: Broadcast[Set[Any]] = null) extends ExecSubqueryExpression {
 
-  @transient private var result: Array[Any] = _
+  @transient private var result: Set[Any] = _
+  @transient private lazy val inSet = InSet(child, result)
 
   override def dataType: DataType = BooleanType
   override def children: Seq[Expression] = child :: Nil
@@ -131,14 +132,11 @@ case class InSubqueryExec(
 
   def updateResult(): Unit = {
     val rows = plan.executeCollect()
-    result = child.dataType match {
-      case _: StructType => rows.toArray
-      case _ => rows.map(_.get(0, child.dataType))
-    }
+    result = rows.map(_.get(0, child.dataType)).toSet
     resultBroadcast = plan.sqlContext.sparkContext.broadcast(result)
   }
 
-  def values(): Option[Array[Any]] = Option(resultBroadcast).map(_.value)
+  def values(): Option[Set[Any]] = Option(resultBroadcast).map(_.value)
 
   private def prepareResult(): Unit = {
     require(resultBroadcast != null, s"$this has not finished")
@@ -149,17 +147,12 @@ case class InSubqueryExec(
 
   override def eval(input: InternalRow): Any = {
     prepareResult()
-    val v = child.eval(input)
-    if (v == null) {
-      null
-    } else {
-      result.contains(v)
-    }
+    inSet.eval(input)
   }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     prepareResult()
-    InSet(child, result.toSet).doGenCode(ctx, ev)
+    inSet.doGenCode(ctx, ev)
   }
 
   override lazy val canonicalized: InSubqueryExec = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 45f4309..bf91bdb7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
 
 import org.scalatest.GivenWhenThen
 
-import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
+import org.apache.spark.sql.catalyst.expressions.{CodegenObjectFactoryMode, DynamicPruningExpression, Expression}
 import org.apache.spark.sql.catalyst.plans.ExistenceJoin
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper}
@@ -1309,6 +1309,55 @@ abstract class DynamicPartitionPruningSuiteBase
       )
     }
   }
+
+  test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type") {
+    withSQLConf(
+      SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "2", // Make sure insert DPP
+      SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false") {
+      withTable("df1", "df2") {
+        spark.range(1000)
+          .select(col("id"), col("id").as("k"))
+          .write
+          .partitionBy("k")
+          .format(tableFormat)
+          .mode("overwrite")
+          .saveAsTable("df1")
+
+        spark.range(100)
+          .select(col("id"), col("id").as("k"))
+          .write
+          .partitionBy("k")
+          .format(tableFormat)
+          .mode("overwrite")
+          .saveAsTable("df2")
+
+        Seq(CodegenObjectFactoryMode.NO_CODEGEN,
+          CodegenObjectFactoryMode.CODEGEN_ONLY).foreach { mode =>
+          Seq(true, false).foreach { pruning =>
+            withSQLConf(
+              SQLConf.CODEGEN_FACTORY_MODE.key -> mode.toString,
+              SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> s"$pruning") {
+              val df = sql(
+                """
+                  |SELECT df1.id, df2.k
+                  |FROM df1
+                  |  JOIN df2
+                  |  ON struct(df1.k) = struct(df2.k)
+                  |    AND df2.id < 2
+                  |""".stripMargin)
+              if (pruning) {
+                checkPartitionPruningPredicate(df, true, false)
+              } else {
+                checkPartitionPruningPredicate(df, false, false)
+              }
+
+              checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil)
+            }
+          }
+        }
+      }
+    }
+  }
 }
 
 class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org