Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/07/09 09:55:02 UTC

[GitHub] [arrow-datafusion] xudong963 opened a new pull request, #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

xudong963 opened a new pull request, #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858

   Closes #217 
   
   Query plan for tpch 19 (focus on the **Filter** plan):
   ```shell
   === Logical plan ===
   Projection: #SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue
     Aggregate: groupBy=[[]], aggr=[[SUM(#lineitem.l_extendedprice * Int64(1) - #lineitem.l_discount)]]
       Filter: #part.p_partkey = #lineitem.l_partkey AND #part.p_brand = Utf8("Brand#12") AND #part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND #lineitem.l_quantity >= Int64(1) AND #lineitem.l_quantity <= Int64(1) + Int64(10) AND #part.p_size BETWEEN Int64(1) AND Int64(5) AND #lineitem.l_shipmode IN ([Utf8("AIR"), Utf8("AIR REG")]) AND #lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON") OR #part.p_partkey = #lineitem.l_partkey AND #part.p_brand = Utf8("Brand#23") AND #part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND #lineitem.l_quantity >= Int64(10) AND #lineitem.l_quantity <= Int64(10) + Int64(10) AND #part.p_size BETWEEN Int64(1) AND Int64(10) AND #lineitem.l_shipmode IN ([Utf8("AIR"), Utf8("AIR REG")]) AND #lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON") OR #part.p_partkey = #lineitem.l_partkey AND #part.p_brand = Utf8("Brand#34") AND #part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"), Utf8("LG PKG")]) AND #lineitem.l_quantity >= Int64(20) AND #lineitem.l_quantity <= Int64(20) + Int64(10) AND #part.p_size BETWEEN Int64(1) AND Int64(15) AND #lineitem.l_shipmode IN ([Utf8("AIR"), Utf8("AIR REG")]) AND #lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON")
         CrossJoin:
           TableScan: lineitem
           TableScan: part
   
   === Optimized logical plan ===
   Projection: #SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue
     Aggregate: groupBy=[[]], aggr=[[SUM(#lineitem.l_extendedprice * Int64(1) - #lineitem.l_discount)]]
       Projection: #part.p_partkey = #lineitem.l_partkey AS BinaryExpr-=Column-lineitem.l_partkeyColumn-part.p_partkey, #lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON") AS BinaryExpr-=LiteralDELIVER IN PERSONColumn-lineitem.l_shipinstruct, #lineitem.l_shipmode IN ([Utf8("AIR"), Utf8("AIR REG")]) AS InList-falseLiteralAIR REGLiteralAIRColumn-lineitem.l_shipmode, #lineitem.l_quantity, #lineitem.l_extendedprice, #lineitem.l_discount, #part.p_brand, #part.p_size, #part.p_container
         Filter: #part.p_partkey = #lineitem.l_partkey AND #part.p_brand = Utf8("Brand#12") AND #part.p_container IN ([Utf8("SM CASE"), Utf8("SM BOX"), Utf8("SM PACK"), Utf8("SM PKG")]) AND #lineitem.l_quantity >= Int64(1) AND #lineitem.l_quantity <= Int64(11) AND #part.p_size BETWEEN Int64(1) AND Int64(5) OR #part.p_brand = Utf8("Brand#23") AND #part.p_container IN ([Utf8("MED BAG"), Utf8("MED BOX"), Utf8("MED PKG"), Utf8("MED PACK")]) AND #lineitem.l_quantity >= Int64(10) AND #lineitem.l_quantity <= Int64(20) AND #part.p_size BETWEEN Int64(1) AND Int64(10) OR #part.p_brand = Utf8("Brand#34") AND #part.p_container IN ([Utf8("LG CASE"), Utf8("LG BOX"), Utf8("LG PACK"), Utf8("LG PKG")]) AND #lineitem.l_quantity >= Int64(20) AND #lineitem.l_quantity <= Int64(30) AND #part.p_size BETWEEN Int64(1) AND Int64(15)
           CrossJoin:
             Filter: #lineitem.l_shipmode IN ([Utf8("AIR"), Utf8("AIR REG")]) AND #lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON")
               TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode], partial_filters=[#lineitem.l_shipmode IN ([Utf8("AIR"), Utf8("AIR REG")]), #lineitem.l_shipinstruct = Utf8("DELIVER IN PERSON")]
             TableScan: part projection=[p_partkey, p_brand, p_size, p_container]
   ```
   
   We need to migrate the `cross join -> inner join` optimization from the planner to the optimizer, so that the cross join in tpch 19 can be further optimized into an inner join using the predicate extracted by `rewrite_disjunctive_predicate`.
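A minimal sketch of the core idea behind the rule, assuming a simplified model in which leaf predicates are plain strings instead of DataFusion `Expr`s. The names `Pred`, `conjuncts`, `make_and` and `factor_common` are hypothetical and are not the PR's code. Conjuncts that appear in every OR branch (such as `#part.p_partkey = #lineitem.l_partkey` in the original plan above) are pulled out, so `(A AND B) OR (A AND C)` becomes `A AND (B OR C)`:

```rust
// Hypothetical, simplified model: leaves are strings, not DataFusion `Expr`s.
#[derive(Clone, Debug, PartialEq)]
enum Pred {
    And(Vec<Pred>),
    Or(Vec<Pred>),
    Leaf(String),
}

fn leaf(s: &str) -> Pred {
    Pred::Leaf(s.to_string())
}

/// The conjuncts of a predicate: the children of a top-level AND, else itself.
fn conjuncts(p: &Pred) -> Vec<Pred> {
    match p {
        Pred::And(args) => args.clone(),
        other => vec![other.clone()],
    }
}

/// Rebuilds an AND from conjuncts; `None` means "no conjuncts" (i.e. TRUE).
fn make_and(mut args: Vec<Pred>) -> Option<Pred> {
    match args.len() {
        0 => None,
        1 => args.pop(),
        _ => Some(Pred::And(args)),
    }
}

/// Rewrites `(A AND B) OR (A AND C)` into `A AND (B OR C)`.
/// Assumes `branches` (the arguments of the OR) is non-empty.
fn factor_common(branches: &[Pred]) -> Pred {
    let split: Vec<Vec<Pred>> = branches.iter().map(conjuncts).collect();
    // A conjunct is common if every branch contains it.
    let common: Vec<Pred> = split[0]
        .iter()
        .filter(|c| split.iter().all(|b| b.contains(*c)))
        .cloned()
        .collect();
    if common.is_empty() {
        return Pred::Or(branches.to_vec());
    }
    // Remove the common conjuncts from each branch.
    let mut residuals = Vec::new();
    for branch in split {
        let rest: Vec<Pred> = branch.into_iter().filter(|c| !common.contains(c)).collect();
        match make_and(rest) {
            Some(r) => residuals.push(r),
            // An empty residual is TRUE, which makes the whole OR TRUE.
            None => return make_and(common).unwrap(),
        }
    }
    let rest = if residuals.len() == 1 { residuals.pop().unwrap() } else { Pred::Or(residuals) };
    let mut out = common;
    out.push(rest);
    make_and(out).unwrap()
}

fn main() {
    let join = leaf("p_partkey = l_partkey");
    let branches = vec![
        Pred::And(vec![join.clone(), leaf("p_brand = 'Brand#12'")]),
        Pred::And(vec![join.clone(), leaf("p_brand = 'Brand#23'")]),
    ];
    // Expected shape: join AND (Brand#12 OR Brand#23)
    println!("{:?}", factor_common(&branches));
}
```

Once the shared equality sits at the top level of the filter, a join-rewriting pass can in principle use it as a join key, which is why moving the cross join to inner join logic into the optimizer matters here.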


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] xudong963 commented on pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
xudong963 commented on PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#issuecomment-1183057814

   Thanks @alamb, I learned a lot from your comments. I'll address them in this PR later (we can keep it open).



[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#discussion_r919357423


##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,429 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::Filter;
+use datafusion_expr::utils::from_plan;
+use datafusion_expr::Expr::BinaryExpr;
+use datafusion_expr::{Expr, LogicalPlan, Operator};
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Debug)]
+enum Predicate {
+    And { args: Vec<Predicate> },
+    Or { args: Vec<Predicate> },
+    Other { expr: Box<Expr> },
+}
+
+fn predicate(expr: &Expr) -> Result<Predicate> {
+    match expr {
+        BinaryExpr { left, op, right } => match op {
+            Operator::And => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::And { args })
+            }
+            Operator::Or => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::Or { args })
+            }
+            _ => Ok(Predicate::Other {
+                expr: Box::new(BinaryExpr {
+                    left: left.clone(),
+                    op: *op,
+                    right: right.clone(),
+                }),
+            }),
+        },
+        _ => Ok(Predicate::Other {
+            expr: Box::new(expr.clone()),
+        }),
+    }
+}
+
+fn normalize_predicate(predicate: &Predicate) -> Expr {
+    match predicate {
+        Predicate::And { args } => {

Review Comment:
   If you wanted to you could write this function more concisely like:
   
   ```rust
   fn normalize_predicate(predicate: &Predicate) -> Expr {
       match predicate {
           Predicate::And { args } => {
               assert!(args.len() >= 2);
               args.into_iter()
                   .map(|pred| normalize_predicate(&pred))
                   .reduce(Expr::and)
                   .expect("had more than one arg")
           }
           Predicate::Or { args } => {
               assert!(args.len() >= 2);
               args.into_iter()
                   .map(|pred| normalize_predicate(&pred))
                   .reduce(Expr::or)
                   .expect("had more than one arg")
           }
           Predicate::Other { expr } => *expr.clone(),
       }
   }
   ```
   
   (using the `reduce(Expr::and)`)
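The suggestion relies on `Iterator::reduce`, which folds left to right (so `[a, b, c]` becomes `(a AND b) AND c`, the same left-deep shape the hand-written loop builds) and returns `None` for an empty iterator, which is why `.expect` is needed. A toy sketch, using a hypothetical `Expr` with only columns and `And` that mirrors the shape of a binary `and(self, other)` builder:

```rust
// Hypothetical stand-in for DataFusion's `Expr`, just enough to show the fold.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Col(&'static str),
    And(Box<Expr>, Box<Expr>),
}

impl Expr {
    /// Combines two expressions into a binary AND node.
    fn and(self, other: Expr) -> Expr {
        Expr::And(Box::new(self), Box::new(other))
    }
}

/// Folds `[a, b, c]` into `(a AND b) AND c`; returns `None` for an empty input.
fn conjunction(exprs: Vec<Expr>) -> Option<Expr> {
    exprs.into_iter().reduce(Expr::and)
}

fn main() {
    let folded = conjunction(vec![Expr::Col("a"), Expr::Col("b"), Expr::Col("c")]);
    println!("{:?}", folded);
}
```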



##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,429 @@
+fn normalize_predicate(predicate: &Predicate) -> Expr {
+    match predicate {
+        Predicate::And { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut and_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::And,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                and_expr = BinaryExpr {
+                    left: Box::new(and_expr),
+                    op: Operator::And,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            and_expr
+        }
+        Predicate::Or { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut or_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::Or,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                or_expr = BinaryExpr {
+                    left: Box::new(or_expr),
+                    op: Operator::Or,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            or_expr
+        }
+        Predicate::Other { expr } => *expr.clone(),
+    }
+}
+
+fn rewrite_predicate(predicate: &Predicate) -> Predicate {
+    match predicate {
+        Predicate::And { args } => {
+            let mut rewritten_args = Vec::with_capacity(args.len());
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_and_predicates(&rewritten_args);
+            Predicate::And {
+                args: rewritten_args,
+            }
+        }
+        Predicate::Or { args } => {
+            let mut rewritten_args = vec![];
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_or_predicates(&rewritten_args);
+            delete_duplicate_predicates(&rewritten_args)
+        }
+        Predicate::Other { expr } => Predicate::Other {
+            expr: Box::new(*expr.clone()),
+        },
+    }
+}
+
+fn flatten_and_predicates(and_predicates: &[Predicate]) -> Vec<Predicate> {

Review Comment:
   This is clever 👍



##########
datafusion/core/tests/sql/predicates.rs:
##########
@@ -386,3 +386,57 @@ async fn csv_in_set_test() -> Result<()> {
     assert_batches_sorted_eq!(expected, &actual);
     Ok(())
 }
+
+#[tokio::test]
+async fn multiple_or_predicates() -> Result<()> {
+    let ctx = SessionContext::new();
+    register_tpch_csv(&ctx, "lineitem").await?;
+    register_tpch_csv(&ctx, "part").await?;
+    let sql = "explain select
+    l_partkey
+    from
+    lineitem,
+    part
+    where
+    (
+                p_partkey = l_partkey
+            and p_brand = 'Brand#12'
+            and l_quantity >= 1 and l_quantity <= 1 + 10
+            and p_size between 1 and 5
+        )
+    or
+    (
+                p_partkey = l_partkey
+            and p_brand = 'Brand#23'
+            and l_quantity >= 10 and l_quantity <= 10 + 10
+            and p_size between 1 and 10
+        )
+    or
+    (
+                p_partkey = l_partkey
+            and p_brand = 'Brand#34'
+            and l_quantity >= 20 and l_quantity <= 20 + 10
+            and p_size between 1 and 15
+        )";
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx.create_logical_plan(sql).expect(&msg);
+    let state = ctx.state();
+    let plan = state.optimize(&plan)?;
+    let expected =vec![

Review Comment:
   👍



##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,429 @@
+fn normalize_predicate(predicate: &Predicate) -> Expr {

Review Comment:
   You can probably avoid a copy if you pass in the owned predicate here: 
   
   ```suggestion
   fn normalize_predicate(predicate: Predicate) -> Expr {
   ```
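A sketch of what the owned version could look like, using hypothetical simplified `Expr` and `Predicate` types in place of DataFusion's. Taking `Predicate` by value lets the `Other` arm move the expression out of its `Box` with `*expr` instead of cloning it, and the `And`/`Or` arms consume their `args` with `into_iter()`:

```rust
// Hypothetical simplified types; `Leaf` stands in for an arbitrary DataFusion `Expr`.
#[derive(Debug, PartialEq)]
enum Expr {
    Leaf(String),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

enum Predicate {
    And { args: Vec<Predicate> },
    Or { args: Vec<Predicate> },
    Other { expr: Box<Expr> },
}

fn leaf(s: &str) -> Predicate {
    Predicate::Other { expr: Box::new(Expr::Leaf(s.to_string())) }
}

/// Consumes the predicate, so no part of the tree is cloned.
fn normalize_predicate(predicate: Predicate) -> Expr {
    match predicate {
        Predicate::And { args } => args
            .into_iter()
            .map(normalize_predicate)
            .reduce(|l, r| Expr::And(Box::new(l), Box::new(r)))
            .expect("AND needs at least one argument"),
        Predicate::Or { args } => args
            .into_iter()
            .map(normalize_predicate)
            .reduce(|l, r| Expr::Or(Box::new(l), Box::new(r)))
            .expect("OR needs at least one argument"),
        // Move the expression out of its Box: no clone required.
        Predicate::Other { expr } => *expr,
    }
}

fn main() {
    let p = Predicate::Or {
        args: vec![Predicate::And { args: vec![leaf("a"), leaf("b")] }, leaf("c")],
    };
    println!("{:?}", normalize_predicate(p));
}
```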



##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,429 @@
+fn flatten_and_predicates(and_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in and_predicates {
+        match predicate {
+            Predicate::And { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_and_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn flatten_or_predicates(or_predicates: &[Predicate]) -> Vec<Predicate> {

Review Comment:
   If you wanted, I also think it would be neat to try to avoid clones. I wonder if you could make a signature like this work:
   
   ```suggestion
   fn flatten_or_predicates(or_predicates: impl IntoIterator<Item=Predicate>) -> Vec<Predicate> {
   ```
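A sketch of that signature, assuming a simplified `Predicate` whose `Other` variant holds a `String` rather than a `Box<Expr>`. Because the function consumes its input, each element is moved into the output and nested `Or` arguments are flattened recursively without any `clone()`; `And` nodes are left untouched:

```rust
// Hypothetical simplified type: a String stands in for `Box<Expr>`.
#[derive(Debug, PartialEq)]
enum Predicate {
    And { args: Vec<Predicate> },
    Or { args: Vec<Predicate> },
    Other { expr: String },
}

fn other(s: &str) -> Predicate {
    Predicate::Other { expr: s.to_string() }
}

/// Flattens nested ORs, e.g. `a OR (b OR c)` -> `[a, b, c]`, moving every element.
fn flatten_or_predicates(or_predicates: impl IntoIterator<Item = Predicate>) -> Vec<Predicate> {
    let mut flattened = vec![];
    for predicate in or_predicates {
        match predicate {
            // Recurse into the owned args of a nested OR.
            Predicate::Or { args } => flattened.extend(flatten_or_predicates(args)),
            // Anything else (including AND) is moved into the output as-is.
            other => flattened.push(other),
        }
    }
    flattened
}

fn main() {
    let nested = vec![
        other("a"),
        Predicate::Or { args: vec![other("b"), Predicate::Or { args: vec![other("c"), other("d")] }] },
        Predicate::And { args: vec![other("e"), other("f")] },
    ];
    println!("{:?}", flatten_or_predicates(nested));
}
```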



##########
datafusion/core/tests/sql/predicates.rs:
##########
@@ -386,3 +386,57 @@ async fn csv_in_set_test() -> Result<()> {
+    let expected =vec![

Review Comment:
   ```suggestion
       // Note that we expect `#part.p_partkey = #lineitem.l_partkey` to have been 
       // factored out and appear only once in the following plan
       let expected =vec![
   ```



##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,429 @@
+
+fn flatten_or_predicates(or_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in or_predicates {
+        match predicate {
+            Predicate::Or { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_or_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
+    let mut shortest_exprs: Vec<Predicate> = vec![];
+    let mut shortest_exprs_len = 0;
+    // choose the shortest AND predicate

Review Comment:
   I don't understand the need for checking the shortest AND predicate -- is there some test case that would show why picking this is important?
   
   Or maybe another question is "why not check all elements?" Perhaps by keeping a set of expressions that were common (and checking each element in the set for its inclusion in all the arguments).
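A sketch of the set-based approach described above, assuming conjuncts can be hashed (strings here; a real `Expr` would need `Hash + Eq`, or a `Vec` with `PartialEq` checks instead of a `HashSet`). `common_conjuncts` is a hypothetical helper. Since the intersection cannot depend on which branch seeds the candidate set, starting from the shortest branch should only reduce how many candidates are checked, not change the result:

```rust
use std::collections::HashSet;

/// Returns the conjuncts present in every OR branch. `start` picks which
/// branch seeds the candidate set; candidates missing from any branch are dropped.
fn common_conjuncts<'a>(branches: &[Vec<&'a str>], start: usize) -> HashSet<&'a str> {
    let mut candidates: HashSet<&'a str> = branches[start].iter().copied().collect();
    for branch in branches {
        let present: HashSet<&'a str> = branch.iter().copied().collect();
        candidates.retain(|c| present.contains(c));
    }
    candidates
}

fn main() {
    let branches = vec![
        vec!["p_partkey = l_partkey", "p_brand = 'Brand#12'", "p_size <= 5"],
        vec!["p_partkey = l_partkey", "p_brand = 'Brand#23'"],
        vec!["p_size <= 5", "p_partkey = l_partkey", "p_brand = 'Brand#34'"],
    ];
    // Every choice of seed branch yields the same intersection.
    for start in 0..branches.len() {
        println!("seed {}: {:?}", start, common_conjuncts(&branches, start));
    }
}
```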



##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,429 @@
+fn rewrite_predicate(predicate: &Predicate) -> Predicate {
+    match predicate {
+        Predicate::And { args } => {
+            let mut rewritten_args = Vec::with_capacity(args.len());
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_and_predicates(&rewritten_args);
+            Predicate::And {
+                args: rewritten_args,
+            }
+        }
+        Predicate::Or { args } => {
+            let mut rewritten_args = vec![];
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_or_predicates(&rewritten_args);
+            delete_duplicate_predicates(&rewritten_args)
+        }
+        Predicate::Other { expr } => Predicate::Other {
+            expr: Box::new(*expr.clone()),
+        },
+    }
+}
+
+fn flatten_and_predicates(and_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in and_predicates {
+        match predicate {
+            Predicate::And { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_and_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn flatten_or_predicates(or_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in or_predicates {
+        match predicate {
+            Predicate::Or { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_or_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
+    let mut shortest_exprs: Vec<Predicate> = vec![];
+    let mut shortest_exprs_len = 0;
+    // choose the shortest AND predicate
+    for or_predicate in or_predicates.iter() {
+        match or_predicate {
+            Predicate::And { args } => {
+                let args_num = args.len();
+                if shortest_exprs.is_empty() || args_num < shortest_exprs_len {
+                    shortest_exprs = (*args).clone();
+                    shortest_exprs_len = args_num;
+                }
+            }
+            _ => {
+                // if there is no AND predicate, it must be the shortest expression.
+                shortest_exprs = vec![or_predicate.clone()];
+                break;
+            }
+        }
+    }
+
+    // dedup shortest_exprs
+    shortest_exprs.dedup();
+
+    // Check each element in shortest_exprs to see if it's in all the OR arguments.
+    let mut exist_exprs: Vec<Predicate> = vec![];
+    for expr in shortest_exprs.iter() {
+        let mut found = true;
+        for or_predicate in or_predicates.iter() {
+            match or_predicate {
+                Predicate::And { args } => {
+                    if !args.contains(expr) {
+                        found = false;
+                        break;
+                    }
+                }
+                _ => {
+                    if or_predicate != expr {
+                        found = false;
+                        break;
+                    }
+                }
+            }
+        }
+        if found {
+            exist_exprs.push((*expr).clone());
+        }
+    }
+    if exist_exprs.is_empty() {
+        return Predicate::Or {
+            args: or_predicates.to_vec(),
+        };
+    }
+
+    // Rebuild the OR predicate.
+    // (A AND B) OR A will be optimized to A.
+    let mut new_or_predicates = vec![];
+    for or_predicate in or_predicates.iter() {
+        match or_predicate {
+            Predicate::And { args } => {
+                let mut new_args = (*args).clone();
+                new_args.retain(|expr| !exist_exprs.contains(expr));
+                if !new_args.is_empty() {
+                    if new_args.len() == 1 {
+                        new_or_predicates.push(new_args[0].clone());
+                    } else {
+                        new_or_predicates.push(Predicate::And { args: new_args });
+                    }
+                } else {
+                    new_or_predicates.clear();
+                    break;
+                }
+            }
+            _ => {
+                if exist_exprs.contains(or_predicate) {
+                    new_or_predicates.clear();
+                    break;
+                }
+            }
+        }
+    }
+    if !new_or_predicates.is_empty() {
+        if new_or_predicates.len() == 1 {
+            exist_exprs.push(new_or_predicates[0].clone());
+        } else {
+            exist_exprs.push(Predicate::Or {
+                args: flatten_or_predicates(&new_or_predicates),
+            });
+        }
+    }
+
+    if exist_exprs.len() == 1 {
+        exist_exprs[0].clone()
+    } else {
+        Predicate::And {
+            args: flatten_and_predicates(&exist_exprs),
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct RewriteDisjunctivePredicate;
+
+impl RewriteDisjunctivePredicate {
+    pub fn new() -> Self {
+        Self::default()
+    }
+    fn rewrite_disjunctive_predicate(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        match plan {
+            LogicalPlan::Filter(filter) => {
+                let predicate = predicate(&filter.predicate)?;
+                let rewritten_predicate = rewrite_predicate(&predicate);
+                let rewritten_expr = normalize_predicate(&rewritten_predicate);
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: rewritten_expr,
+                    input: Arc::new(self.rewrite_disjunctive_predicate(
+                        &filter.input,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => {
+                let expr = plan.expressions();
+                let inputs = plan.inputs();
+                let new_inputs = inputs
+                    .iter()
+                    .map(|input| {
+                        self.rewrite_disjunctive_predicate(input, _optimizer_config)
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                from_plan(plan, &expr, &new_inputs)
+            }
+        }
+    }
+}
+
+impl OptimizerRule for RewriteDisjunctivePredicate {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        self.rewrite_disjunctive_predicate(plan, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "rewrite_disjunctive_predicate"
+    }
+}
+
+#[cfg(test)]
+
+mod tests {
+    use crate::rewrite_disjunctive_predicate::{
+        normalize_predicate, predicate, rewrite_predicate, Predicate,
+    };
+
+    use datafusion_common::{Column, Result, ScalarValue};
+    use datafusion_expr::Expr::BinaryExpr;
+    use datafusion_expr::{Expr, Operator};
+
+    #[test]
+    fn test_rewrite_predicate() -> Result<()> {
+        let equi_expr = Expr::BinaryExpr {
+            left: Box::new(Expr::Column(Column {
+                relation: None,
+                name: "t1.a".to_string(),
+            })),
+            op: Operator::Eq,
+            right: Box::new(Expr::Column(Column {
+                relation: None,
+                name: "t2.b".to_string(),
+            })),
+        };

Review Comment:
   I think you could write these tests much more concisely (and thus more readably), like this:
   
   ```suggestion
           let equi_expr = col("t1.a").eq(col("t2.b"));
   ```
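For comparison, here is a self-contained toy showing how small builder helpers, in the style of `col("t1.a").eq(col("t2.b"))`, replace the nested struct literals in the test above. The `Expr`, `Op` and `col` below are hypothetical stand-ins, not DataFusion's types.

One caveat is worth checking before applying the suggestion. DataFusion's `col` may parse a dotted name such as "t1.a" into a qualifier and a column name. If so, its result is not necessarily identical to the original `Column { relation: None, name: "t1.a" }`.

```rust
/// Hypothetical miniature expression tree (not DataFusion's `Expr`).
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    Binary { left: Box<Expr>, op: Op, right: Box<Expr> },
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum Op {
    Eq,
    And,
    Or,
}

/// Builder for a column reference.
fn col(name: &str) -> Expr {
    Expr::Column(name.to_string())
}

impl Expr {
    fn binary(self, op: Op, right: Expr) -> Expr {
        Expr::Binary { left: Box::new(self), op, right: Box::new(right) }
    }
    // Inherent by-value methods take precedence over `PartialEq::eq` in
    // method-call syntax, so `a.eq(b)` builds an expression here.
    fn eq(self, right: Expr) -> Expr {
        self.binary(Op::Eq, right)
    }
    fn and(self, right: Expr) -> Expr {
        self.binary(Op::And, right)
    }
    fn or(self, right: Expr) -> Expr {
        self.binary(Op::Or, right)
    }
}
```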



##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,429 @@
+    // Check each element in shortest_exprs to see if it's in all the OR arguments.
+    let mut exist_exprs: Vec<Predicate> = vec![];
+    for expr in shortest_exprs.iter() {
+        let mut found = true;
+        for or_predicate in or_predicates.iter() {
+            match or_predicate {
+                Predicate::And { args } => {
+                    if !args.contains(expr) {
+                        found = false;
+                        break;
+                    }
+                }
+                _ => {
+                    if or_predicate != expr {
+                        found = false;
+                        break;
+                    }
+                }
+            }
+        }

Review Comment:
   ```suggestion
           let found = or_predicates.iter().all(|or_predicate| match or_predicate {
               Predicate::And { args } => args.contains(expr),
               _ => or_predicate == expr,
           });
   ```
   
   I think this is equivalent and might be more idiomatic.
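A small sketch that checks the suggested rewrite against the original loop. It uses a simplified, hypothetical `Predicate` whose `Other` variant holds a name instead of an `Expr`.

`Iterator::all` stops at the first `false`, just as the loop `break`s. It also returns `true` for an empty iterator, which matches the loop's initial `found = true`.

```rust
/// Simplified, hypothetical version of the PR's `Predicate`.
#[derive(Clone, Debug, PartialEq)]
enum Predicate {
    And { args: Vec<Predicate> },
    Other { name: String },
}

/// The loop from the diff: is `expr` present in every OR argument?
fn found_with_loop(or_predicates: &[Predicate], expr: &Predicate) -> bool {
    let mut found = true;
    for or_predicate in or_predicates.iter() {
        match or_predicate {
            Predicate::And { args } => {
                if !args.contains(expr) {
                    found = false;
                    break;
                }
            }
            _ => {
                if or_predicate != expr {
                    found = false;
                    break;
                }
            }
        }
    }
    found
}

/// The suggested `Iterator::all` form.
fn found_with_all(or_predicates: &[Predicate], expr: &Predicate) -> bool {
    or_predicates.iter().all(|or_predicate| match or_predicate {
        Predicate::And { args } => args.contains(expr),
        _ => or_predicate == expr,
    })
}
```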



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#issuecomment-1179528791

   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2858?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2858](https://codecov.io/gh/apache/arrow-datafusion/pull/2858?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f690cf1) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/a2da7209ed4a0da68010dcd541a25032a040b9e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a2da720) will **increase** coverage by `0.00%`.
   > The diff coverage is `85.59%`.
   
   ```diff
   @@           Coverage Diff            @@
   ##           master    #2858    +/-   ##
   ========================================
     Coverage   85.24%   85.24%            
   ========================================
     Files         275      276     +1     
     Lines       49002    49245   +243     
   ========================================
   + Hits        41773    41981   +208     
   - Misses       7229     7264    +35     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/2858?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ion/optimizer/src/rewrite\_disjunctive\_predicate.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2858/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9vcHRpbWl6ZXIvc3JjL3Jld3JpdGVfZGlzanVuY3RpdmVfcHJlZGljYXRlLnJz) | `85.53% <85.53%> (ø)` | |
   | [datafusion/core/src/execution/context.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2858/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3NyYy9leGVjdXRpb24vY29udGV4dC5ycw==) | `78.82% <100.00%> (+0.02%)` | :arrow_up: |
   | [datafusion/physical-expr/src/expressions/binary.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2858/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9waHlzaWNhbC1leHByL3NyYy9leHByZXNzaW9ucy9iaW5hcnkucnM=) | `95.03% <0.00%> (-0.09%)` | :arrow_down: |
   | [datafusion/expr/src/logical\_plan/plan.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2858/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9leHByL3NyYy9sb2dpY2FsX3BsYW4vcGxhbi5ycw==) | `74.50% <0.00%> (+0.19%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2858?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2858?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [a2da720...f690cf1](https://codecov.io/gh/apache/arrow-datafusion/pull/2858?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [arrow-datafusion] xudong963 commented on a diff in pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
xudong963 commented on code in PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#discussion_r927636440


##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,429 @@
+fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
+    let mut shortest_exprs: Vec<Predicate> = vec![];
+    let mut shortest_exprs_len = 0;
+    // choose the shortest AND predicate

Review Comment:
   > I don't understand the need for checking the shortest AND predicate
   
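   The elements of the shortest AND predicate are the candidates for the common expression: each one that appears in every OR argument is extracted.
   
   > why not check all elements
   
   We don't need to check all elements. Any expression common to every OR argument must also appear in the shortest AND predicate, so it is enough to check that predicate's elements, and picking the shortest keeps the number of candidates small.
   
   > keeping a set of expressions
   
   Yes, ideally `shortest_exprs` would be a `HashSet`, but that requires implementing `Eq` and `Hash`, and those impls would spread deeply through the expression types, so I use a `Vec` instead.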
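One detail in the code under discussion: `Vec::dedup` only removes *consecutive* repeated elements, so `shortest_exprs.dedup()` leaves `[A, B, A]` unchanged. If `A` is common to all OR arguments, it would then be pushed into `exist_exprs` twice. The result is a redundant, though still equivalent, predicate.

Below is a minimal sketch of an order-preserving dedup that needs only `PartialEq`, so no `Eq` or `Hash` impls are required. `dedup_keep_first` is a hypothetical helper name.

```rust
/// Remove duplicates while keeping the first occurrence of each element.
/// Needs only `PartialEq`; costs O(n^2) comparisons, which should be fine
/// for the handful of conjuncts in a typical filter.
fn dedup_keep_first<T: PartialEq>(items: Vec<T>) -> Vec<T> {
    let mut out: Vec<T> = Vec::with_capacity(items.len());
    for item in items {
        if !out.contains(&item) {
            out.push(item);
        }
    }
    out
}
```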





[GitHub] [arrow-datafusion] xudong963 commented on pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
xudong963 commented on PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#issuecomment-1180550312

   > Perhaps some unit tests in datafusion/optimizer/src/rewrite_disjunctive_predicate.rs and then an explain test for q18 showing the inner join?
   
   Currently, q19 can't be converted to an inner join, because the `cross join -> inner join` conversion happens in the planner, not in the optimizer.
   
   
   > I think the very nice code structure in datafusion/optimizer/src/rewrite_disjunctive_predicate.rs would make it quite easy to write unit tests.
   
   Yes, added
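A toy sketch of why the rewrite matters for q19. Before the rewrite, `part.p_partkey = lineitem.l_partkey` appears only inside each OR branch. A planner that scans the filter's top-level conjuncts therefore finds no equi-join key and keeps the cross join. After the rewrite, the equality is a top-level conjunct and can serve as a join key.

The `Conjunct` type and `split_join_keys` function are hypothetical. They only illustrate the idea and are not DataFusion's planner code.

```rust
/// Hypothetical, simplified top-level conjunct of a filter that sits above
/// `left CROSS JOIN right`.
#[derive(Clone, Debug, PartialEq)]
enum Conjunct {
    /// `table_a.col_a = table_b.col_b`, stored as (table, column) pairs.
    ColEq((String, String), (String, String)),
    /// Anything else, e.g. an OR that has not been factored.
    Other(String),
}

/// Split conjuncts into equi-join keys `(left_col, right_col)` and residual
/// predicates that must stay in a filter.
fn split_join_keys(
    conjuncts: &[Conjunct],
    left_table: &str,
    right_table: &str,
) -> (Vec<(String, String)>, Vec<Conjunct>) {
    let mut keys = Vec::new();
    let mut residual = Vec::new();
    for c in conjuncts {
        match c {
            // left.x = right.y
            Conjunct::ColEq((lt, lc), (rt, rc)) if lt == left_table && rt == right_table => {
                keys.push((lc.clone(), rc.clone()))
            }
            // right.y = left.x: normalise to (left column, right column)
            Conjunct::ColEq((lt, lc), (rt, rc)) if lt == right_table && rt == left_table => {
                keys.push((rc.clone(), lc.clone()))
            }
            other => residual.push(other.clone()),
        }
    }
    (keys, residual)
}
```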
   




[GitHub] [arrow-datafusion] alamb commented on pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#issuecomment-1185555182

   Marking as draft to signify that more work is planned on this PR.




[GitHub] [arrow-datafusion] xudong963 commented on a diff in pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
xudong963 commented on code in PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#discussion_r917950725


##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,468 @@
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::{
+    Aggregate, Analyze, CreateMemoryTable, CreateView, CrossJoin, Distinct, Explain,
+    Filter, Join, Limit, Projection, Repartition, Sort, Subquery, SubqueryAlias, Union,
+    Window,
+};
+use datafusion_expr::Expr::BinaryExpr;
+use datafusion_expr::{Expr, LogicalPlan, Operator};
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Debug)]
+enum Predicate {
+    And { args: Vec<Predicate> },
+    Or { args: Vec<Predicate> },
+    Other { expr: Box<Expr> },
+}
+
+fn predicate(expr: &Expr) -> Result<Predicate> {
+    match expr {
+        BinaryExpr { left, op, right } => match op {
+            Operator::And => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::And { args })
+            }
+            Operator::Or => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::Or { args })
+            }
+            _ => Ok(Predicate::Other {
+                expr: Box::new(BinaryExpr {
+                    left: left.clone(),
+                    op: *op,
+                    right: right.clone(),
+                }),
+            }),
+        },
+        _ => Ok(Predicate::Other {
+            expr: Box::new(expr.clone()),
+        }),
+    }
+}
+
+fn normalize_predicate(predicate: &Predicate) -> Expr {
+    match predicate {
+        Predicate::And { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut and_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::And,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                and_expr = BinaryExpr {
+                    left: Box::new(and_expr),
+                    op: Operator::And,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            and_expr
+        }
+        Predicate::Or { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut or_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::Or,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                or_expr = BinaryExpr {
+                    left: Box::new(or_expr),
+                    op: Operator::Or,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            or_expr
+        }
+        Predicate::Other { expr } => *expr.clone(),
+    }
+}
+
+fn rewrite_predicate(predicate: &Predicate) -> Predicate {
+    match predicate {
+        Predicate::And { args } => {
+            let mut rewritten_args = Vec::with_capacity(args.len());
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_and_predicates(&rewritten_args);
+            Predicate::And {
+                args: rewritten_args,
+            }
+        }
+        Predicate::Or { args } => {
+            let mut rewritten_args = vec![];
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_or_predicates(&rewritten_args);
+            delete_duplicate_predicates(&rewritten_args)
+        }
+        Predicate::Other { expr } => Predicate::Other {
+            expr: Box::new(*expr.clone()),
+        },
+    }
+}
+
+fn flatten_and_predicates(and_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in and_predicates {
+        match predicate {
+            Predicate::And { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_and_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn flatten_or_predicates(or_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in or_predicates {
+        match predicate {
+            Predicate::Or { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_or_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
+    let mut shortest_exprs: Vec<Predicate> = vec![];
+    let mut shortest_exprs_len = 0;
+    // choose the shortest AND predicate
+    for or_predicate in or_predicates.iter() {
+        match or_predicate {
+            Predicate::And { args } => {
+                let args_num = args.len();
+                if shortest_exprs.is_empty() || args_num < shortest_exprs_len {
+                    shortest_exprs = (*args).clone();
+                    shortest_exprs_len = args_num;
+                }
+            }
+            _ => {
+                // if there is no AND predicate, it must be the shortest expression.
+                shortest_exprs = vec![or_predicate.clone()];
+                break;
+            }
+        }
+    }
+
+    // dedup shortest_exprs
+    shortest_exprs.dedup();
+
+    // Check each element in shortest_exprs to see if it's in all the OR arguments.
+    let mut exist_exprs: Vec<Predicate> = vec![];
+    for expr in shortest_exprs.iter() {
+        let mut found = true;
+        for or_predicate in or_predicates.iter() {
+            match or_predicate {
+                Predicate::And { args } => {
+                    if !args.contains(expr) {
+                        found = false;
+                        break;
+                    }
+                }
+                _ => {
+                    if or_predicate != expr {
+                        found = false;
+                        break;
+                    }
+                }
+            }
+        }
+        if found {
+            exist_exprs.push((*expr).clone());
+        }
+    }
+    if exist_exprs.is_empty() {
+        return Predicate::Or {
+            args: or_predicates.to_vec(),
+        };
+    }
+
+    // Rebuild the OR predicate.
+    // (A AND B) OR A will be optimized to A.
+    let mut new_or_predicates = vec![];
+    for or_predicate in or_predicates.iter() {
+        match or_predicate {
+            Predicate::And { args } => {
+                let mut new_args = (*args).clone();
+                new_args.retain(|expr| !exist_exprs.contains(expr));
+                if !new_args.is_empty() {
+                    if new_args.len() == 1 {
+                        new_or_predicates.push(new_args[0].clone());
+                    } else {
+                        new_or_predicates.push(Predicate::And { args: new_args });
+                    }
+                } else {
+                    new_or_predicates.clear();
+                    break;
+                }
+            }
+            _ => {
+                if exist_exprs.contains(or_predicate) {
+                    new_or_predicates.clear();
+                    break;
+                }
+            }
+        }
+    }
+    if !new_or_predicates.is_empty() {
+        if new_or_predicates.len() == 1 {
+            exist_exprs.push(new_or_predicates[0].clone());
+        } else {
+            exist_exprs.push(Predicate::Or {
+                args: flatten_or_predicates(&new_or_predicates),
+            });
+        }
+    }
+
+    if exist_exprs.len() == 1 {
+        exist_exprs[0].clone()
+    } else {
+        Predicate::And {
+            args: flatten_and_predicates(&exist_exprs),
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct RewriteDisjunctivePredicate;
+
+impl RewriteDisjunctivePredicate {
+    pub fn new() -> Self {
+        Self::default()
+    }
+    fn rewrite_disjunctive_predicate(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        match plan {
+            LogicalPlan::Filter(filter) => {

Review Comment:
   It seems new interfaces for `LogicalPlan` were added while I was away. I had planned to add similar interfaces after this ticket, because I was tired of enumerating every variant here 🤣





[GitHub] [arrow-datafusion] alamb commented on pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#issuecomment-1195921006

   🚀 -- thanks again @xudong963!




[GitHub] [arrow-datafusion] alamb merged pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
alamb merged PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858




[GitHub] [arrow-datafusion] xudong963 commented on a diff in pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
xudong963 commented on code in PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#discussion_r927600652


##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,429 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::Filter;
+use datafusion_expr::utils::from_plan;
+use datafusion_expr::Expr::BinaryExpr;
+use datafusion_expr::{Expr, LogicalPlan, Operator};
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Debug)]
+enum Predicate {
+    And { args: Vec<Predicate> },
+    Or { args: Vec<Predicate> },
+    Other { expr: Box<Expr> },
+}
+
+fn predicate(expr: &Expr) -> Result<Predicate> {
+    match expr {
+        BinaryExpr { left, op, right } => match op {
+            Operator::And => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::And { args })
+            }
+            Operator::Or => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::Or { args })
+            }
+            _ => Ok(Predicate::Other {
+                expr: Box::new(BinaryExpr {
+                    left: left.clone(),
+                    op: *op,
+                    right: right.clone(),
+                }),
+            }),
+        },
+        _ => Ok(Predicate::Other {
+            expr: Box::new(expr.clone()),
+        }),
+    }
+}
+
+fn normalize_predicate(predicate: &Predicate) -> Expr {
+    match predicate {
+        Predicate::And { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut and_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::And,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                and_expr = BinaryExpr {
+                    left: Box::new(and_expr),
+                    op: Operator::And,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            and_expr
+        }
+        Predicate::Or { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut or_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::Or,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                or_expr = BinaryExpr {
+                    left: Box::new(or_expr),
+                    op: Operator::Or,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            or_expr
+        }
+        Predicate::Other { expr } => *expr.clone(),
+    }
+}
+
+fn rewrite_predicate(predicate: &Predicate) -> Predicate {
+    match predicate {
+        Predicate::And { args } => {
+            let mut rewritten_args = Vec::with_capacity(args.len());
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_and_predicates(&rewritten_args);
+            Predicate::And {
+                args: rewritten_args,
+            }
+        }
+        Predicate::Or { args } => {
+            let mut rewritten_args = vec![];
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_or_predicates(&rewritten_args);
+            delete_duplicate_predicates(&rewritten_args)
+        }
+        Predicate::Other { expr } => Predicate::Other {
+            expr: Box::new(*expr.clone()),
+        },
+    }
+}
+
+fn flatten_and_predicates(and_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in and_predicates {
+        match predicate {
+            Predicate::And { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_and_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn flatten_or_predicates(or_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in or_predicates {
+        match predicate {
+            Predicate::Or { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_or_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
+    let mut shortest_exprs: Vec<Predicate> = vec![];
+    let mut shortest_exprs_len = 0;
+    // choose the shortest AND predicate
+    for or_predicate in or_predicates.iter() {
+        match or_predicate {
+            Predicate::And { args } => {
+                let args_num = args.len();
+                if shortest_exprs.is_empty() || args_num < shortest_exprs_len {
+                    shortest_exprs = (*args).clone();
+                    shortest_exprs_len = args_num;
+                }
+            }
+            _ => {
+                // if there is no AND predicate, it must be the shortest expression.
+                shortest_exprs = vec![or_predicate.clone()];
+                break;
+            }
+        }
+    }
+
+    // dedup shortest_exprs
+    shortest_exprs.dedup();
+
+    // Check each element in shortest_exprs to see if it's in all the OR arguments.
+    let mut exist_exprs: Vec<Predicate> = vec![];
+    for expr in shortest_exprs.iter() {
+        let mut found = true;
+        for or_predicate in or_predicates.iter() {
+            match or_predicate {
+                Predicate::And { args } => {
+                    if !args.contains(expr) {
+                        found = false;
+                        break;
+                    }
+                }
+                _ => {
+                    if or_predicate != expr {
+                        found = false;
+                        break;
+                    }
+                }
+            }
+        }

Review Comment:
   Yes, I like it!





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#discussion_r917378407


##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::{
+    Aggregate, Analyze, CreateMemoryTable, CreateView, CrossJoin, Distinct, Explain,
+    Filter, Join, Limit, Projection, Repartition, Sort, Subquery, SubqueryAlias, Union,
+    Window,
+};
+use datafusion_expr::Expr::BinaryExpr;
+use datafusion_expr::{Expr, LogicalPlan, Operator};
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Debug)]
+enum Predicate {
+    And { args: Vec<Predicate> },
+    Or { args: Vec<Predicate> },
+    Other { expr: Box<Expr> },
+}
+
+fn predicate(expr: &Expr) -> Result<Predicate> {

Review Comment:
   I wonder if using `ExprRewriter` here might make the code less verbose (and avoid a copy). Doing so would require a non-trivial refactor, though.
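To illustrate what an `ExprRewriter`-style approach could look like, here is a toy sketch. The `Rewriter` trait, `Expr` enum and `Absorb` rule are hypothetical and not DataFusion's actual `ExprRewriter` (whose real signature may differ). The traversal takes ownership of the tree and moves children out of their `Box`es, so no subtree is deep-cloned, and the per-node logic shrinks to one `mutate` method. The example rule applies absorption, `A OR (A AND B)` => `A`.

```rust
/// Hypothetical toy expression tree.
#[derive(Debug, PartialEq)]
enum Expr {
    Col(String),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

/// Hypothetical rewriter hook: called once per node, after its children were rewritten.
trait Rewriter {
    fn mutate(&mut self, expr: Expr) -> Expr;
}

/// Post-order traversal that consumes the tree; children are moved, never cloned.
fn rewrite<R: Rewriter>(expr: Expr, rewriter: &mut R) -> Expr {
    let expr = match expr {
        Expr::And(l, r) => Expr::And(Box::new(rewrite(*l, rewriter)), Box::new(rewrite(*r, rewriter))),
        Expr::Or(l, r) => Expr::Or(Box::new(rewrite(*l, rewriter)), Box::new(rewrite(*r, rewriter))),
        leaf => leaf,
    };
    rewriter.mutate(expr)
}

/// Example rule: `A OR (A AND B)` => `A`, counting how often it fired.
struct Absorb {
    absorbed: usize,
}

impl Rewriter for Absorb {
    fn mutate(&mut self, expr: Expr) -> Expr {
        match expr {
            Expr::Or(left, right) => {
                let is_absorbed = matches!(&*right, Expr::And(a, _) if **a == *left);
                if is_absorbed {
                    self.absorbed += 1;
                    *left
                } else {
                    Expr::Or(left, right)
                }
            }
            other => other,
        }
    }
}

fn col(name: &str) -> Expr {
    Expr::Col(name.to_string())
}

fn and(l: Expr, r: Expr) -> Expr {
    Expr::And(Box::new(l), Box::new(r))
}

fn or(l: Expr, r: Expr) -> Expr {
    Expr::Or(Box::new(l), Box::new(r))
}

fn main() {
    let mut rule = Absorb { absorbed: 0 };
    let out = rewrite(and(col("c"), or(col("a"), and(col("a"), col("b")))), &mut rule);
    println!("{:?} (absorbed {} time(s))", out, rule.absorbed);
}
```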



##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::{
+    Aggregate, Analyze, CreateMemoryTable, CreateView, CrossJoin, Distinct, Explain,
+    Filter, Join, Limit, Projection, Repartition, Sort, Subquery, SubqueryAlias, Union,
+    Window,
+};
+use datafusion_expr::Expr::BinaryExpr;
+use datafusion_expr::{Expr, LogicalPlan, Operator};
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Debug)]
+enum Predicate {
+    And { args: Vec<Predicate> },
+    Or { args: Vec<Predicate> },
+    Other { expr: Box<Expr> },
+}
+
+fn predicate(expr: &Expr) -> Result<Predicate> {
+    match expr {
+        BinaryExpr { left, op, right } => match op {
+            Operator::And => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::And { args })
+            }
+            Operator::Or => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::Or { args })
+            }
+            _ => Ok(Predicate::Other {
+                expr: Box::new(BinaryExpr {
+                    left: left.clone(),
+                    op: *op,
+                    right: right.clone(),
+                }),
+            }),
+        },
+        _ => Ok(Predicate::Other {
+            expr: Box::new(expr.clone()),
+        }),
+    }
+}
+
+fn normalize_predicate(predicate: &Predicate) -> Expr {
+    match predicate {
+        Predicate::And { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut and_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::And,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                and_expr = BinaryExpr {
+                    left: Box::new(and_expr),
+                    op: Operator::And,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            and_expr
+        }
+        Predicate::Or { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut or_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::Or,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                or_expr = BinaryExpr {
+                    left: Box::new(or_expr),
+                    op: Operator::Or,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            or_expr
+        }
+        Predicate::Other { expr } => *expr.clone(),
+    }
+}
+
+fn rewrite_predicate(predicate: &Predicate) -> Predicate {
+    match predicate {
+        Predicate::And { args } => {
+            let mut rewritten_args = Vec::with_capacity(args.len());
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_and_predicates(&rewritten_args);
+            Predicate::And {
+                args: rewritten_args,
+            }
+        }
+        Predicate::Or { args } => {
+            let mut rewritten_args = vec![];
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_or_predicates(&rewritten_args);
+            delete_duplicate_predicates(&rewritten_args)
+        }
+        Predicate::Other { expr } => Predicate::Other {
+            expr: Box::new(*expr.clone()),
+        },
+    }
+}
+
+fn flatten_and_predicates(and_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in and_predicates {
+        match predicate {
+            Predicate::And { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_and_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn flatten_or_predicates(or_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in or_predicates {
+        match predicate {
+            Predicate::Or { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_or_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
+    let mut shortest_exprs: Vec<Predicate> = vec![];
+    let mut shortest_exprs_len = 0;
+    // choose the shortest AND predicate
+    for or_predicate in or_predicates.iter() {
+        match or_predicate {
+            Predicate::And { args } => {
+                let args_num = args.len();
+                if shortest_exprs.is_empty() || args_num < shortest_exprs_len {
+                    shortest_exprs = (*args).clone();
+                    shortest_exprs_len = args_num;
+                }
+            }
+            _ => {
+                // if there is no AND predicate, it must be the shortest expression.
+                shortest_exprs = vec![or_predicate.clone()];
+                break;
+            }
+        }
+    }
+
+    // dedup shortest_exprs
+    shortest_exprs.dedup();
+
+    // Check each element in shortest_exprs to see if it's in all the OR arguments.
+    let mut exist_exprs: Vec<Predicate> = vec![];
+    for expr in shortest_exprs.iter() {
+        let mut found = true;
+        for or_predicate in or_predicates.iter() {
+            match or_predicate {
+                Predicate::And { args } => {
+                    if !args.contains(expr) {
+                        found = false;
+                        break;
+                    }
+                }
+                _ => {
+                    if or_predicate != expr {
+                        found = false;
+                        break;
+                    }
+                }
+            }
+        }
+        if found {
+            exist_exprs.push((*expr).clone());
+        }
+    }
+    if exist_exprs.is_empty() {
+        return Predicate::Or {
+            args: or_predicates.to_vec(),
+        };
+    }
+
+    // Rebuild the OR predicate.
+    // (A AND B) OR A will be optimized to A.
+    let mut new_or_predicates = vec![];
+    for or_predicate in or_predicates.iter() {
+        match or_predicate {
+            Predicate::And { args } => {
+                let mut new_args = (*args).clone();
+                new_args.retain(|expr| !exist_exprs.contains(expr));
+                if !new_args.is_empty() {
+                    if new_args.len() == 1 {
+                        new_or_predicates.push(new_args[0].clone());
+                    } else {
+                        new_or_predicates.push(Predicate::And { args: new_args });
+                    }
+                } else {
+                    new_or_predicates.clear();
+                    break;
+                }
+            }
+            _ => {
+                if exist_exprs.contains(or_predicate) {
+                    new_or_predicates.clear();
+                    break;
+                }
+            }
+        }
+    }
+    if !new_or_predicates.is_empty() {
+        if new_or_predicates.len() == 1 {
+            exist_exprs.push(new_or_predicates[0].clone());
+        } else {
+            exist_exprs.push(Predicate::Or {
+                args: flatten_or_predicates(&new_or_predicates),
+            });
+        }
+    }
+
+    if exist_exprs.len() == 1 {
+        exist_exprs[0].clone()
+    } else {
+        Predicate::And {
+            args: flatten_and_predicates(&exist_exprs),
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct RewriteDisjunctivePredicate;
+
+impl RewriteDisjunctivePredicate {
+    pub fn new() -> Self {
+        Self::default()
+    }
+    fn rewrite_disjunctive_predicate(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        match plan {
+            LogicalPlan::Filter(filter) => {

Review Comment:
   I think you can avoid explicitly enumerating each `LogicalPlan` variant here by using `plan.expressions()` and `from_plan`.
   
   https://github.com/apache/arrow-datafusion/blob/master/datafusion/optimizer/src/eliminate_limit.rs#L111-L122
   
   Using those functions has the added benefit that if new expressions are added to plans, or new logical plan variants are introduced, this rewrite does not need to change.
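A toy model of the suggested pattern follows. The optimizer rewrites whatever `expressions()` a node exposes, recurses into its `inputs()`, and rebuilds the node with `from_plan`, so it never names `Filter` or `Projection` itself. `Plan`, its methods and this `from_plan` are hypothetical simplifications (expressions are plain strings). They only mimic the shape of DataFusion's `LogicalPlan::expressions`, `LogicalPlan::inputs` and `datafusion_expr::utils::from_plan`; the real APIs operate on `Expr` and their signatures may differ.

```rust
/// Hypothetical toy plan; expressions are plain strings for brevity.
#[derive(Clone, Debug, PartialEq)]
enum Plan {
    Scan { table: String },
    Filter { predicate: String, input: Box<Plan> },
    Projection { exprs: Vec<String>, input: Box<Plan> },
}

impl Plan {
    /// All expressions owned directly by this node.
    fn expressions(&self) -> Vec<String> {
        match self {
            Plan::Scan { .. } => vec![],
            Plan::Filter { predicate, .. } => vec![predicate.clone()],
            Plan::Projection { exprs, .. } => exprs.clone(),
        }
    }

    /// Direct child plans of this node.
    fn inputs(&self) -> Vec<&Plan> {
        match self {
            Plan::Scan { .. } => vec![],
            Plan::Filter { input, .. } | Plan::Projection { input, .. } => vec![&**input],
        }
    }
}

/// The only place that knows how to reassemble each variant.
fn from_plan(plan: &Plan, exprs: &[String], inputs: &[Plan]) -> Plan {
    match plan {
        Plan::Scan { table } => Plan::Scan { table: table.clone() },
        Plan::Filter { .. } => Plan::Filter {
            predicate: exprs[0].clone(),
            input: Box::new(inputs[0].clone()),
        },
        Plan::Projection { .. } => Plan::Projection {
            exprs: exprs.to_vec(),
            input: Box::new(inputs[0].clone()),
        },
    }
}

/// Generic rule: rewrite every expression in every node without matching on variants.
fn optimize_generic(plan: &Plan, rewrite_expr: &dyn Fn(&str) -> String) -> Plan {
    let new_inputs: Vec<Plan> = plan
        .inputs()
        .into_iter()
        .map(|p| optimize_generic(p, rewrite_expr))
        .collect();
    let new_exprs: Vec<String> = plan
        .expressions()
        .iter()
        .map(|e| rewrite_expr(e.as_str()))
        .collect();
    from_plan(plan, &new_exprs, &new_inputs)
}

fn main() {
    let plan = Plan::Projection {
        exprs: vec!["x OR x".to_string()],
        input: Box::new(Plan::Filter {
            predicate: "x OR x".to_string(),
            input: Box::new(Plan::Scan { table: "t".to_string() }),
        }),
    };
    // Stand-in for a real expression rewrite: collapse a duplicated disjunct.
    let out = optimize_generic(&plan, &|e: &str| e.replace("x OR x", "x"));
    println!("{:?}", out);
}
```

The trade-off is that only `from_plan` needs to know how to reassemble each variant, so a new plan node is handled in one place rather than in every rule.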
   
   





[GitHub] [arrow-datafusion] xudong963 commented on pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
xudong963 commented on PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#issuecomment-1192572099

   Sorry for the delay in updating; I've been busy with work recently.




[GitHub] [arrow-datafusion] xudong963 commented on a diff in pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
xudong963 commented on code in PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#discussion_r927600427


##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,429 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::Filter;
+use datafusion_expr::utils::from_plan;
+use datafusion_expr::Expr::BinaryExpr;
+use datafusion_expr::{Expr, LogicalPlan, Operator};
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Debug)]
+enum Predicate {
+    And { args: Vec<Predicate> },
+    Or { args: Vec<Predicate> },
+    Other { expr: Box<Expr> },
+}
+
+fn predicate(expr: &Expr) -> Result<Predicate> {
+    match expr {
+        BinaryExpr { left, op, right } => match op {
+            Operator::And => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::And { args })
+            }
+            Operator::Or => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::Or { args })
+            }
+            _ => Ok(Predicate::Other {
+                expr: Box::new(BinaryExpr {
+                    left: left.clone(),
+                    op: *op,
+                    right: right.clone(),
+                }),
+            }),
+        },
+        _ => Ok(Predicate::Other {
+            expr: Box::new(expr.clone()),
+        }),
+    }
+}
+
+fn normalize_predicate(predicate: &Predicate) -> Expr {
+    match predicate {
+        Predicate::And { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut and_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::And,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                and_expr = BinaryExpr {
+                    left: Box::new(and_expr),
+                    op: Operator::And,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            and_expr
+        }
+        Predicate::Or { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut or_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::Or,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                or_expr = BinaryExpr {
+                    left: Box::new(or_expr),
+                    op: Operator::Or,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            or_expr
+        }
+        Predicate::Other { expr } => *expr.clone(),
+    }
+}
+
+fn rewrite_predicate(predicate: &Predicate) -> Predicate {
+    match predicate {
+        Predicate::And { args } => {
+            let mut rewritten_args = Vec::with_capacity(args.len());
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_and_predicates(&rewritten_args);
+            Predicate::And {
+                args: rewritten_args,
+            }
+        }
+        Predicate::Or { args } => {
+            let mut rewritten_args = vec![];
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_or_predicates(&rewritten_args);
+            delete_duplicate_predicates(&rewritten_args)
+        }
+        Predicate::Other { expr } => Predicate::Other {
+            expr: Box::new(*expr.clone()),
+        },
+    }
+}
+
+fn flatten_and_predicates(and_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in and_predicates {
+        match predicate {
+            Predicate::And { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_and_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn flatten_or_predicates(or_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in or_predicates {
+        match predicate {
+            Predicate::Or { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_or_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
+    let mut shortest_exprs: Vec<Predicate> = vec![];
+    let mut shortest_exprs_len = 0;
+    // choose the shortest AND predicate
+    for or_predicate in or_predicates.iter() {
+        match or_predicate {
+            Predicate::And { args } => {
+                let args_num = args.len();
+                if shortest_exprs.is_empty() || args_num < shortest_exprs_len {
+                    shortest_exprs = (*args).clone();
+                    shortest_exprs_len = args_num;
+                }
+            }
+            _ => {
+                // if there is no AND predicate, it must be the shortest expression.
+                shortest_exprs = vec![or_predicate.clone()];
+                break;
+            }
+        }
+    }
+
+    // dedup shortest_exprs
+    shortest_exprs.dedup();
+
+    // Check each element in shortest_exprs to see if it's in all the OR arguments.
+    let mut exist_exprs: Vec<Predicate> = vec![];
+    for expr in shortest_exprs.iter() {
+        let mut found = true;
+        for or_predicate in or_predicates.iter() {
+            match or_predicate {
+                Predicate::And { args } => {
+                    if !args.contains(expr) {
+                        found = false;
+                        break;
+                    }
+                }
+                _ => {
+                    if or_predicate != expr {
+                        found = false;
+                        break;
+                    }
+                }
+            }
+        }
+        if found {
+            exist_exprs.push((*expr).clone());
+        }
+    }
+    if exist_exprs.is_empty() {
+        return Predicate::Or {
+            args: or_predicates.to_vec(),
+        };
+    }
+
+    // Rebuild the OR predicate.
+    // (A AND B) OR A will be optimized to A.
+    let mut new_or_predicates = vec![];
+    for or_predicate in or_predicates.iter() {
+        match or_predicate {
+            Predicate::And { args } => {
+                let mut new_args = (*args).clone();
+                new_args.retain(|expr| !exist_exprs.contains(expr));
+                if !new_args.is_empty() {
+                    if new_args.len() == 1 {
+                        new_or_predicates.push(new_args[0].clone());
+                    } else {
+                        new_or_predicates.push(Predicate::And { args: new_args });
+                    }
+                } else {
+                    new_or_predicates.clear();
+                    break;
+                }
+            }
+            _ => {
+                if exist_exprs.contains(or_predicate) {
+                    new_or_predicates.clear();
+                    break;
+                }
+            }
+        }
+    }
+    if !new_or_predicates.is_empty() {
+        if new_or_predicates.len() == 1 {
+            exist_exprs.push(new_or_predicates[0].clone());
+        } else {
+            exist_exprs.push(Predicate::Or {
+                args: flatten_or_predicates(&new_or_predicates),
+            });
+        }
+    }
+
+    if exist_exprs.len() == 1 {
+        exist_exprs[0].clone()
+    } else {
+        Predicate::And {
+            args: flatten_and_predicates(&exist_exprs),
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct RewriteDisjunctivePredicate;
+
+impl RewriteDisjunctivePredicate {
+    pub fn new() -> Self {
+        Self::default()
+    }
+    fn rewrite_disjunctive_predicate(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        match plan {
+            LogicalPlan::Filter(filter) => {
+                let predicate = predicate(&filter.predicate)?;
+                let rewritten_predicate = rewrite_predicate(&predicate);
+                let rewritten_expr = normalize_predicate(&rewritten_predicate);
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: rewritten_expr,
+                    input: Arc::new(self.rewrite_disjunctive_predicate(
+                        &filter.input,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => {
+                let expr = plan.expressions();
+                let inputs = plan.inputs();
+                let new_inputs = inputs
+                    .iter()
+                    .map(|input| {
+                        self.rewrite_disjunctive_predicate(input, _optimizer_config)
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                from_plan(plan, &expr, &new_inputs)
+            }
+        }
+    }
+}
+
+impl OptimizerRule for RewriteDisjunctivePredicate {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        self.rewrite_disjunctive_predicate(plan, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "rewrite_disjunctive_predicate"
+    }
+}
+
+#[cfg(test)]
+
+mod tests {
+    use crate::rewrite_disjunctive_predicate::{
+        normalize_predicate, predicate, rewrite_predicate, Predicate,
+    };
+
+    use datafusion_common::{Column, Result, ScalarValue};
+    use datafusion_expr::Expr::BinaryExpr;
+    use datafusion_expr::{Expr, Operator};
+
+    #[test]
+    fn test_rewrite_predicate() -> Result<()> {
+        let equi_expr = Expr::BinaryExpr {
+            left: Box::new(Expr::Column(Column {
+                relation: None,
+                name: "t1.a".to_string(),
+            })),
+            op: Operator::Eq,
+            right: Box::new(Expr::Column(Column {
+                relation: None,
+                name: "t2.b".to_string(),
+            })),
+        };

Review Comment:
   Nice APIs!





[GitHub] [arrow-datafusion] ursabot commented on pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#issuecomment-1195922852

   Benchmark runs are scheduled for baseline = 0f1999072e803d6f9f0a7c2b0cfa538d225a9eef and contender = 4005076d8e3e4fa07541da62f7a6c9c755029da1. 4005076d8e3e4fa07541da62f7a6c9c755029da1 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/36b6fd33ca1e45a7942657c155f365e0...4226064e35504906a5f51faa03ebb755/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/4cf7180d7418459bb7b4fbd7cd627c77...6c6921503d2d480f989608937462a0de/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/824601c4764b4267a309f157aad92633...9ca82a9a98aa451cb381d322c64b167e/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/33ba2324bf134546a9c7becd92cb9aad...eca73417db6b4d3fab68f94b20c1513d/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow-datafusion] xudong963 commented on a diff in pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
xudong963 commented on code in PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#discussion_r917948594


##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::{
+    Aggregate, Analyze, CreateMemoryTable, CreateView, CrossJoin, Distinct, Explain,
+    Filter, Join, Limit, Projection, Repartition, Sort, Subquery, SubqueryAlias, Union,
+    Window,
+};
+use datafusion_expr::Expr::BinaryExpr;
+use datafusion_expr::{Expr, LogicalPlan, Operator};
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Debug)]
+enum Predicate {
+    And { args: Vec<Predicate> },
+    Or { args: Vec<Predicate> },
+    Other { expr: Box<Expr> },
+}
+
+fn predicate(expr: &Expr) -> Result<Predicate> {

Review Comment:
   This method recursively extracts all arguments of `And` or `Or`; for example, `a > 1 and b and c + 1 = 2` => `Predicate::And {[a > 1, b, c + 1 = 2]}`. Since the return value is a `Predicate` rather than an `Expr`, it seems we can't reuse `ExprRewriter`?
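A minimal sketch of that extraction, using hypothetical toy `Expr` and `Predicate` types rather than DataFusion's. One simplification to note: in the PR, `predicate()` keeps the binary nesting and the splicing happens later in `flatten_and_predicates` / `flatten_or_predicates`; this sketch folds both steps into a single pass. Its output type is `Predicate`, not `Expr`, which is the mismatch the comment points out.

```rust
// Hypothetical toy expression type, not DataFusion's `Expr`.
// `Leaf` stands for any non-AND/OR expression, e.g. `a > 1`.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Leaf(String),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

// Mirrors the shape of the PR's `Predicate` enum (n-ary AND/OR).
#[derive(Clone, Debug, PartialEq)]
enum Predicate {
    And(Vec<Predicate>),
    Or(Vec<Predicate>),
    Other(Expr),
}

/// Splices the children of same-kind nested nodes (AND inside AND,
/// OR inside OR) into one argument list; other nodes are kept as-is.
fn flatten(args: Vec<Predicate>, is_and: bool) -> Vec<Predicate> {
    let mut out = Vec::new();
    for p in args {
        match p {
            Predicate::And(inner) if is_and => out.extend(inner),
            Predicate::Or(inner) if !is_and => out.extend(inner),
            other => out.push(other),
        }
    }
    out
}

/// Converts a binary AND/OR tree into an n-ary `Predicate`. Children are
/// converted (and flattened) first, so one level of splicing per node
/// yields a fully flat argument list.
fn predicate(expr: &Expr) -> Predicate {
    match expr {
        Expr::And(l, r) => Predicate::And(flatten(vec![predicate(l), predicate(r)], true)),
        Expr::Or(l, r) => Predicate::Or(flatten(vec![predicate(l), predicate(r)], false)),
        other => Predicate::Other(other.clone()),
    }
}
```

Because `a > 1 and b and c + 1 = 2` parses left-associatively as `((a > 1 and b) and c + 1 = 2)`, the inner AND is spliced into the outer one, giving three arguments; an OR nested under an AND is kept as a single argument.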





[GitHub] [arrow-datafusion] xudong963 commented on a diff in pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
xudong963 commented on code in PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#discussion_r929981363


##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,429 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::Filter;
+use datafusion_expr::utils::from_plan;
+use datafusion_expr::Expr::BinaryExpr;
+use datafusion_expr::{Expr, LogicalPlan, Operator};
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Debug)]
+enum Predicate {
+    And { args: Vec<Predicate> },
+    Or { args: Vec<Predicate> },
+    Other { expr: Box<Expr> },
+}
+
+fn predicate(expr: &Expr) -> Result<Predicate> {
+    match expr {
+        BinaryExpr { left, op, right } => match op {
+            Operator::And => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::And { args })
+            }
+            Operator::Or => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::Or { args })
+            }
+            _ => Ok(Predicate::Other {
+                expr: Box::new(BinaryExpr {
+                    left: left.clone(),
+                    op: *op,
+                    right: right.clone(),
+                }),
+            }),
+        },
+        _ => Ok(Predicate::Other {
+            expr: Box::new(expr.clone()),
+        }),
+    }
+}
+
+fn normalize_predicate(predicate: &Predicate) -> Expr {
+    match predicate {
+        Predicate::And { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut and_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::And,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                and_expr = BinaryExpr {
+                    left: Box::new(and_expr),
+                    op: Operator::And,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            and_expr
+        }
+        Predicate::Or { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut or_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::Or,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                or_expr = BinaryExpr {
+                    left: Box::new(or_expr),
+                    op: Operator::Or,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            or_expr
+        }
+        Predicate::Other { expr } => *expr.clone(),
+    }
+}
+
+fn rewrite_predicate(predicate: &Predicate) -> Predicate {
+    match predicate {
+        Predicate::And { args } => {
+            let mut rewritten_args = Vec::with_capacity(args.len());
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_and_predicates(&rewritten_args);
+            Predicate::And {
+                args: rewritten_args,
+            }
+        }
+        Predicate::Or { args } => {
+            let mut rewritten_args = vec![];
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_or_predicates(&rewritten_args);
+            delete_duplicate_predicates(&rewritten_args)
+        }
+        Predicate::Other { expr } => Predicate::Other {
+            expr: Box::new(*expr.clone()),
+        },
+    }
+}
+
+fn flatten_and_predicates(and_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in and_predicates {
+        match predicate {
+            Predicate::And { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_and_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn flatten_or_predicates(or_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in or_predicates {
+        match predicate {
+            Predicate::Or { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_or_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
+    let mut shortest_exprs: Vec<Predicate> = vec![];
+    let mut shortest_exprs_len = 0;
+    // choose the shortest AND predicate

Review Comment:
   > For example, in the following predicate, the common sub-expression `(p_partkey = l_partkey OR p_partkey > 5)` is not the shortest
   
   The common sub-expression is taken from the shortest AND predicate, but the two are not necessarily equal: they coincide only when every element of the shortest AND predicate appears in all of the OR arguments.
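The extraction described in the reply can be sketched as follows. This is a minimal sketch, assuming each OR argument has already been flattened into a list of conjuncts and that conjuncts are compared as plain strings (a stand-in for the PR's `Predicate` equality). `extract_common`, `conj`, and `disj` are hypothetical names, and the result is rendered as a string rather than rebuilt as a `Predicate`. It follows the same steps as `delete_duplicate_predicates`: take candidates from the shortest AND, keep only those present in every OR argument, strip them, and apply absorption.

```rust
/// Joins conjuncts with AND, e.g. ["A", "B"] -> "A AND B".
fn conj(terms: &[&str]) -> String {
    terms.join(" AND ")
}

/// Joins OR arguments, parenthesizing multi-term conjunctions.
fn disj(args: &[Vec<&str>]) -> String {
    args.iter()
        .map(|a| if a.len() > 1 { format!("({})", conj(a)) } else { conj(a) })
        .collect::<Vec<_>>()
        .join(" OR ")
}

/// Pulls conjuncts shared by every OR argument out of the disjunction.
/// Each OR argument is a list of conjuncts; a lone term is a one-element list.
fn extract_common(or_args: &[Vec<&str>]) -> String {
    // Only the shortest argument's conjuncts can possibly be common to all.
    let shortest = or_args
        .iter()
        .min_by_key(|a| a.len())
        .expect("at least one OR argument");
    // Keep the candidates that appear in every OR argument (deduplicated).
    let mut common: Vec<&str> = Vec::new();
    for c in shortest {
        if !common.contains(c) && or_args.iter().all(|arg| arg.contains(c)) {
            common.push(*c);
        }
    }
    if common.is_empty() {
        return disj(or_args);
    }
    // Strip the common conjuncts from each OR argument.
    let rests: Vec<Vec<&str>> = or_args
        .iter()
        .map(|arg| arg.iter().copied().filter(|c| !common.contains(c)).collect())
        .collect();
    let mut parts: Vec<String> = common.iter().map(|c| c.to_string()).collect();
    // Absorption: an argument that is now empty equalled the common part,
    // so the remaining OR is implied, e.g. (A AND B) OR A => A.
    if !rests.iter().any(|r| r.is_empty()) {
        let rest = disj(&rests);
        parts.push(if rests.len() > 1 { format!("({})", rest) } else { rest });
    }
    parts.join(" AND ")
}
```

For example, with OR arguments `[p_partkey = l_partkey, p_brand = 'Brand#12', p_size <= 5]` and `[p_partkey = l_partkey, p_brand = 'Brand#23']`, the shortest AND has two conjuncts, but only `p_partkey = l_partkey` is common, so the result is `p_partkey = l_partkey AND ((p_brand = 'Brand#12' AND p_size <= 5) OR p_brand = 'Brand#23')`.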





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2858: feat: add optimize rule `rewrite_disjunctive_predicate`

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2858:
URL: https://github.com/apache/arrow-datafusion/pull/2858#discussion_r929245313


##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::Filter;
+use datafusion_expr::utils::from_plan;
+use datafusion_expr::Expr::BinaryExpr;
+use datafusion_expr::{Expr, LogicalPlan, Operator};
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Debug)]
+enum Predicate {
+    And { args: Vec<Predicate> },
+    Or { args: Vec<Predicate> },
+    Other { expr: Box<Expr> },
+}
+
+fn predicate(expr: &Expr) -> Result<Predicate> {
+    match expr {
+        BinaryExpr { left, op, right } => match op {
+            Operator::And => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::And { args })
+            }
+            Operator::Or => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::Or { args })
+            }
+            _ => Ok(Predicate::Other {
+                expr: Box::new(BinaryExpr {
+                    left: left.clone(),
+                    op: *op,
+                    right: right.clone(),
+                }),
+            }),
+        },
+        _ => Ok(Predicate::Other {
+            expr: Box::new(expr.clone()),
+        }),
+    }
+}
+
+fn normalize_predicate(predicate: Predicate) -> Expr {
+    match predicate {
+        Predicate::And { args } => {
+            assert!(args.len() >= 2);
+            args.into_iter()
+                .map(normalize_predicate)
+                .reduce(Expr::and)
+                .expect("had more than one arg")
+        }
+        Predicate::Or { args } => {
+            assert!(args.len() >= 2);
+            args.into_iter()
+                .map(normalize_predicate)
+                .reduce(Expr::or)
+                .expect("had more than one arg")
+        }
+        Predicate::Other { expr } => *expr,
+    }
+}
+
+fn rewrite_predicate(predicate: Predicate) -> Predicate {
+    match predicate {
+        Predicate::And { args } => {
+            let mut rewritten_args = Vec::with_capacity(args.len());
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg.clone()));
+            }
+            rewritten_args = flatten_and_predicates(rewritten_args);
+            Predicate::And {
+                args: rewritten_args,
+            }
+        }
+        Predicate::Or { args } => {
+            let mut rewritten_args = vec![];
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg.clone()));
+            }
+            rewritten_args = flatten_or_predicates(rewritten_args);
+            delete_duplicate_predicates(&rewritten_args)
+        }
+        Predicate::Other { expr } => Predicate::Other {
+            expr: Box::new(*expr),
+        },
+    }
+}
+
+fn flatten_and_predicates(
+    and_predicates: impl IntoIterator<Item = Predicate>,
+) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in and_predicates {
+        match predicate {
+            Predicate::And { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_and_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate);
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn flatten_or_predicates(
+    or_predicates: impl IntoIterator<Item = Predicate>,
+) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in or_predicates {
+        match predicate {
+            Predicate::Or { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_or_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate);
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
+    let mut shortest_exprs: Vec<Predicate> = vec![];
+    let mut shortest_exprs_len = 0;
+    // choose the shortest AND predicate
+    for or_predicate in or_predicates.iter() {
+        match or_predicate {
+            Predicate::And { args } => {
+                let args_num = args.len();
+                if shortest_exprs.is_empty() || args_num < shortest_exprs_len {
+                    shortest_exprs = (*args).clone();
+                    shortest_exprs_len = args_num;
+                }
+            }
+            _ => {
+                // if there is no AND predicate, it must be the shortest expression.
+                shortest_exprs = vec![or_predicate.clone()];
+                break;
+            }
+        }
+    }
+
+    // dedup shortest_exprs
+    shortest_exprs.dedup();
+
+    // Check each element in shortest_exprs to see if it's in all the OR arguments.
+    let mut exist_exprs: Vec<Predicate> = vec![];
+    for expr in shortest_exprs.iter() {
+        let found = or_predicates.iter().all(|or_predicate| match or_predicate {
+            Predicate::And { args } => args.contains(expr),
+            _ => or_predicate == expr,
+        });
+        if found {
+            exist_exprs.push((*expr).clone());
+        }
+    }
+    if exist_exprs.is_empty() {
+        return Predicate::Or {
+            args: or_predicates.to_vec(),
+        };
+    }
+
+    // Rebuild the OR predicate.
+    // (A AND B) OR A will be optimized to A.
+    let mut new_or_predicates = vec![];
+    for or_predicate in or_predicates.iter() {
+        match or_predicate {
+            Predicate::And { args } => {
+                let mut new_args = (*args).clone();
+                new_args.retain(|expr| !exist_exprs.contains(expr));
+                if !new_args.is_empty() {
+                    if new_args.len() == 1 {
+                        new_or_predicates.push(new_args[0].clone());
+                    } else {
+                        new_or_predicates.push(Predicate::And { args: new_args });
+                    }
+                } else {
+                    new_or_predicates.clear();
+                    break;
+                }
+            }
+            _ => {
+                if exist_exprs.contains(or_predicate) {
+                    new_or_predicates.clear();
+                    break;
+                }
+            }
+        }
+    }
+    if !new_or_predicates.is_empty() {
+        if new_or_predicates.len() == 1 {
+            exist_exprs.push(new_or_predicates[0].clone());
+        } else {
+            exist_exprs.push(Predicate::Or {
+                args: flatten_or_predicates(new_or_predicates),
+            });
+        }
+    }
+
+    if exist_exprs.len() == 1 {
+        exist_exprs[0].clone()
+    } else {
+        Predicate::And {
+            args: flatten_and_predicates(exist_exprs),
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct RewriteDisjunctivePredicate;
+
+impl RewriteDisjunctivePredicate {
+    pub fn new() -> Self {
+        Self::default()
+    }
+    fn rewrite_disjunctive_predicate(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        match plan {
+            LogicalPlan::Filter(filter) => {
+                let predicate = predicate(&filter.predicate)?;
+                let rewritten_predicate = rewrite_predicate(predicate);
+                let rewritten_expr = normalize_predicate(rewritten_predicate);
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: rewritten_expr,
+                    input: Arc::new(self.rewrite_disjunctive_predicate(
+                        &filter.input,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => {
+                let expr = plan.expressions();

Review Comment:
   πŸ‘  very nice



##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,429 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::Filter;
+use datafusion_expr::utils::from_plan;
+use datafusion_expr::Expr::BinaryExpr;
+use datafusion_expr::{Expr, LogicalPlan, Operator};
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Debug)]
+enum Predicate {
+    And { args: Vec<Predicate> },
+    Or { args: Vec<Predicate> },
+    Other { expr: Box<Expr> },
+}
+
+fn predicate(expr: &Expr) -> Result<Predicate> {
+    match expr {
+        BinaryExpr { left, op, right } => match op {
+            Operator::And => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::And { args })
+            }
+            Operator::Or => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::Or { args })
+            }
+            _ => Ok(Predicate::Other {
+                expr: Box::new(BinaryExpr {
+                    left: left.clone(),
+                    op: *op,
+                    right: right.clone(),
+                }),
+            }),
+        },
+        _ => Ok(Predicate::Other {
+            expr: Box::new(expr.clone()),
+        }),
+    }
+}
+
+fn normalize_predicate(predicate: &Predicate) -> Expr {
+    match predicate {
+        Predicate::And { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut and_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::And,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                and_expr = BinaryExpr {
+                    left: Box::new(and_expr),
+                    op: Operator::And,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            and_expr
+        }
+        Predicate::Or { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut or_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::Or,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                or_expr = BinaryExpr {
+                    left: Box::new(or_expr),
+                    op: Operator::Or,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            or_expr
+        }
+        Predicate::Other { expr } => *expr.clone(),
+    }
+}
+
+fn rewrite_predicate(predicate: &Predicate) -> Predicate {
+    match predicate {
+        Predicate::And { args } => {
+            let mut rewritten_args = Vec::with_capacity(args.len());
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_and_predicates(&rewritten_args);
+            Predicate::And {
+                args: rewritten_args,
+            }
+        }
+        Predicate::Or { args } => {
+            let mut rewritten_args = vec![];
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_or_predicates(&rewritten_args);
+            delete_duplicate_predicates(&rewritten_args)
+        }
+        Predicate::Other { expr } => Predicate::Other {
+            expr: Box::new(*expr.clone()),
+        },
+    }
+}
+
+fn flatten_and_predicates(and_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in and_predicates {
+        match predicate {
+            Predicate::And { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_and_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn flatten_or_predicates(or_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in or_predicates {
+        match predicate {
+            Predicate::Or { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_or_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
+    let mut shortest_exprs: Vec<Predicate> = vec![];
+    let mut shortest_exprs_len = 0;
+    // choose the shortest AND predicate

Review Comment:
   > We don't need to check all elements, only the shortest could be the common expression.
   
   I don't understand why only the shortest could be the common expression. I am probably missing something. 
   
   For example, in the following predicate, the common subexpression `(p_partkey = l_partkey OR p_partkey > 5)` is not the shortest:
   
   ```
   (
       (p_partkey = l_partkey OR p_partkey > 5)
       and p_brand = 'Brand#12'
   )
   or
   (
       (p_partkey = l_partkey OR p_partkey > 5)
       and p_size between 1 and 10
   )
   or
   (
       (p_partkey = l_partkey OR p_partkey > 5)
       and p_size between 1 and 15
   )
   ```
   
   and yet it could be factored out:
   
   ```
   (p_partkey = l_partkey OR p_partkey > 5)
   and
   (
       (p_brand = 'Brand#12')
       or (p_size between 1 and 10)
       or (p_size between 1 and 15)
   )
   ```
   
   🤔
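A minimal sketch of the factoring step, under simplifying assumptions: each disjunct is modeled as a list of conjuncts, plain strings stand in for `Expr`, and `factor_common` is a hypothetical helper, not the PR's code. It illustrates that "shortest" in `delete_duplicate_predicates` counts conjuncts rather than expression size: in the example above every disjunct has two conjuncts, and the parenthesized OR is a single conjunct of the shortest one, so it is still picked up as a candidate.

```rust
// Toy model (assumption): a disjunction is a slice of disjuncts, and each
// disjunct is a list of conjuncts. Strings stand in for DataFusion's `Expr`.

/// Hypothetical helper: splits out the conjuncts shared by every disjunct.
/// Returns (common conjuncts, remaining conjuncts of each disjunct).
fn factor_common(disjuncts: &[Vec<&str>]) -> (Vec<String>, Vec<Vec<String>>) {
    // "Shortest" = fewest conjuncts, not fewest characters. A conjunct that
    // appears in every disjunct must also appear in this one, so it is
    // enough to take candidates from here.
    let shortest = match disjuncts.iter().min_by_key(|d| d.len()) {
        Some(d) => d,
        None => return (Vec::new(), Vec::new()),
    };

    let mut common: Vec<String> = Vec::new();
    for cand in shortest {
        let in_all = disjuncts.iter().all(|d| d.contains(cand));
        let seen = common.iter().any(|c| c.as_str() == *cand);
        if in_all && !seen {
            common.push(cand.to_string());
        }
    }

    // Drop the common conjuncts from every disjunct; what remains is OR-ed.
    let residual: Vec<Vec<String>> = disjuncts
        .iter()
        .map(|d| {
            d.iter()
                .filter(|c| !common.iter().any(|k| k.as_str() == **c))
                .map(|c| c.to_string())
                .collect::<Vec<String>>()
        })
        .collect();

    (common, residual)
}
```

Called on the three disjuncts of the example, it should return the parenthesized OR as the only common conjunct and one residual conjunct per disjunct, which corresponds to the factored form `X and (A or B or C)`. The sketch does not implement the PR's absorption rule (`(A AND B) OR A` becoming `A`) for the case where a residual disjunct ends up empty.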
   



##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::Filter;
+use datafusion_expr::utils::from_plan;
+use datafusion_expr::Expr::BinaryExpr;
+use datafusion_expr::{Expr, LogicalPlan, Operator};
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Debug)]
+enum Predicate {
+    And { args: Vec<Predicate> },
+    Or { args: Vec<Predicate> },
+    Other { expr: Box<Expr> },
+}
+
+fn predicate(expr: &Expr) -> Result<Predicate> {
+    match expr {
+        BinaryExpr { left, op, right } => match op {
+            Operator::And => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::And { args })
+            }
+            Operator::Or => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::Or { args })
+            }
+            _ => Ok(Predicate::Other {
+                expr: Box::new(BinaryExpr {
+                    left: left.clone(),
+                    op: *op,
+                    right: right.clone(),
+                }),
+            }),
+        },
+        _ => Ok(Predicate::Other {
+            expr: Box::new(expr.clone()),
+        }),
+    }
+}
+
+fn normalize_predicate(predicate: Predicate) -> Expr {
+    match predicate {
+        Predicate::And { args } => {
+            assert!(args.len() >= 2);
+            args.into_iter()
+                .map(normalize_predicate)
+                .reduce(Expr::and)
+                .expect("had more than one arg")
+        }
+        Predicate::Or { args } => {
+            assert!(args.len() >= 2);
+            assert!(args.len() >= 2);
+            args.into_iter()
+                .map(normalize_predicate)
+                .reduce(Expr::or)
+                .expect("had more than one arg")
+        }
+        Predicate::Other { expr } => *expr,
+    }
+}
+
+fn rewrite_predicate(predicate: Predicate) -> Predicate {
+    match predicate {
+        Predicate::And { args } => {
+            let mut rewritten_args = Vec::with_capacity(args.len());
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg.clone()));
+            }
+            rewritten_args = flatten_and_predicates(rewritten_args);
+            Predicate::And {
+                args: rewritten_args,
+            }
+        }
+        Predicate::Or { args } => {
+            let mut rewritten_args = vec![];
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg.clone()));
+            }
+            rewritten_args = flatten_or_predicates(rewritten_args);
+            delete_duplicate_predicates(&rewritten_args)
+        }
+        Predicate::Other { expr } => Predicate::Other {
+            expr: Box::new(*expr),
+        },
+    }
+}
+
+fn flatten_and_predicates(
+    and_predicates: impl IntoIterator<Item = Predicate>,
+) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in and_predicates {
+        match predicate {
+            Predicate::And { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_and_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate);
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn flatten_or_predicates(
+    or_predicates: impl IntoIterator<Item = Predicate>,
+) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in or_predicates {
+        match predicate {
+            Predicate::Or { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_or_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate);
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
+    let mut shortest_exprs: Vec<Predicate> = vec![];
+    let mut shortest_exprs_len = 0;
+    // choose the shortest AND predicate
+    for or_predicate in or_predicates.iter() {
+        match or_predicate {
+            Predicate::And { args } => {
+                let args_num = args.len();
+                if shortest_exprs.is_empty() || args_num < shortest_exprs_len {
+                    shortest_exprs = (*args).clone();
+                    shortest_exprs_len = args_num;
+                }
+            }
+            _ => {
+                // if there is no AND predicate, it must be the shortest expression.
+                shortest_exprs = vec![or_predicate.clone()];
+                break;
+            }
+        }
+    }
+
+    // dedup shortest_exprs
+    shortest_exprs.dedup();
+
+    // Check each element in shortest_exprs to see if it's in all the OR arguments.
+    let mut exist_exprs: Vec<Predicate> = vec![];
+    for expr in shortest_exprs.iter() {
+        let found = or_predicates.iter().all(|or_predicate| match or_predicate {
+            Predicate::And { args } => args.contains(expr),
+            _ => or_predicate == expr,
+        });
+        if found {
+            exist_exprs.push((*expr).clone());
+        }
+    }
+    if exist_exprs.is_empty() {
+        return Predicate::Or {
+            args: or_predicates.to_vec(),
+        };
+    }
+
+    // Rebuild the OR predicate.
+    // (A AND B) OR A will be optimized to A.
+    let mut new_or_predicates = vec![];
+    for or_predicate in or_predicates.iter() {
+        match or_predicate {
+            Predicate::And { args } => {
+                let mut new_args = (*args).clone();
+                new_args.retain(|expr| !exist_exprs.contains(expr));
+                if !new_args.is_empty() {
+                    if new_args.len() == 1 {
+                        new_or_predicates.push(new_args[0].clone());
+                    } else {
+                        new_or_predicates.push(Predicate::And { args: new_args });
+                    }
+                } else {
+                    new_or_predicates.clear();
+                    break;
+                }
+            }
+            _ => {
+                if exist_exprs.contains(or_predicate) {
+                    new_or_predicates.clear();
+                    break;
+                }
+            }
+        }
+    }
+    if !new_or_predicates.is_empty() {
+        if new_or_predicates.len() == 1 {
+            exist_exprs.push(new_or_predicates[0].clone());
+        } else {
+            exist_exprs.push(Predicate::Or {
+                args: flatten_or_predicates(new_or_predicates),
+            });
+        }
+    }
+
+    if exist_exprs.len() == 1 {
+        exist_exprs[0].clone()
+    } else {
+        Predicate::And {
+            args: flatten_and_predicates(exist_exprs),
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct RewriteDisjunctivePredicate;
+
+impl RewriteDisjunctivePredicate {
+    pub fn new() -> Self {
+        Self::default()
+    }
+    fn rewrite_disjunctive_predicate(
+        &self,
+        plan: &LogicalPlan,
+        _optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        match plan {
+            LogicalPlan::Filter(filter) => {
+                let predicate = predicate(&filter.predicate)?;
+                let rewritten_predicate = rewrite_predicate(predicate);
+                let rewritten_expr = normalize_predicate(rewritten_predicate);
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: rewritten_expr,
+                    input: Arc::new(self.rewrite_disjunctive_predicate(
+                        &filter.input,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => {
+                let expr = plan.expressions();
+                let inputs = plan.inputs();
+                let new_inputs = inputs
+                    .iter()
+                    .map(|input| {
+                        self.rewrite_disjunctive_predicate(input, _optimizer_config)
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                from_plan(plan, &expr, &new_inputs)
+            }
+        }
+    }
+}
+
+impl OptimizerRule for RewriteDisjunctivePredicate {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        self.rewrite_disjunctive_predicate(plan, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "rewrite_disjunctive_predicate"
+    }
+}
+
+#[cfg(test)]
+
+mod tests {
+    use crate::rewrite_disjunctive_predicate::{
+        normalize_predicate, predicate, rewrite_predicate, Predicate,
+    };
+
+    use datafusion_common::{Result, ScalarValue};
+    use datafusion_expr::{and, col, lit, or};
+
+    #[test]
+    fn test_rewrite_predicate() -> Result<()> {
+        let equi_expr = col("t1.a").eq(col("t2.b"));
+        let gt_expr = col("t1.c").gt(lit(ScalarValue::Int8(Some(1))));
+        let lt_expr = col("t1.d").lt(lit(ScalarValue::Int8(Some(2))));
+        let expr = or(
+            and(equi_expr.clone(), gt_expr.clone()),
+            and(equi_expr.clone(), lt_expr.clone()),
+        );
+        let predicate = predicate(&expr)?;
+        assert_eq!(
+            predicate,
+            Predicate::Or {
+                args: vec![
+                    Predicate::And {
+                        args: vec![
+                            Predicate::Other {
+                                expr: Box::new(equi_expr.clone())
+                            },
+                            Predicate::Other {
+                                expr: Box::new(gt_expr.clone())
+                            }
+                        ]
+                    },
+                    Predicate::And {
+                        args: vec![
+                            Predicate::Other {
+                                expr: Box::new(equi_expr.clone())
+                            },
+                            Predicate::Other {
+                                expr: Box::new(lt_expr.clone())
+                            }
+                        ]
+                    }
+                ]
+            }
+        );
+        let rewritten_predicate = rewrite_predicate(predicate);
+        assert_eq!(

Review Comment:
   👍



##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,429 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::Filter;
+use datafusion_expr::utils::from_plan;
+use datafusion_expr::Expr::BinaryExpr;
+use datafusion_expr::{Expr, LogicalPlan, Operator};
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Debug)]
+enum Predicate {
+    And { args: Vec<Predicate> },
+    Or { args: Vec<Predicate> },
+    Other { expr: Box<Expr> },
+}
+
+fn predicate(expr: &Expr) -> Result<Predicate> {
+    match expr {
+        BinaryExpr { left, op, right } => match op {
+            Operator::And => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::And { args })
+            }
+            Operator::Or => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::Or { args })
+            }
+            _ => Ok(Predicate::Other {
+                expr: Box::new(BinaryExpr {
+                    left: left.clone(),
+                    op: *op,
+                    right: right.clone(),
+                }),
+            }),
+        },
+        _ => Ok(Predicate::Other {
+            expr: Box::new(expr.clone()),
+        }),
+    }
+}
+
+fn normalize_predicate(predicate: &Predicate) -> Expr {
+    match predicate {
+        Predicate::And { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut and_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::And,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                and_expr = BinaryExpr {
+                    left: Box::new(and_expr),
+                    op: Operator::And,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            and_expr
+        }
+        Predicate::Or { args } => {
+            assert!(args.len() >= 2);
+            let left = normalize_predicate(&args[0]);
+            let right = normalize_predicate(&args[1]);
+            let mut or_expr = BinaryExpr {
+                left: Box::new(left),
+                op: Operator::Or,
+                right: Box::new(right),
+            };
+            for arg in args.iter().skip(2) {
+                or_expr = BinaryExpr {
+                    left: Box::new(or_expr),
+                    op: Operator::Or,
+                    right: Box::new(normalize_predicate(arg)),
+                };
+            }
+            or_expr
+        }
+        Predicate::Other { expr } => *expr.clone(),
+    }
+}
+
+fn rewrite_predicate(predicate: &Predicate) -> Predicate {
+    match predicate {
+        Predicate::And { args } => {
+            let mut rewritten_args = Vec::with_capacity(args.len());
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_and_predicates(&rewritten_args);
+            Predicate::And {
+                args: rewritten_args,
+            }
+        }
+        Predicate::Or { args } => {
+            let mut rewritten_args = vec![];
+            for arg in args.iter() {
+                rewritten_args.push(rewrite_predicate(arg));
+            }
+            rewritten_args = flatten_or_predicates(&rewritten_args);
+            delete_duplicate_predicates(&rewritten_args)
+        }
+        Predicate::Other { expr } => Predicate::Other {
+            expr: Box::new(*expr.clone()),
+        },
+    }
+}
+
+fn flatten_and_predicates(and_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in and_predicates {
+        match predicate {
+            Predicate::And { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_and_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn flatten_or_predicates(or_predicates: &[Predicate]) -> Vec<Predicate> {
+    let mut flattened_predicates = vec![];
+    for predicate in or_predicates {
+        match predicate {
+            Predicate::Or { args } => {
+                flattened_predicates
+                    .extend_from_slice(flatten_or_predicates(args).as_slice());
+            }
+            _ => {
+                flattened_predicates.push(predicate.clone());
+            }
+        }
+    }
+    flattened_predicates
+}
+
+fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
+    let mut shortest_exprs: Vec<Predicate> = vec![];
+    let mut shortest_exprs_len = 0;
+    // choose the shortest AND predicate

Review Comment:
   That being said, I don't think it is incorrect to pick the shortest predicate, but I do think it may miss potential rewrites. Perhaps we can improve it in the future.
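One way to check this concern in isolation, as a hedged sketch on a toy model (strings for `Expr`, hypothetical function names): taking candidates only from the disjunct with the fewest conjuncts yields the same set of structurally identical common conjuncts as intersecting every disjunct, because a conjunct present in all disjuncts is in particular present in the shortest one. Rewrites that need semantic rather than structural equality (for instance `a = b` versus `b = a`) are missed by both approaches, since the PR compares predicates with the derived `PartialEq`.

```rust
use std::collections::HashSet;

// Toy model (assumption): each disjunct is a list of conjuncts, with strings
// standing in for `Expr`. Both function names are hypothetical.

/// Candidates come only from the disjunct with the fewest conjuncts,
/// mirroring the strategy in `delete_duplicate_predicates`.
fn common_via_shortest(disjuncts: &[Vec<&str>]) -> HashSet<String> {
    let shortest = match disjuncts.iter().min_by_key(|d| d.len()) {
        Some(d) => d,
        None => return HashSet::new(),
    };
    shortest
        .iter()
        .filter(|c| disjuncts.iter().all(|d| d.contains(*c)))
        .map(|c| c.to_string())
        .collect()
}

/// Reference version: intersect the conjunct sets of all disjuncts.
fn common_via_intersection(disjuncts: &[Vec<&str>]) -> HashSet<String> {
    let mut iter = disjuncts.iter();
    let first = match iter.next() {
        Some(d) => d,
        None => return HashSet::new(),
    };
    let mut acc: HashSet<&str> = first.iter().copied().collect();
    for d in iter {
        let other: HashSet<&str> = d.iter().copied().collect();
        acc.retain(|c| other.contains(c));
    }
    acc.into_iter().map(String::from).collect()
}
```

The shortest-first version simply avoids building a set per disjunct; for syntactic matches it should not lose any common factor.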



##########
datafusion/optimizer/src/rewrite_disjunctive_predicate.rs:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::Filter;
+use datafusion_expr::utils::from_plan;
+use datafusion_expr::Expr::BinaryExpr;
+use datafusion_expr::{Expr, LogicalPlan, Operator};
+use std::sync::Arc;
+
+#[derive(Clone, PartialEq, Debug)]
+enum Predicate {
+    And { args: Vec<Predicate> },
+    Or { args: Vec<Predicate> },
+    Other { expr: Box<Expr> },
+}
+
+fn predicate(expr: &Expr) -> Result<Predicate> {
+    match expr {
+        BinaryExpr { left, op, right } => match op {
+            Operator::And => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::And { args })
+            }
+            Operator::Or => {
+                let args = vec![predicate(left)?, predicate(right)?];
+                Ok(Predicate::Or { args })
+            }
+            _ => Ok(Predicate::Other {
+                expr: Box::new(BinaryExpr {
+                    left: left.clone(),
+                    op: *op,
+                    right: right.clone(),
+                }),
+            }),
+        },
+        _ => Ok(Predicate::Other {
+            expr: Box::new(expr.clone()),
+        }),
+    }
+}
+
+fn normalize_predicate(predicate: Predicate) -> Expr {
+    match predicate {
+        Predicate::And { args } => {
+            assert!(args.len() >= 2);
+            args.into_iter()
+                .map(normalize_predicate)
+                .reduce(Expr::and)
+                .expect("had more than one arg")
+        }
+        Predicate::Or { args } => {
+            assert!(args.len() >= 2);
+            assert!(args.len() >= 2);

Review Comment:
   ```suggestion
   ```
   
   It seems the `assert!` was repeated.
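Since the second `assert!` checks exactly the same condition, dropping it changes nothing. The sketch below uses a toy `Expr` (an assumption, not DataFusion's type) and a hypothetical `normalize_or` to show that folding the flattened arguments with `Iterator::reduce`, as the newer revision does with `Expr::or`, builds the same left-deep tree, `(a OR b) OR c`, as the explicit loop in the earlier revision of `normalize_predicate`.

```rust
// Toy expression type (assumption): stands in for DataFusion's `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Col(&'static str),
    Or(Box<Expr>, Box<Expr>),
}

fn or(left: Expr, right: Expr) -> Expr {
    Expr::Or(Box::new(left), Box::new(right))
}

/// Hypothetical helper: rebuilds a flattened OR argument list as a
/// left-deep binary tree, e.g. [a, b, c] -> (a OR b) OR c.
fn normalize_or(args: Vec<Expr>) -> Expr {
    // A single precondition check is enough.
    assert!(args.len() >= 2);
    args.into_iter()
        .reduce(or)
        .expect("had more than one arg")
}
```

`reduce` returns `None` only for an empty iterator, so the `expect` can never fire once the `assert!` has passed.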



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org