You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/12 18:04:30 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #3578: extract OR clause for join

alamb commented on code in PR #3578:
URL: https://github.com/apache/arrow-datafusion/pull/3578#discussion_r993757679


##########
datafusion/optimizer/src/filter_push_down.rs:
##########
@@ -248,6 +249,145 @@ fn get_pushable_join_predicates<'a>(
         .unzip()
 }
 
+// examine OR clause to see if any useful clauses can be extracted and push down.
+// extract at least one qual of each sub clauses of OR clause, then form the quals
+// to new OR clause as predicate.
+//
+// Filter: (a = c and a < 20) or (b = d and b > 10)
+//     join/crossjoin:
+//          TableScan: projection=[a, b]
+//          TableScan: projection=[c, d]
+//
+// is optimized to
+//
+// Filter: (a = c and a < 20) or (b = d and b > 10)
+//     join/crossjoin:
+//          Filter: (a < 20) or (b > 10)
+//              TableScan: projection=[a, b]
+//          TableScan: projection=[c, d]
+//
+// In general, predicates of this form:
+//
+// (A AND B) OR (C AND D)
+//
+// will be transformed to
+//
+// ((A AND B) OR (C AND D)) AND (A OR C)
+//
+// OR
+//
+// ((A AND B) OR (C AND D)) AND ((A AND B) OR C)
+//
+// OR
+//
+// do nothing.
+//
+fn extract_or_clauses_for_join(
+    filters: &[&Expr],
+    schema: &DFSchema,
+    preserved: bool,
+) -> (Vec<Expr>, Vec<HashSet<Column>>) {
+    if !preserved {
+        return (vec![], vec![]);
+    }
+
+    let schema_columns = schema
+        .fields()
+        .iter()
+        .flat_map(|f| {
+            [
+                f.qualified_column(),
+                // we need to push down filter using unqualified column as well
+                f.unqualified_column(),
+            ]
+        })
+        .collect::<HashSet<_>>();
+
+    let mut exprs = vec![];
+    let mut expr_columns = vec![];
+    for expr in filters.iter() {
+        if let Expr::BinaryExpr {
+            left,
+            op: Operator::Or,
+            right,
+        } = expr
+        {
+            let left_expr = extract_or_clause(left.as_ref(), &schema_columns);
+            let right_expr = extract_or_clause(right.as_ref(), &schema_columns);
+
+            // If nothing can be extracted from any sub clauses, do nothing for this OR clause.
+            if let (Some(left_expr), Some(right_expr)) = (left_expr, right_expr) {
+                let predicate = or(left_expr, right_expr);
+                let mut columns: HashSet<Column> = HashSet::new();
+                expr_to_columns(&predicate, &mut columns).ok().unwrap();
+
+                exprs.push(predicate);
+                expr_columns.push(columns);
+            }
+        }
+    }
+
+    (exprs, expr_columns)
+}
+
+// extract qual from OR sub-clause.

Review Comment:
   Can you please add some additional comments explaining the conditions under which the `OR` clause is extracted? I tried to explain above.



##########
datafusion/optimizer/src/filter_push_down.rs:
##########
@@ -248,6 +249,145 @@ fn get_pushable_join_predicates<'a>(
         .unzip()
 }
 
+// examine OR clause to see if any useful clauses can be extracted and push down.
+// extract at least one qual of each sub clauses of OR clause, then form the quals
+// to new OR clause as predicate.
+//

Review Comment:
   I would also like to see the return type documented here (as in, what does the `(Vec<Expr>, Vec<HashSet<Column>>)` represent?). I think it is the extracted quals and their column references, but I am not sure.



##########
datafusion/optimizer/src/filter_push_down.rs:
##########
@@ -248,6 +249,145 @@ fn get_pushable_join_predicates<'a>(
         .unzip()
 }
 
+// examine OR clause to see if any useful clauses can be extracted and push down.
+// extract at least one qual of each sub clauses of OR clause, then form the quals
+// to new OR clause as predicate.
+//

Review Comment:
   I think we need to explain the conditions under which a qual can be extracted as it may not be obvious to someone when they initially look at this. 
   
   ```suggestion
   // to new OR clause as predicate.
   //
   // A qual is extracted if it contains (only) a common set of column references with the other quals.
   ```
   
   I am not sure that is correct



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -1484,7 +1484,8 @@ async fn reduce_left_join_2() -> Result<()> {
         "    Filter: CAST(#t2.t2_int AS Int64) < Int64(10) OR CAST(#t1.t1_int AS Int64) > Int64(2) AND #t2.t2_name != Utf8(\"w\") [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
         "      Inner Join: #t1.t1_id = #t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
         "        TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
-        "        TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "        Filter: CAST(#t2.t2_int AS Int64) < Int64(10) OR #t2.t2_name != Utf8(\"w\") [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",

Review Comment:
   👍  nice



##########
datafusion/core/tests/sql/predicates.rs:
##########
@@ -430,9 +430,10 @@ async fn multiple_or_predicates() -> Result<()> {
         "    Projection: #part.p_size >= Int32(1) AS #part.p_size >= Int32(1)Int32(1)#part.p_size, #lineitem.l_partkey, #lineitem.l_quantity, #part.p_brand, #part.p_size [#part.p_size >= Int32(1)Int32(1)#part.p_size:Boolean;N, l_partkey:Int64, l_quantity:Decimal128(15, 2), p_brand:Utf8, p_size:Int32]",
         "      Filter: #part.p_brand = Utf8(\"Brand#12\") AND #lineitem.l_quantity >= Decimal128(Some(100),15,2) AND #lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND #part.p_size <= Int32(5) OR #part.p_brand = Utf8(\"Brand#23\") AND #lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND #part.p_size <= Int32(10) OR #part.p_brand = Utf8(\"Brand#34\") AND #lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND #part.p_size <= Int32(15) [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
         "        Inner Join: #lineitem.l_partkey = #part.p_partkey [l_partkey:Int64, l_quantity:Decimal128(15, 2), p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
-        "          TableScan: lineitem projection=[l_partkey, l_quantity] [l_partkey:Int64, l_quantity:Decimal128(15, 2)]",
-        "          Filter: #part.p_size >= Int32(1) [p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
-        "            TableScan: part projection=[p_partkey, p_brand, p_size], partial_filters=[#part.p_size >= Int32(1)] [p_partkey:Int64, p_brand:Utf8, p_size:Int32]",
+        "          Filter: #lineitem.l_quantity >= Decimal128(Some(100),15,2) AND #lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR #lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR #lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND #lineitem.l_quantity <= Decimal128(Some(3000),15,2) [l_partkey:Int64, l_quantity:Decimal128(15, 2)]",

Review Comment:
   I went through this plan and I agree it seems correct (as in, the pushed-down filters don't filter out anything that would have passed the original filter).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org