You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/08/26 07:02:04 UTC
[spark] branch branch-3.0 updated: [SPARK-32659][SQL] Fix the data
issue when pruning DPP on non-atomic type
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9e8fb48 [SPARK-32659][SQL] Fix the data issue when pruning DPP on non-atomic type
9e8fb48 is described below
commit 9e8fb4874233ba9b933d2a99e73e683c2fbde594
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Wed Aug 26 06:57:43 2020 +0000
[SPARK-32659][SQL] Fix the data issue when pruning DPP on non-atomic type
### What changes were proposed in this pull request?
Use the `InSet` expression to fix a data issue when pruning with DPP on a non-atomic type. For example:
```scala
spark.range(1000)
.select(col("id"), col("id").as("k"))
.write
.partitionBy("k")
.format("parquet")
.mode("overwrite")
.saveAsTable("df1");
spark.range(100)
.select(col("id"), col("id").as("k"))
.write
.partitionBy("k")
.format("parquet")
.mode("overwrite")
.saveAsTable("df2")
spark.sql("set spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio=2")
spark.sql("set spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly=false")
spark.sql("SELECT df1.id, df2.k FROM df1 JOIN df2 ON struct(df1.k) = struct(df2.k) AND df2.id < 2").show
```
It should return two records, but it returns an empty result.
### Why are the changes needed?
Fix data issue
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Add new unit test.
Closes #29475 from wangyum/SPARK-32659.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit a8b568800e64f6a163da28e5e53441f84355df14)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/sql/execution/subquery.scala | 21 +++------
.../spark/sql/DynamicPartitionPruningSuite.scala | 51 +++++++++++++++++++++-
2 files changed, 57 insertions(+), 15 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index c2270c5..9d15c76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -114,9 +114,10 @@ case class InSubqueryExec(
child: Expression,
plan: BaseSubqueryExec,
exprId: ExprId,
- private var resultBroadcast: Broadcast[Array[Any]] = null) extends ExecSubqueryExpression {
+ private var resultBroadcast: Broadcast[Set[Any]] = null) extends ExecSubqueryExpression {
- @transient private var result: Array[Any] = _
+ @transient private var result: Set[Any] = _
+ @transient private lazy val inSet = InSet(child, result)
override def dataType: DataType = BooleanType
override def children: Seq[Expression] = child :: Nil
@@ -131,14 +132,11 @@ case class InSubqueryExec(
def updateResult(): Unit = {
val rows = plan.executeCollect()
- result = child.dataType match {
- case _: StructType => rows.toArray
- case _ => rows.map(_.get(0, child.dataType))
- }
+ result = rows.map(_.get(0, child.dataType)).toSet
resultBroadcast = plan.sqlContext.sparkContext.broadcast(result)
}
- def values(): Option[Array[Any]] = Option(resultBroadcast).map(_.value)
+ def values(): Option[Set[Any]] = Option(resultBroadcast).map(_.value)
private def prepareResult(): Unit = {
require(resultBroadcast != null, s"$this has not finished")
@@ -149,17 +147,12 @@ case class InSubqueryExec(
override def eval(input: InternalRow): Any = {
prepareResult()
- val v = child.eval(input)
- if (v == null) {
- null
- } else {
- result.contains(v)
- }
+ inSet.eval(input)
}
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
prepareResult()
- InSet(child, result.toSet).doGenCode(ctx, ev)
+ inSet.doGenCode(ctx, ev)
}
override lazy val canonicalized: InSubqueryExec = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index 45f4309..bf91bdb7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
import org.scalatest.GivenWhenThen
-import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
+import org.apache.spark.sql.catalyst.expressions.{CodegenObjectFactoryMode, DynamicPruningExpression, Expression}
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper}
@@ -1309,6 +1309,55 @@ abstract class DynamicPartitionPruningSuiteBase
)
}
}
+
+ test("SPARK-32659: Fix the data issue when pruning DPP on non-atomic type") {
+ withSQLConf(
+ SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "2", // Make sure insert DPP
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false") {
+ withTable("df1", "df2") {
+ spark.range(1000)
+ .select(col("id"), col("id").as("k"))
+ .write
+ .partitionBy("k")
+ .format(tableFormat)
+ .mode("overwrite")
+ .saveAsTable("df1")
+
+ spark.range(100)
+ .select(col("id"), col("id").as("k"))
+ .write
+ .partitionBy("k")
+ .format(tableFormat)
+ .mode("overwrite")
+ .saveAsTable("df2")
+
+ Seq(CodegenObjectFactoryMode.NO_CODEGEN,
+ CodegenObjectFactoryMode.CODEGEN_ONLY).foreach { mode =>
+ Seq(true, false).foreach { pruning =>
+ withSQLConf(
+ SQLConf.CODEGEN_FACTORY_MODE.key -> mode.toString,
+ SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> s"$pruning") {
+ val df = sql(
+ """
+ |SELECT df1.id, df2.k
+ |FROM df1
+ | JOIN df2
+ | ON struct(df1.k) = struct(df2.k)
+ | AND df2.id < 2
+ |""".stripMargin)
+ if (pruning) {
+ checkPartitionPruningPredicate(df, true, false)
+ } else {
+ checkPartitionPruningPredicate(df, false, false)
+ }
+
+ checkAnswer(df, Row(0, 0) :: Row(1, 1) :: Nil)
+ }
+ }
+ }
+ }
+ }
+ }
}
class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org