You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yu...@apache.org on 2022/03/26 05:31:03 UTC
[spark] branch branch-3.1 updated: [SPARK-38570][SQL][3.1] Incorrect DynamicPartitionPruning caused by Literal
This is an automated email from the ASF dual-hosted git repository.
yumwang pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 789ec13 [SPARK-38570][SQL][3.1] Incorrect DynamicPartitionPruning caused by Literal
789ec13 is described below
commit 789ec137c1e240de58152a06746a7defa001343c
Author: mcdull-zhang <wo...@163.com>
AuthorDate: Sat Mar 26 12:48:08 2022 +0800
[SPARK-38570][SQL][3.1] Incorrect DynamicPartitionPruning caused by Literal
This is a backport of #35878 to branch 3.1.
Literal.references returns an empty AttributeSet, so a Literal is mistaken for a partition column.
For example, the SQL in the test case generates the following physical plan when adaptive query execution is disabled:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
:- Union
: :- *(1) Project [4 AS store_id#5281, date_id#5283]
: : +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
: : : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
: : +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
: : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
: : +- *(1) Project [store_id#5291, state_province#5292]
: : +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4), [...]
: +- *(2) Project [5 AS store_id#5282, date_id#5287]
: +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
: : +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
: +- *(2) ColumnarToRow
: +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
: +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
+- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
```
After this PR:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
:- Union
: :- *(1) Project [4 AS store_id#5281, date_id#5283]
: : +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
: +- *(2) Project [5 AS store_id#5282, date_id#5287]
: +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
: +- *(2) ColumnarToRow
: +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#326]
+- *(3) Project [store_id#5291, state_province#5292]
+- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
+- *(3) ColumnarToRow
+- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_i [...]
```
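Why are the changes needed? Execution performance improvement.
Does this PR introduce any user-facing change? No.
How was this patch tested? Added a unit test.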
Closes #35967 from mcdull-zhang/spark_38570_3.2.
Authored-by: mcdull-zhang <wo...@163.com>
Signed-off-by: Yuming Wang <yu...@ebay.com>
(cherry picked from commit 8621914e2052eeab25e6ac4e7d5f48b5570c71f7)
Signed-off-by: Yuming Wang <yu...@ebay.com>
---
.../sql/catalyst/expressions/predicates.scala | 1 +
.../spark/sql/DynamicPartitionPruningSuite.scala | 28 ++++++++++++++++++++++
2 files changed, 29 insertions(+)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index f2d91b5..92d117c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -127,6 +127,7 @@ trait PredicateHelper extends AliasHelper with Logging {
def findExpressionAndTrackLineageDown(
exp: Expression,
plan: LogicalPlan): Option[(Expression, LogicalPlan)] = {
+ if (exp.references.isEmpty) return None
plan match {
case p: Project =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index b57ad61..8c6ec25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1382,6 +1382,34 @@ abstract class DynamicPartitionPruningSuiteBase
)
}
}
+
+ test("SPARK-38570: Fix incorrect DynamicPartitionPruning caused by Literal") {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
+ val df = sql(
+ """
+ |SELECT f.store_id,
+ | f.date_id,
+ | s.state_province
+ |FROM (SELECT 4 AS store_id,
+ | date_id,
+ | product_id
+ | FROM fact_sk
+ | WHERE date_id >= 1300
+ | UNION ALL
+ | SELECT 5 AS store_id,
+ | date_id,
+ | product_id
+ | FROM fact_stats
+ | WHERE date_id <= 1000) f
+ |JOIN dim_store s
+ |ON f.store_id = s.store_id
+ |WHERE s.country = 'US'
+ |""".stripMargin)
+
+ checkPartitionPruningPredicate(df, withSubquery = false, withBroadcast = false)
+ checkAnswer(df, Row(4, 1300, "California") :: Row(5, 1000, "Texas") :: Nil)
+ }
+ }
}
class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org