You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/11/26 13:14:02 UTC

[GitHub] [arrow-datafusion] jackwener commented on a diff in pull request #4365: reimplement `push_down_filter` to remove global-state

jackwener commented on code in PR #4365:
URL: https://github.com/apache/arrow-datafusion/pull/4365#discussion_r1032786229


##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -403,94 +294,90 @@ fn extract_or_clause(expr: &Expr, schema_columns: &HashSet<Column>) -> Option<Ex
     predicate
 }
 
-fn optimize_join(
-    mut state: State,
+// push down join/cross-join
+fn push_down_all_join(
+    predicates: Vec<Expr>,
     plan: &LogicalPlan,
     left: &LogicalPlan,
     right: &LogicalPlan,
-    on_filter: Vec<Predicate>,
+    on_filter: Vec<Expr>,
 ) -> Result<LogicalPlan> {
+    let on_filter_empty = on_filter.is_empty();
     // Get pushable predicates from current optimizer state
     let (left_preserved, right_preserved) = lr_is_preserved(plan)?;
-    let to_left =
-        get_pushable_join_predicates(&state.filters, left.schema(), left_preserved);
-    let to_right =
-        get_pushable_join_predicates(&state.filters, right.schema(), right_preserved);
-    let to_keep: Predicates = state
-        .filters
-        .iter()
-        .filter(|(e, _)| !to_left.0.contains(&e) && !to_right.0.contains(&e))
-        .map(|(a, b)| (a, b))
-        .unzip();
+    let mut left_push = vec![];
+    let mut right_push = vec![];
+
+    let mut keep_predicates = vec![];
+    for predicate in predicates {
+        if left_preserved && can_pushdown_join_predicate(&predicate, left.schema())? {
+            left_push.push(predicate);
+        } else if right_preserved
+            && can_pushdown_join_predicate(&predicate, right.schema())?
+        {
+            right_push.push(predicate);
+        } else {
+            keep_predicates.push(predicate);
+        }
+    }
 
-    // Get pushable predicates from join filter
-    let (on_to_left, on_to_right, on_to_keep) = if on_filter.is_empty() {
-        ((vec![], vec![]), (vec![], vec![]), vec![])
-    } else {
+    let mut keep_condition = vec![];
+    if !on_filter.is_empty() {
         let (on_left_preserved, on_right_preserved) = on_lr_is_preserved(plan)?;
-        let on_to_left =
-            get_pushable_join_predicates(&on_filter, left.schema(), on_left_preserved);
-        let on_to_right =
-            get_pushable_join_predicates(&on_filter, right.schema(), on_right_preserved);
-        let on_to_keep = on_filter
-            .iter()
-            .filter(|(e, _)| !on_to_left.0.contains(&e) && !on_to_right.0.contains(&e))
-            .map(|(a, _)| a.clone())
-            .collect::<Vec<_>>();
-
-        (on_to_left, on_to_right, on_to_keep)
-    };
+        for on in on_filter {
+            if on_left_preserved && can_pushdown_join_predicate(&on, left.schema())? {
+                left_push.push(on)
+            } else if on_right_preserved
+                && can_pushdown_join_predicate(&on, right.schema())?
+            {
+                right_push.push(on)
+            } else {
+                keep_condition.push(on)
+            }
+        }
+    }
 
     // Extract from OR clause, generate new predicates for both side of join if possible.
     // We only track the unpushable predicates above.
-    let or_to_left =
-        extract_or_clauses_for_join(&to_keep.0, left.schema(), left_preserved);
-    let or_to_right =
-        extract_or_clauses_for_join(&to_keep.0, right.schema(), right_preserved);
+    // TODO: we just get, but don't remove them from origin expr.
+    let or_to_left = extract_or_clauses_for_join(
+        &keep_predicates.iter().collect::<Vec<_>>(),
+        left.schema(),
+        left_preserved,
+    );
+    let or_to_right = extract_or_clauses_for_join(
+        &keep_predicates.iter().collect::<Vec<_>>(),
+        right.schema(),
+        right_preserved,
+    );
     let on_or_to_left = extract_or_clauses_for_join(
-        &on_to_keep.iter().collect::<Vec<_>>(),
+        &keep_condition.iter().collect::<Vec<_>>(),
         left.schema(),
         left_preserved,
     );
     let on_or_to_right = extract_or_clauses_for_join(
-        &on_to_keep.iter().collect::<Vec<_>>(),
+        &keep_condition.iter().collect::<Vec<_>>(),
         right.schema(),
         right_preserved,
     );
 
-    // Build new filter states using pushable predicates
-    // from current optimizer states and from ON clause.
-    // Then recursively call optimization for both join inputs
-    let mut left_state = State::default();
-    left_state.append_predicates(to_left);
-    left_state.append_predicates(on_to_left);
-    or_to_left
-        .0
-        .into_iter()
-        .zip(or_to_left.1)
-        .for_each(|(expr, cols)| left_state.filters.push((expr, cols)));
-    on_or_to_left
-        .0
-        .into_iter()
-        .zip(on_or_to_left.1)
-        .for_each(|(expr, cols)| left_state.filters.push((expr, cols)));
-    let left = optimize(left, left_state)?;

Review Comment:
   The original implementation here was bottom-up, while the surrounding code is top-down. This change unifies both to `top-down`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org