Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/06/06 07:52:55 UTC

[GitHub] [spark] gengliangwang opened a new pull request #28741: [SPARK-31919][SQL] Push down more predicates through Join

gengliangwang opened a new pull request #28741:
URL: https://github.com/apache/spark/pull/28741


   
   ### What changes were proposed in this pull request?
   Currently, in the rule `PushPredicateThroughJoin`, a predicate under an `Or` operator is thrown away if it can't be pushed down in its entirety.
   In fact, the predicates under `Or` operators can often be partially pushed down.
   For example, say `a` and `b` can be pushed into one of the joined tables while `c` can't; the predicate
   `a or (b and c)`
   can be rewritten as
   `(a or b) and (a or c)`
   so we can still push down `(a or b)`.
   A disjunctive predicate is entirely unpushable only when one of its children is not even partially convertible.
   
   The common approach is to convert the condition into conjunctive normal form (CNF), so that all pushable predicates can be found by going over the CNF clauses.
   However, the CNF conversion result can be huge, and a recursive implementation can cause stack overflow on complex predicates. There have been PRs for this, such as #10444, #15558, and #28575.
   There is also a non-recursive implementation: #28733. It should be workable, but this PR proposes a simpler one.

   Essentially, we just need to traverse the predicate and extract the convertible sub-predicates, as we did in https://github.com/apache/spark/pull/24598 . There is no need to maintain a CNF result set.
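   
   As an illustration, here is a minimal, self-contained sketch of the extraction in Scala (a toy expression tree; `Pred`, `refs`, and `convertible` are illustrative names, not Catalyst's API; the real rule works on `Expression`/`AttributeSet` and additionally restricts partial extraction under `NOT`):
   ```
   sealed trait Expr { def refs: Set[String] }
   case class Pred(name: String, refs: Set[String]) extends Expr
   case class And(l: Expr, r: Expr) extends Expr { def refs = l.refs ++ r.refs }
   case class Or(l: Expr, r: Expr) extends Expr { def refs = l.refs ++ r.refs }

   // Strongest sub-predicate of `e` that only references columns in `side`.
   def convertible(e: Expr, side: Set[String]): Option[Expr] = e match {
     // Either child of AND is implied by the whole, so a partial result is safe.
     case And(l, r) => (convertible(l, side), convertible(r, side)) match {
       case (Some(a), Some(b)) => Some(And(a, b))
       case (one, other)       => one.orElse(other)
     }
     // Both children of OR must contribute, else the disjunction is weakened.
     case Or(l, r) =>
       for { a <- convertible(l, side); b <- convertible(r, side) } yield Or(a, b)
     case p if p.refs.subsetOf(side) => Some(p)
     case _ => None
   }

   // `a or (b and c)`, where a and b reference only t1 and c spans t1 and t2:
   val (a, b) = (Pred("a", Set("t1.x")), Pred("b", Set("t1.y")))
   val c = Pred("c", Set("t1.x", "t2.z"))
   println(convertible(Or(a, And(b, c)), Set("t1.x", "t1.y")))
   // => Some(Or(Pred(a, ...), Pred(b, ...))), i.e. `(a or b)` is pushed down.
   ```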
   
   ### Why are the changes needed?
   Improve query performance. PostgreSQL, Impala, and Hive support a similar feature.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Unit tests and benchmark tests:
   
   
   SQL | Before this PR | After this PR
   --- | --- | ---
   TPCDS 5T Q13 | 84s | 21s
    TPCDS 5T Q85 | 66s | 34s
    TPCH 1T Q19 | 37s | 32s
   



[GitHub] [spark] maropu commented on a change in pull request #28741: [WIP][SPARK-31919][SQL] Push down more predicates through Join

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #28741:
URL: https://github.com/apache/spark/pull/28741#discussion_r436267250



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushPredicateThroughJoin.scala
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, Not, Or, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+trait PushPredicateThroughJoinBase extends Rule[LogicalPlan] with PredicateHelper {
+  protected def enablePushingExtraPredicates: Boolean

Review comment:
       Why did you split `PushPredicateThroughJoinBase` into two rules? Couldn't you implement this optimization in a single rule?





[GitHub] [spark] SparkQA commented on pull request #28741: [WIP][SPARK-31919][SQL] Push down more predicates through Join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #28741:
URL: https://github.com/apache/spark/pull/28741#issuecomment-640053672


   **[Test build #123587 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123587/testReport)** for PR 28741 at commit [`2ca483e`](https://github.com/apache/spark/commit/2ca483e6ccfceb40c579d0094c43832f8896e84f).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] SparkQA commented on pull request #28741: [WIP][SPARK-31919][SQL] Push down more predicates through Join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #28741:
URL: https://github.com/apache/spark/pull/28741#issuecomment-640008330


   **[Test build #123587 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123587/testReport)** for PR 28741 at commit [`2ca483e`](https://github.com/apache/spark/commit/2ca483e6ccfceb40c579d0094c43832f8896e84f).




[GitHub] [spark] wangyum commented on pull request #28741: [WIP][SPARK-31919][SQL] Push down more predicates through Join

Posted by GitBox <gi...@apache.org>.
wangyum commented on pull request #28741:
URL: https://github.com/apache/spark/pull/28741#issuecomment-640136037


   It seems this solution cannot fully optimize the query: in the plans below, PostgreSQL pushes the `ss_sales_price` disjunction all the way into the `store_sales` scan, while Spark after this PR keeps it only in the join conditions.
   PostgreSQL:
   ```
 Aggregate  (cost=77.33..77.34 rows=1 width=128)
   ->  Nested Loop  (cost=0.00..77.31 rows=1 width=32)
         Join Filter: (store_sales.ss_sold_date_sk = date_dim.d_date_sk)
         ->  Nested Loop  (cost=0.00..67.18 rows=1 width=36)
               Join Filter: ((store_sales.ss_addr_sk = customer_address.ca_address_sk) AND ((((customer_address.ca_state)::text = ANY ('{TX,OH,TX}'::text[])) AND (store_sales.ss_net_profit >= '100'::numeric) AND (store_sales.ss_net_profit <= '200'::numeric)) OR (((customer_address.ca_state)::text = ANY ('{OR,NM,KY}'::text[])) AND (store_sales.ss_net_profit >= '150'::numeric) AND (store_sales.ss_net_profit <= '300'::numeric)) OR (((customer_address.ca_state)::text = ANY ('{VA,TX,MS}'::text[])) AND (store_sales.ss_net_profit >= '50'::numeric) AND (store_sales.ss_net_profit <= '250'::numeric))))
               ->  Nested Loop  (cost=0.00..56.90 rows=1 width=54)
                     Join Filter: ((store_sales.ss_cdemo_sk = customer_demographics.cd_demo_sk) AND ((((customer_demographics.cd_marital_status)::text = 'M'::text) AND ((customer_demographics.cd_education_status)::text = 'Advanced Degree'::text) AND (store_sales.ss_sales_price >= 100.00) AND (store_sales.ss_sales_price <= 150.00) AND (household_demographics.hd_dep_count = 3)) OR (((customer_demographics.cd_marital_status)::text = 'S'::text) AND ((customer_demographics.cd_education_status)::text = 'College'::text) AND (store_sales.ss_sales_price >= 50.00) AND (store_sales.ss_sales_price <= 100.00) AND (household_demographics.hd_dep_count = 1)) OR (((customer_demographics.cd_marital_status)::text = 'W'::text) AND ((customer_demographics.cd_education_status)::text = '2 yr Degree'::text) AND (store_sales.ss_sales_price >= 150.00) AND (store_sales.ss_sales_price <= 200.00) AND (household_demographics.hd_dep_count = 1))))
                     ->  Nested Loop  (cost=0.00..46.10 rows=1 width=76)
                           Join Filter: (store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk)
                           ->  Nested Loop  (cost=0.00..33.61 rows=1 width=76)
                                 Join Filter: (store_sales.ss_store_sk = store.s_store_sk)
                                 ->  Seq Scan on store_sales  (cost=0.00..23.60 rows=1 width=80)
                                       Filter: ((((ss_sales_price >= 100.00) AND (ss_sales_price <= 150.00)) OR ((ss_sales_price >= 50.00) AND (ss_sales_price <= 100.00)) OR ((ss_sales_price >= 150.00) AND (ss_sales_price <= 200.00))) AND (((ss_net_profit >= '100'::numeric) AND (ss_net_profit <= '200'::numeric)) OR ((ss_net_profit >= '150'::numeric) AND (ss_net_profit <= '300'::numeric)) OR ((ss_net_profit >= '50'::numeric) AND (ss_net_profit <= '250'::numeric))))
                                 ->  Seq Scan on store  (cost=0.00..10.00 rows=1 width=4)
                           ->  Seq Scan on household_demographics  (cost=0.00..12.45 rows=3 width=8)
                                 Filter: ((hd_dep_count = 3) OR (hd_dep_count = 1) OR (hd_dep_count = 1))
                     ->  Seq Scan on customer_demographics  (cost=0.00..10.75 rows=1 width=1036)
                           Filter: ((((cd_marital_status)::text = 'M'::text) AND ((cd_education_status)::text = 'Advanced Degree'::text)) OR (((cd_marital_status)::text = 'S'::text) AND ((cd_education_status)::text = 'College'::text)) OR (((cd_marital_status)::text = 'W'::text) AND ((cd_education_status)::text = '2 yr Degree'::text)))
               ->  Seq Scan on customer_address  (cost=0.00..10.24 rows=1 width=520)
                     Filter: (((ca_country)::text = 'United States'::text) AND (((ca_state)::text = ANY ('{TX,OH,TX}'::text[])) OR ((ca_state)::text = ANY ('{OR,NM,KY}'::text[])) OR ((ca_state)::text = ANY ('{VA,TX,MS}'::text[]))))
         ->  Seq Scan on date_dim  (cost=0.00..10.12 rows=1 width=4)
               Filter: (d_year = 2001)
    (22 rows)
   ```
   
    After this PR (with `set spark.sql.constraintPropagation.enabled=false` to avoid inferring `IsNotNull`):
   ```
   *(7) HashAggregate(keys=[], functions=[avg(cast(ss_quantity#10 as bigint)), avg(UnscaledValue(ss_ext_sales_price#15)), avg(UnscaledValue(ss_ext_wholesale_cost#16)), sum(UnscaledValue(ss_ext_wholesale_cost#16))])
   +- Exchange SinglePartition, true, [id=#252]
      +- *(6) HashAggregate(keys=[], functions=[partial_avg(cast(ss_quantity#10 as bigint)), partial_avg(UnscaledValue(ss_ext_sales_price#15)), partial_avg(UnscaledValue(ss_ext_wholesale_cost#16)), partial_sum(UnscaledValue(ss_ext_wholesale_cost#16))])
         +- *(6) Project [ss_quantity#10, ss_ext_sales_price#15, ss_ext_wholesale_cost#16]
            +- *(6) BroadcastHashJoin [ss_hdemo_sk#5], [hd_demo_sk#61], Inner, BuildRight, (((((((cd_marital_status#54 = M) AND (cd_education_status#55 = Advanced Degree)) AND (ss_sales_price#13 >= 100.00)) AND (ss_sales_price#13 <= 150.00)) AND (hd_dep_count#64 = 3)) OR (((((cd_marital_status#54 = S) AND (cd_education_status#55 = College)) AND (ss_sales_price#13 >= 50.00)) AND (ss_sales_price#13 <= 100.00)) AND (hd_dep_count#64 = 1))) OR (((((cd_marital_status#54 = W) AND (cd_education_status#55 = 2 yr Degree)) AND (ss_sales_price#13 >= 150.00)) AND (ss_sales_price#13 <= 200.00)) AND (hd_dep_count#64 = 1)))
               :- *(6) Project [ss_hdemo_sk#5, ss_quantity#10, ss_sales_price#13, ss_ext_sales_price#15, ss_ext_wholesale_cost#16, cd_marital_status#54, cd_education_status#55]
               :  +- *(6) BroadcastHashJoin [ss_cdemo_sk#4], [cd_demo_sk#52], Inner, BuildRight, ((((((cd_marital_status#54 = M) AND (cd_education_status#55 = Advanced Degree)) AND (ss_sales_price#13 >= 100.00)) AND (ss_sales_price#13 <= 150.00)) OR ((((cd_marital_status#54 = S) AND (cd_education_status#55 = College)) AND (ss_sales_price#13 >= 50.00)) AND (ss_sales_price#13 <= 100.00))) OR ((((cd_marital_status#54 = W) AND (cd_education_status#55 = 2 yr Degree)) AND (ss_sales_price#13 >= 150.00)) AND (ss_sales_price#13 <= 200.00)))
               :     :- *(6) Project [ss_cdemo_sk#4, ss_hdemo_sk#5, ss_quantity#10, ss_sales_price#13, ss_ext_sales_price#15, ss_ext_wholesale_cost#16]
               :     :  +- *(6) BroadcastHashJoin [ss_sold_date_sk#0], [d_date_sk#79], Inner, BuildRight
               :     :     :- *(6) Project [ss_sold_date_sk#0, ss_cdemo_sk#4, ss_hdemo_sk#5, ss_quantity#10, ss_sales_price#13, ss_ext_sales_price#15, ss_ext_wholesale_cost#16]
               :     :     :  +- *(6) BroadcastHashJoin [ss_addr_sk#6], [ca_address_sk#66], Inner, BuildRight, ((((ca_state#74 IN (TX,OH) AND (ss_net_profit#22 >= 100.00)) AND (ss_net_profit#22 <= 200.00)) OR ((ca_state#74 IN (OR,NM,KY) AND (ss_net_profit#22 >= 150.00)) AND (ss_net_profit#22 <= 300.00))) OR ((ca_state#74 IN (VA,TX,MS) AND (ss_net_profit#22 >= 50.00)) AND (ss_net_profit#22 <= 250.00)))
               :     :     :     :- *(6) Project [ss_sold_date_sk#0, ss_cdemo_sk#4, ss_hdemo_sk#5, ss_addr_sk#6, ss_quantity#10, ss_sales_price#13, ss_ext_sales_price#15, ss_ext_wholesale_cost#16, ss_net_profit#22]
               :     :     :     :  +- *(6) BroadcastHashJoin [ss_store_sk#7], [s_store_sk#23], Inner, BuildRight
               :     :     :     :     :- *(6) Filter ((((ss_net_profit#22 >= 100.00) AND (ss_net_profit#22 <= 200.00)) OR ((ss_net_profit#22 >= 150.00) AND (ss_net_profit#22 <= 300.00))) OR ((ss_net_profit#22 >= 50.00) AND (ss_net_profit#22 <= 250.00)))
               :     :     :     :     :  +- *(6) ColumnarToRow
               :     :     :     :     :     +- FileScan parquet default.store_sales[ss_sold_date_sk#0,ss_cdemo_sk#4,ss_hdemo_sk#5,ss_addr_sk#6,ss_store_sk#7,ss_quantity#10,ss_sales_price#13,ss_ext_sales_price#15,ss_ext_wholesale_cost#16,ss_net_profit#22] Batched: true, DataFilters: [((((ss_net_profit#22 >= 100.00) AND (ss_net_profit#22 <= 200.00)) OR ((ss_net_profit#22 >= 150.0..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/sql/core/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [Or(Or(And(GreaterThanOrEqual(ss_net_profit,100.00),LessThanOrEqual(ss_net_profit,200.00)),And(Gr..., ReadSchema: struct<ss_sold_date_sk:int,ss_cdemo_sk:int,ss_hdemo_sk:int,ss_addr_sk:int,ss_store_sk:int,ss_quan...
               :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#213]
               :     :     :     :        +- *(1) ColumnarToRow
               :     :     :     :           +- FileScan parquet default.store[s_store_sk#23] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/sql/core/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<s_store_sk:int>
               :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#222]
               :     :     :        +- *(2) Project [ca_address_sk#66, ca_state#74]
               :     :     :           +- *(2) Filter ((ca_country#76 = United States) AND ((ca_state#74 IN (TX,OH) OR ca_state#74 IN (OR,NM,KY)) OR ca_state#74 IN (VA,TX,MS)))
               :     :     :              +- *(2) ColumnarToRow
               :     :     :                 +- FileScan parquet default.customer_address[ca_address_sk#66,ca_state#74,ca_country#76] Batched: true, DataFilters: [(ca_country#76 = United States), ((ca_state#74 IN (TX,OH) OR ca_state#74 IN (OR,NM,KY)) OR ca_st..., Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/sql/core/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [EqualTo(ca_country,United States), Or(Or(In(ca_state, [TX,OH]),In(ca_state, [OR,NM,KY])),In(ca_s..., ReadSchema: struct<ca_address_sk:int,ca_state:string,ca_country:string>
               :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#231]
               :     :        +- *(3) Project [d_date_sk#79]
               :     :           +- *(3) Filter (d_year#85 = 2001)
               :     :              +- *(3) ColumnarToRow
               :     :                 +- FileScan parquet default.date_dim[d_date_sk#79,d_year#85] Batched: true, DataFilters: [(d_year#85 = 2001)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/sql/core/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [EqualTo(d_year,2001)], ReadSchema: struct<d_date_sk:int,d_year:int>
               :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#238]
               :        +- *(4) ColumnarToRow
               :           +- FileScan parquet default.customer_demographics[cd_demo_sk#52,cd_marital_status#54,cd_education_status#55] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/sql/core/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<cd_demo_sk:int,cd_marital_status:string,cd_education_status:string>
               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#246]
                  +- *(5) Filter (((hd_dep_count#64 = 3) OR (hd_dep_count#64 = 1)) OR (hd_dep_count#64 = 1))
                     +- *(5) ColumnarToRow
                        +- FileScan parquet default.household_demographics[hd_demo_sk#61,hd_dep_count#64] Batched: true, DataFilters: [(((hd_dep_count#64 = 3) OR (hd_dep_count#64 = 1)) OR (hd_dep_count#64 = 1))], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/opensource/spark/sql/core/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [Or(Or(EqualTo(hd_dep_count,3),EqualTo(hd_dep_count,1)),EqualTo(hd_dep_count,1))], ReadSchema: struct<hd_demo_sk:int,hd_dep_count:int>
   
   ```



[GitHub] [spark] gengliangwang edited a comment on pull request #28741: [WIP][SPARK-31919][SQL] Push down more predicates through Join

Posted by GitBox <gi...@apache.org>.
gengliangwang edited a comment on pull request #28741:
URL: https://github.com/apache/spark/pull/28741#issuecomment-640172081


   @wangyum Thanks for pointing it out.
   I made a mistake. This solution is not as powerful as CNF conversion.




[GitHub] [spark] maropu commented on a change in pull request #28741: [WIP][SPARK-31919][SQL] Push down more predicates through Join

Posted by GitBox <gi...@apache.org>.
maropu commented on a change in pull request #28741:
URL: https://github.com/apache/spark/pull/28741#discussion_r436268072



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushPredicateThroughJoin.scala
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, Not, Or, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+trait PushPredicateThroughJoinBase extends Rule[LogicalPlan] with PredicateHelper {
+  protected def enablePushingExtraPredicates: Boolean
+  /**
+   * Splits join condition expressions or filter predicates (on a given join's output) into three
+   * categories based on the attributes required to evaluate them. Note that we explicitly exclude
+   * non-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or
+   * canEvaluateInRight to prevent pushing these predicates on either side of the join.
+   *
+   * @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
+   */
+  private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
+    val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic)
+    val (leftEvaluateCondition, rest) =
+      pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
+    val (rightEvaluateCondition, commonCondition) =
+        rest.partition(expr => expr.references.subsetOf(right.outputSet))
+
+    // For the predicates in `commonCondition`, it is still possible to find sub-predicates which
+    // are able to be pushed down.
+    val leftExtraCondition = if (enablePushingExtraPredicates) {
+      commonCondition.flatMap(convertibleFilter(_, left.outputSet, canPartialPushDown = true))
+    } else {
+      Seq.empty
+    }
+    val rightExtraCondition = if (enablePushingExtraPredicates) {
+      commonCondition.flatMap(convertibleFilter(_, right.outputSet, canPartialPushDown = true))
+    } else {
+      Seq.empty
+    }
+
+    // To avoid expanding the join condition into conjunctive normal form and making the size
+    // of codegen much larger, `commonCondition` will be kept as original form in the new join
+    // condition.
+    (leftEvaluateCondition ++ leftExtraCondition, rightEvaluateCondition ++ rightExtraCondition,
+      commonCondition ++ nonDeterministic)
+  }
+
+  private def convertibleFilter(
+    condition: Expression,
+    outputSet: AttributeSet,
+    canPartialPushDown: Boolean): Option[Expression] = condition match {
+    // Here, it is not safe to just convert one side and remove the other side
+    // if we do not understand what the parent filters are.
+    //
+    // Here is an example used to explain the reason.
+    // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to

Review comment:
       nit: How about the case `NOT(a = 2 OR b in ('1'))`? It can be transformed into `NOT(a = 2) AND NOT(b in ('1'))`, and then I think it can be partially pushed down; see the sketch below.
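       
       For reference, a minimal sketch of the suggested rewrite in Scala (toy types, illustrative only, not this PR's code; if I recall correctly, Catalyst's `BooleanSimplification` rule applies the same De Morgan rewrites):
       ```
       sealed trait Expr
       case class Leaf(s: String) extends Expr
       case class And(l: Expr, r: Expr) extends Expr
       case class Or(l: Expr, r: Expr) extends Expr
       case class Not(e: Expr) extends Expr

       // Push NOT inward with De Morgan's laws so that AND conjuncts surface
       // and can then be considered for push-down independently.
       def pushNotInward(e: Expr): Expr = e match {
         case Not(Or(l, r))  => And(pushNotInward(Not(l)), pushNotInward(Not(r)))
         case Not(And(l, r)) => Or(pushNotInward(Not(l)), pushNotInward(Not(r)))
         case Not(Not(x))    => pushNotInward(x)
         case And(l, r)      => And(pushNotInward(l), pushNotInward(r))
         case Or(l, r)       => Or(pushNotInward(l), pushNotInward(r))
         case other          => other
       }

       println(pushNotInward(Not(Or(Leaf("a = 2"), Leaf("b IN ('1')")))))
       // => And(Not(Leaf(a = 2)), Not(Leaf(b IN ('1'))))
       ```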





[GitHub] [spark] gengliangwang commented on pull request #28741: [SPARK-31919][SQL] Push down more predicates through Join

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #28741:
URL: https://github.com/apache/spark/pull/28741#issuecomment-640007372


   I will add more test cases.



[GitHub] [spark] maropu commented on pull request #28741: [WIP][SPARK-31919][SQL] Push down more predicates through Join

Posted by GitBox <gi...@apache.org>.
maropu commented on pull request #28741:
URL: https://github.com/apache/spark/pull/28741#issuecomment-640057173


   Just to check: it seems this PR doesn't depend on CNF conversion, but can we get exactly the same performance gains as with https://github.com/apache/spark/pull/28733?



[GitHub] [spark] gengliangwang closed pull request #28741: [WIP][SPARK-31919][SQL] Push down more predicates through Join

Posted by GitBox <gi...@apache.org>.
gengliangwang closed pull request #28741:
URL: https://github.com/apache/spark/pull/28741


   



[GitHub] [spark] gengliangwang commented on a change in pull request #28741: [WIP][SPARK-31919][SQL] Push down more predicates through Join

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on a change in pull request #28741:
URL: https://github.com/apache/spark/pull/28741#discussion_r436249183



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushPredicateThroughJoin.scala
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, Not, Or, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+trait PushPredicateThroughJoinBase extends Rule[LogicalPlan] with PredicateHelper {
+  protected def enablePushingExtraPredicates: Boolean
+  /**
+   * Splits join condition expressions or filter predicates (on a given join's output) into three
+   * categories based on the attributes required to evaluate them. Note that we explicitly exclude
+   * non-deterministic (i.e., stateful) condition expressions in canEvaluateInLeft or
+   * canEvaluateInRight to prevent pushing these predicates on either side of the join.
+   *
+   * @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
+   */
+  private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
+    val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic)
+    val (leftEvaluateCondition, rest) =
+      pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
+    val (rightEvaluateCondition, commonCondition) =
+        rest.partition(expr => expr.references.subsetOf(right.outputSet))
+
+    // For the predicates in `commonCondition`, it is still possible to find sub-predicates which
+    // are able to be pushed down.
+    val leftExtraCondition = if (enablePushingExtraPredicates) {
+      commonCondition.flatMap(convertibleFilter(_, left.outputSet, canPartialPushDown = true))
+    } else {
+      Seq.empty
+    }
+    val rightExtraCondition = if (enablePushingExtraPredicates) {
+      commonCondition.flatMap(convertibleFilter(_, right.outputSet, canPartialPushDown = true))
+    } else {
+      Seq.empty
+    }
+
+    // To avoid expanding the join condition into conjunctive normal form and making the size
+    // of codegen much larger, `commonCondition` will be kept as original form in the new join
+    // condition.
+    (leftEvaluateCondition ++ leftExtraCondition, rightEvaluateCondition ++ rightExtraCondition,
+      commonCondition ++ nonDeterministic)
+  }
+
+  private def convertibleFilter(
+    condition: Expression,
+    outputSet: AttributeSet,
+    canPartialPushDown: Boolean): Option[Expression] = condition match {
+    // Here, it is not safe to just convert one side and remove the other side
+    // if we do not understand what the parent filters are.
+    //
+    // Here is an example used to explain the reason.
+    // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to
+    // convert b in ('1'). If we only convert a = 2, we will end up with a filter
+    // NOT(a = 2), which will generate wrong results.
+    //
+    // Pushing one side of AND down is only safe to do at the top level or in the child
+    // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
+    // can be safely removed.
+    case And(left, right) =>
+      val leftResultOptional = convertibleFilter(left, outputSet, canPartialPushDown)
+      val rightResultOptional = convertibleFilter(right, outputSet, canPartialPushDown)
+      (leftResultOptional, rightResultOptional) match {
+        case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult))
+        case (Some(leftResult), None) if canPartialPushDown => Some(leftResult)
+        case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult)
+        case _ => None
+      }
+
+    // The Or predicate is convertible when both of its children can be pushed down.
+    // That is to say, if one/both of the children can be partially pushed down, the Or
+    // predicate can be partially pushed down as well.
+    //
+    // Here is an example used to explain the reason.
+    // Let's say we have
+    // (a1 AND a2) OR (b1 AND b2),
+    // where a1 and b1 are convertible, while a2 and b2 are not.
+    // The predicate can be converted to
+    // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
+    // As per the logic in the And case above, we can push down (a1 OR b1).
+    case Or(left, right) =>
+      for {
+        lhs <- convertibleFilter(left, outputSet, canPartialPushDown)
+        rhs <- convertibleFilter(right, outputSet, canPartialPushDown)
+      } yield Or(lhs, rhs)
+
+    case Not(pred) =>
+      val childResultOptional = convertibleFilter(pred, outputSet, canPartialPushDown = false)
+      childResultOptional.map(Not)
+
+    case other =>
+      if (other.references.subsetOf(outputSet)) {
+        Some(other)
+      } else {
+        None
+      }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
+
+  val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {

Review comment:
       This method is copied from the original one, except that the join condition is not changed when `enablePushingExtraPredicates` is true.





[GitHub] [spark] gengliangwang commented on pull request #28741: [WIP][SPARK-31919][SQL] Push down more predicates through Join

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #28741:
URL: https://github.com/apache/spark/pull/28741#issuecomment-640172081


   @wangyum Thanks for pointing it out.
   I made a mistake. This solution is not as powerful as CNF conversion, since it can't even push down a predicate such as
   ```
   (p1(t1) or p2(t2)) and p3(t1)
   ```
   to `t1`.
   
   With CNF, we can push down
   ```
   p1(t1) or p3(t1)
   ```
   
   I am closing this one and reopening #28733
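   
   For context, a minimal sketch in Scala (toy types, illustrative only) of the CNF distribution step that the CNF-based approach relies on. Each distribution can double the clause count, which is the blow-up concern mentioned in the PR description:
   ```
   sealed trait Expr
   case class Leaf(s: String) extends Expr
   case class And(l: Expr, r: Expr) extends Expr
   case class Or(l: Expr, r: Expr) extends Expr

   // Distribute OR over AND until the tree is a conjunction of disjunctions.
   def toCNF(e: Expr): Expr = e match {
     case And(l, r) => And(toCNF(l), toCNF(r))
     case Or(l, r) =>
       (toCNF(l), toCNF(r)) match {
         case (And(a, b), c) => And(toCNF(Or(a, c)), toCNF(Or(b, c)))
         case (a, And(b, c)) => And(toCNF(Or(a, b)), toCNF(Or(a, c)))
         case (a, b)         => Or(a, b)
       }
     case leaf => leaf
   }

   // `a or (b and c)` becomes `(a or b) and (a or c)`:
   println(toCNF(Or(Leaf("a"), And(Leaf("b"), Leaf("c")))))
   ```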

