You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yu...@apache.org on 2022/03/26 05:49:07 UTC

[spark] branch branch-3.0 updated: [SPARK-38570][SQL][3.0] Incorrect DynamicPartitionPruning caused by Literal

This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4fc718fb [SPARK-38570][SQL][3.0] Incorrect DynamicPartitionPruning caused by Literal
4fc718fb is described below

commit 4fc718fbc7e70784c250ea9315ccfc56cfaa5893
Author: mcdull-zhang <wo...@163.com>
AuthorDate: Sat Mar 26 12:48:08 2022 +0800

    [SPARK-38570][SQL][3.0] Incorrect DynamicPartitionPruning caused by Literal
    
    This is a backport of #35878 to branch 3.0.
    
    Literal.references returns an empty AttributeSet, so a Literal is mistaken for a partition column and a dynamic pruning filter is incorrectly built on it.
    
    For example, the SQL in the test case generates the following physical plan when adaptive query execution is disabled:
    ```tex
    *(4) Project [store_id#5281, date_id#5283, state_province#5292]
    +- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
       :- Union
       :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
       :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
       :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
       :  :     +- *(1) ColumnarToRow
       :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
       :  :              +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
       :  :                 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
       :  :                    +- *(1) Project [store_id#5291, state_province#5292]
       :  :                       +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
       :  :                          +- *(1) ColumnarToRow
       :  :                             +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4), [...]
       :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
       :     +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
       :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
       :        +- *(2) ColumnarToRow
       :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
       :                 +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
       +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
    ```
    After this PR, the physical plan becomes:
    ```tex
    *(4) Project [store_id#5281, date_id#5283, state_province#5292]
    +- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
       :- Union
       :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
       :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
       :  :     +- *(1) ColumnarToRow
       :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
       :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
       :     +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
       :        +- *(2) ColumnarToRow
       :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
       +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#326]
          +- *(3) Project [store_id#5291, state_province#5292]
             +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
                +- *(3) ColumnarToRow
                   +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_i [...]
    ```
    
    Execution performance improvement
    
    No
    
    Added unit test
    
    Closes #35967 from mcdull-zhang/spark_38570_3.2.
    
    Authored-by: mcdull-zhang <wo...@163.com>
    Signed-off-by: Yuming Wang <yu...@ebay.com>
    (cherry picked from commit 8621914e2052eeab25e6ac4e7d5f48b5570c71f7)
    Signed-off-by: Yuming Wang <yu...@ebay.com>
---
 .../sql/catalyst/expressions/predicates.scala      |  1 +
 .../spark/sql/DynamicPartitionPruningSuite.scala   | 28 ++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 2cff5eb..c9df8e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -113,6 +113,7 @@ trait PredicateHelper {
   def findExpressionAndTrackLineageDown(
       exp: Expression,
       plan: LogicalPlan): Option[(Expression, LogicalPlan)] = {
+    if (exp.references.isEmpty) return None
 
     plan match {
       case Project(projectList, child) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index d67deca..18d3765 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1332,6 +1332,34 @@ abstract class DynamicPartitionPruningSuiteBase
       }
     }
   }
+
+  test("SPARK-38570: Fix incorrect DynamicPartitionPruning caused by Literal") {
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
+      val df = sql(
+        """
+          |SELECT f.store_id,
+          |       f.date_id,
+          |       s.state_province
+          |FROM (SELECT 4 AS store_id,
+          |               date_id,
+          |               product_id
+          |      FROM   fact_sk
+          |      WHERE  date_id >= 1300
+          |      UNION ALL
+          |      SELECT 5 AS store_id,
+          |               date_id,
+          |               product_id
+          |      FROM   fact_stats
+          |      WHERE  date_id <= 1000) f
+          |JOIN dim_store s
+          |ON f.store_id = s.store_id
+          |WHERE s.country = 'US'
+          |""".stripMargin)
+
+      checkPartitionPruningPredicate(df, withSubquery = false, withBroadcast = false)
+      checkAnswer(df, Row(4, 1300, "California") :: Row(5, 1000, "Texas") :: Nil)
+    }
+  }
 }
 
 class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org