Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/19 06:45:56 UTC

[GitHub] [arrow-datafusion] AssHero opened a new pull request, #2750: try to reduce left/right/full join to inner join

AssHero opened a new pull request, #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750

    # Rationale for this change
   Try to reduce left/right/full joins to inner joins.
   
   For the query `select ... from a left join b on ... where b.xx = 100;`,
   if b.xx is NULL, then `b.xx = 100` evaluates to NULL rather than true, so the WHERE
   clause filters those rows out, including every null-extended row the left join adds.
   Therefore there is no need to produce null rows for the output, and we can use an
   inner join instead of the left join.
   
   In general, right joins and full joins can also be reduced to inner joins by the same rules.
   
   # What changes are included in this PR?
   Add `reduce_outer_plan` in src/planner.rs to try to reduce left/right/full joins to inner joins.
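   To make the rationale concrete, here is a minimal Rust sketch of the three-valued logic involved. It uses hypothetical in-memory tables `a(id, name)` and `b(id, xx)`, with `None` standing for NULL; it is not DataFusion code. It only shows that once `WHERE b.xx = 100` drops every row whose predicate is NULL, a left join and an inner join return the same rows.

```rust
// Hypothetical rows: `a` is (id, name); `b` is (id, xx). `None` models SQL NULL.
type RowA = (i64, &'static str);
type RowB = (i64, Option<i64>);
// Joined row: a.id, a.name, b.id, b.xx (the `b` columns are nullable).
type Joined = (i64, &'static str, Option<i64>, Option<i64>);

/// SQL `b.xx = 100`: a NULL input yields NULL (`None`), not `Some(false)`.
fn xx_eq_100(xx: Option<i64>) -> Option<bool> {
    xx.map(|v| v == 100)
}

/// Nested-loop join on `a.id = b.id`; `keep_unmatched = true` makes it a LEFT JOIN.
fn join(a: &[RowA], b: &[RowB], keep_unmatched: bool) -> Vec<Joined> {
    let mut out = Vec::new();
    for &(aid, name) in a {
        let mut matched = false;
        for &(bid, xx) in b {
            if aid == bid {
                matched = true;
                out.push((aid, name, Some(bid), xx));
            }
        }
        if keep_unmatched && !matched {
            // Null-extended row: every column from `b` is NULL.
            out.push((aid, name, None, None));
        }
    }
    out
}

/// WHERE keeps a row only if the predicate is TRUE; FALSE and NULL are both dropped.
fn where_xx_eq_100(rows: Vec<Joined>) -> Vec<Joined> {
    rows.into_iter()
        .filter(|row| xx_eq_100(row.3) == Some(true))
        .collect()
}
```

   The left join produces one extra null-extended row for each unmatched `a` row, but `xx_eq_100(None)` is `None`, so the filter discards exactly those rows.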
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] AssHero commented on pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#issuecomment-1162521148

   > @AssHero Since this is a significant feature could you file an issue for it so that it gets included in the change logs.
   
   Yes, I'll file an issue.




[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r905655191


##########
datafusion/core/src/execution/context.rs:
##########
@@ -1229,6 +1230,7 @@ impl SessionState {
         if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
             rules.push(Arc::new(FilterNullJoinKeys::default()));
         }
+        rules.push(Arc::new(ReduceOuterJoin::new()));

Review Comment:
   After FilterPushdown, filters such as (c1 < 100) have already been pushed down to the tables. At that point we would need to check the plan from the bottom up, which requires more information to reduce outer joins.
   
   For the query: select * from (select * from tt1 left join tt2 on c1 = c3) a right join tt3 on a.c2 = tt3.c5 where a.c4 < 100;
   the logical plan after FilterPushdown is:
   
       Projection: #a.c1, #a.c2, #a.c3, #a.c4, #tt3.c5, #tt3.c6
         Inner Join: #a.c2 = #tt3.c5
           Projection: #a.c1, #a.c2, #a.c3, #a.c4, alias=a
             Projection: #tt1.c1, #tt1.c2, #tt2.c3, #tt2.c4, alias=a
               Inner Join: #tt1.c1 = #tt2.c3
                 TableScan: tt1 projection=Some([c1, c2])
                 Filter: #tt2.c4 < Int64(100)
                   TableScan: tt2 projection=Some([c3, c4])
   
   We would need to walk down to the TableScan, collect the filter, and then go back up to the parent plan to reduce the outer joins. I think this is considerably more complicated to implement.
   
   Please let me know if I am missing something.





[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r905655191


##########
datafusion/core/src/execution/context.rs:
##########
@@ -1229,6 +1230,7 @@ impl SessionState {
         if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
             rules.push(Arc::new(FilterNullJoinKeys::default()));
         }
+        rules.push(Arc::new(ReduceOuterJoin::new()));

Review Comment:
   After FilterPushdown, filters such as (c1 < 100) have already been pushed down to the tables. At that point we would need to check the plan from the bottom up, which requires more information to reduce outer joins.
   
   For the query: select * from (select * from tt1 left join tt2 on c1 = c3) a right join tt3 on a.c2 = tt3.c5 where a.c4 < 100;
   the logical plan after FilterPushdown is:
   
       Projection: #a.c1, #a.c2, #a.c3, #a.c4, #tt3.c5, #tt3.c6
         Inner Join: #a.c2 = #tt3.c5
           Projection: #a.c1, #a.c2, #a.c3, #a.c4, alias=a
             Projection: #tt1.c1, #tt1.c2, #tt2.c3, #tt2.c4, alias=a
               Inner Join: #tt1.c1 = #tt2.c3
                 TableScan: tt1 projection=Some([c1, c2])
                 Filter: #tt2.c4 < Int64(100)
                   TableScan: tt2 projection=Some([c3, c4])
           TableScan: tt3 projection=Some([c5, c6])
   
   We would need to walk down to the TableScan, collect the filter, and then go back up to the parent plan to reduce the outer joins. I think this is considerably more complicated to implement.
   
   Please let me know if I am missing something.
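   The ordering concern can be sketched with a hypothetical mini plan (not DataFusion's `LogicalPlan`; each Filter is pre-reduced to the list of columns it rejects nulls for). A top-down pass collects those columns and applies the Left/Right/Full rules from `reduce_outer_join` at each join below the filter. Once a predicate already sits beneath a join, the pass no longer sees it at that join, which is why the rule needs to run before the filters are pushed down.

```rust
use std::collections::HashSet;

#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinKind { Inner, Left, Right, Full }

/// Hypothetical mini logical plan, for illustration only.
#[derive(Debug, PartialEq)]
enum Plan {
    /// Table scan with its output column names.
    Scan(Vec<&'static str>),
    /// Filter, reduced to the columns for which it rejects NULLs.
    Filter(Vec<&'static str>, Box<Plan>),
    Join(JoinKind, Box<Plan>, Box<Plan>),
}

impl Plan {
    fn schema(&self) -> Vec<&'static str> {
        match self {
            Plan::Scan(cols) => cols.clone(),
            Plan::Filter(_, input) => input.schema(),
            Plan::Join(_, l, r) => {
                let mut s = l.schema();
                s.extend(r.schema());
                s
            }
        }
    }
}

/// Same decision table as the Left/Right/Full match in the PR.
fn reduce_join_kind(kind: JoinKind, left_nonnull: bool, right_nonnull: bool) -> JoinKind {
    match kind {
        JoinKind::Left if right_nonnull => JoinKind::Inner,
        JoinKind::Right if left_nonnull => JoinKind::Inner,
        JoinKind::Full if left_nonnull && right_nonnull => JoinKind::Inner,
        JoinKind::Full if left_nonnull => JoinKind::Left,
        JoinKind::Full if right_nonnull => JoinKind::Right,
        other => other,
    }
}

/// Top-down pass: `nonnull` holds columns that some Filter above this node rejects NULLs for.
fn reduce(plan: Plan, nonnull: &HashSet<&'static str>) -> Plan {
    match plan {
        Plan::Scan(cols) => Plan::Scan(cols),
        Plan::Filter(cols, input) => {
            // Extend a private copy, so these columns never leak into sibling subtrees.
            let mut below = nonnull.clone();
            below.extend(cols.iter().copied());
            Plan::Filter(cols, Box::new(reduce(*input, &below)))
        }
        Plan::Join(kind, l, r) => {
            let left_nonnull = l.schema().iter().any(|c| nonnull.contains(c));
            let right_nonnull = r.schema().iter().any(|c| nonnull.contains(c));
            let kind = reduce_join_kind(kind, left_nonnull, right_nonnull);
            // Both children see the same read-only constraint set.
            Plan::Join(kind, Box::new(reduce(*l, nonnull)), Box::new(reduce(*r, nonnull)))
        }
    }
}
```

   With `WHERE t2.c4 < 100` above a left join, the join becomes inner; with the same filter placed on the `t2` scan beneath the join, the join stays left because the top-down pass never carries `t2.c4` into it.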





[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r905641685


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -1375,3 +1375,351 @@ async fn hash_join_with_dictionary() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn reduce_left_join_1() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id")?;
+
+    // reduce to inner join
+    let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_id < 100";
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx
+        .create_logical_plan(&("explain ".to_owned() + sql))
+        .expect(&msg);
+    let state = ctx.state();
+    let plan = state.optimize(&plan)?;
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: #t1.t1_id, #t1.t1_name, #t1.t1_int, #t2.t2_id, #t2.t2_name, #t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "    Inner Join: #t1.t1_id = #t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "      Filter: #t1.t1_id < Int64(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "        TableScan: t1 projection=Some([t1_id, t1_name, t1_int]) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "      Filter: #t2.t2_id < Int64(100) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "        TableScan: t2 projection=Some([t2_id, t2_name, t2_int]) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected, actual
+    );
+    let expected = vec![
+        "+-------+---------+--------+-------+---------+--------+",
+        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+        "+-------+---------+--------+-------+---------+--------+",
+        "| 11    | a       | 1      | 11    | z       | 3      |",
+        "| 22    | b       | 2      | 22    | y       | 1      |",
+        "| 44    | d       | 4      | 44    | x       | 3      |",
+        "+-------+---------+--------+-------+---------+--------+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reduce_left_join_2() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id")?;
+
+    // reduce to inner join
+    let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_int < 10 or (t1.t1_int > 2 and t2.t2_name != 'w')";
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx
+        .create_logical_plan(&("explain ".to_owned() + sql))
+        .expect(&msg);
+    let state = ctx.state();
+    let plan = state.optimize(&plan)?;
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: #t1.t1_id, #t1.t1_name, #t1.t1_int, #t2.t2_id, #t2.t2_name, #t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "    Filter: #t2.t2_int < Int64(10) OR #t1.t1_int > Int64(2) AND #t2.t2_name != Utf8(\"w\") [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "      Inner Join: #t1.t1_id = #t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "        TableScan: t1 projection=Some([t1_id, t1_name, t1_int]) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "        TableScan: t2 projection=Some([t2_id, t2_name, t2_int]) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected, actual
+    );
+    let expected = vec![
+        "+-------+---------+--------+-------+---------+--------+",
+        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+        "+-------+---------+--------+-------+---------+--------+",
+        "| 11    | a       | 1      | 11    | z       | 3      |",
+        "| 22    | b       | 2      | 22    | y       | 1      |",
+        "| 44    | d       | 4      | 44    | x       | 3      |",
+        "+-------+---------+--------+-------+---------+--------+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reduce_left_join_3() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id")?;
+
+    // reduce subquery to inner join
+    let sql = "select * from (select t1.* from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_int < 3) t3 left join t2 on t3.t1_int = t2.t2_int where t3.t1_id < 100";
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx
+        .create_logical_plan(&("explain ".to_owned() + sql))
+        .expect(&msg);
+    let state = ctx.state();
+    let plan = state.optimize(&plan)?;
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: #t3.t1_id, #t3.t1_name, #t3.t1_int, #t2.t2_id, #t2.t2_name, #t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "    Left Join: #t3.t1_int = #t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "      Projection: #t3.t1_id, #t3.t1_name, #t3.t1_int, alias=t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "        Projection: #t1.t1_id, #t1.t1_name, #t1.t1_int, alias=t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "          Inner Join: #t1.t1_id = #t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "            Filter: #t1.t1_id < Int64(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "              TableScan: t1 projection=Some([t1_id, t1_name, t1_int]) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "            Filter: #t2.t2_int < Int64(3) AND #t2.t2_id < Int64(100) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "              TableScan: t2 projection=Some([t2_id, t2_name, t2_int]) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "      TableScan: t2 projection=Some([t2_id, t2_name, t2_int]) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected, actual
+    );
+    let expected = vec![
+        "+-------+---------+--------+-------+---------+--------+",
+        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+        "+-------+---------+--------+-------+---------+--------+",
+        "| 22    | b       | 2      |       |         |        |",
+        "+-------+---------+--------+-------+---------+--------+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}

Review Comment:
   I'll add more test cases, including this one, to the tests for this rule.





[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r905639039


##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule to reduce left/right/full join to inner join if possible.
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
+    utils::from_plan,
+};
+use datafusion_expr::{Expr, Operator};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+#[derive(Default)]
+pub struct ReduceOuterJoin;
+
+impl ReduceOuterJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ReduceOuterJoin {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let mut nonnullable_cols: Vec<Column> = vec![];
+
+        reduce_outer_join(self, plan, &mut nonnullable_cols, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "reduce_outer_join"
+    }
+}
+
+/// Attempt to reduce outer joins to inner joins.
+/// for query: select ... from a left join b on ... where b.xx = 100;
+/// If b.xx is null, `b.xx = 100` evaluates to null rather than true, so those rows are filtered out.
+/// Therefore, there is no need to produce null rows for output, and we can use
+/// an inner join instead of a left join.
+///
+/// Generally, an outer join can be reduced to an inner join if the quals from the WHERE clause
+/// cannot return true while any of their inputs are null, and the columns of those quals come from
+/// the nullable side of the outer join.
+fn reduce_outer_join(
+    _optimizer: &ReduceOuterJoin,
+    plan: &LogicalPlan,
+    nonnullable_cols: &mut Vec<Column>,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Filter(Filter { input, predicate }) => match &**input {
+            LogicalPlan::Join(join) => {
+                extract_nonnullable_columns(
+                    predicate,
+                    nonnullable_cols,
+                    join.left.schema(),
+                    join.right.schema(),
+                    true,
+                )?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: predicate.clone(),
+                    input: Arc::new(reduce_outer_join(
+                        _optimizer,
+                        input,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => Ok(LogicalPlan::Filter(Filter {
+                predicate: predicate.clone(),
+                input: Arc::new(reduce_outer_join(
+                    _optimizer,
+                    input,
+                    nonnullable_cols,
+                    _optimizer_config,
+                )?),
+            })),
+        },
+        LogicalPlan::Join(join) => {
+            let mut new_join_type = join.join_type;
+
+            if join.join_type == JoinType::Left
+                || join.join_type == JoinType::Right
+                || join.join_type == JoinType::Full
+            {
+                let mut left_nonnullable = false;
+                let mut right_nonnullable = false;
+                for col in nonnullable_cols.iter_mut() {

Review Comment:
   Let me fix this! Thanks!





[GitHub] [arrow-datafusion] alamb commented on pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#issuecomment-1163136396

   Thank you, @AssHero. I will put this on my list to review carefully tomorrow. This optimization is quite tricky in general, so I want to make sure I give it my full attention.




[GitHub] [arrow-datafusion] waynexia commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
waynexia commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r901885300


##########
datafusion/sql/src/planner.rs:
##########
@@ -2584,6 +2587,230 @@ fn extract_join_keys(
     }
 }
 
+/// Recursively traverses expr. If expr returns false when
+/// any input is null, treats the columns of both sides as nonnullable columns.
+///
+/// For and/or expr, extracts from all sub exprs and merges the columns.
+/// For or expr, if one of sub exprs returns true, discards all columns from or expr.
+/// For IS NOT NULL/NOT expr, always returns false for NULL input.
+///     extracts columns from these exprs.
+/// For all other exprs, fall through
+fn extract_nonnullable_columns(
+    expr: &Expr,
+    columns: &mut Vec<Column>,
+    top_level: bool,
+) -> Result<()> {
+    match expr {
+        Expr::Column(col) => {
+            columns.push(col.clone());
+            Ok(())
+        }
+        Expr::BinaryExpr { left, op, right } => match op {
+            // If either input is null for these operators, the result is null, which a filter treats as false.
+            Operator::Eq
+            | Operator::NotEq
+            | Operator::Lt
+            | Operator::LtEq
+            | Operator::Gt
+            | Operator::GtEq => {
+                extract_nonnullable_columns(left, columns, false)?;
+                extract_nonnullable_columns(right, columns, false)
+            }
+            Operator::And => {
+                if !top_level {

Review Comment:
   Will something like `Not(And(a > 10, b < 10))` hit this check and get ignored?
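   On the `Not(And(...))` question: one way to handle negation is to track whether the current node sits under an odd number of NOTs, because under NOT an AND behaves like an OR (De Morgan). The sketch below uses a hypothetical predicate type, not DataFusion's `Expr`, and asks a side-level question: with every column of the nullable side set to NULL, can the predicate ever be TRUE? It is a conservative analysis sketch, not the PR's `extract_nonnullable_columns`.

```rust
use std::collections::HashSet;

/// Hypothetical predicate tree, for illustration only.
enum Expr {
    Col(&'static str),
    /// Any non-NULL literal.
    Lit,
    /// A null-propagating comparison such as `=`, `!=`, `<`, `>=`.
    Cmp(Box<Expr>, Box<Expr>),
    IsNotNull(Box<Expr>),
    Not(Box<Expr>),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

fn is_null_col(e: &Expr, nulls: &HashSet<&'static str>) -> bool {
    matches!(e, Expr::Col(c) if nulls.contains(c))
}

/// With every column in `nulls` set to NULL, returns true if `e` can never be
/// TRUE (`negated == false`) or can never be FALSE (`negated == true`).
/// Returning false means "not proven", which is always the safe answer.
fn rejects(e: &Expr, nulls: &HashSet<&'static str>, negated: bool) -> bool {
    match e {
        Expr::Col(_) | Expr::Lit => false,
        // A comparison with a NULL operand is NULL: neither TRUE nor FALSE.
        Expr::Cmp(l, r) => is_null_col(l, nulls) || is_null_col(r, nulls),
        // `x IS NOT NULL` is FALSE for NULL x: never TRUE, but it can be FALSE.
        Expr::IsNotNull(x) => !negated && is_null_col(x, nulls),
        // NOT e is TRUE exactly when e is FALSE, so flip the question.
        Expr::Not(x) => rejects(x, nulls, !negated),
        Expr::And(a, b) => {
            if negated {
                // AND is FALSE if either side is FALSE: both must be never-FALSE.
                rejects(a, nulls, true) && rejects(b, nulls, true)
            } else {
                // AND is TRUE only if both sides are TRUE: one never-TRUE side suffices.
                rejects(a, nulls, false) || rejects(b, nulls, false)
            }
        }
        Expr::Or(a, b) => {
            if negated {
                // OR is FALSE only if both sides are FALSE.
                rejects(a, nulls, true) || rejects(b, nulls, true)
            } else {
                // OR is TRUE if either side is TRUE: both must be never-TRUE.
                rejects(a, nulls, false) && rejects(b, nulls, false)
            }
        }
    }
}
```

   With this check, `NOT (a > 10 AND b < 10)` does not reject a NULL `a` on its own (the `b < 10` branch can still make the whole predicate TRUE), but it does reject rows where both `a` and `b` are NULL. A column-level intersection for OR would miss the predicate in `reduce_left_join_2`, whose branches reference different `t2` columns; setting all columns of the nullable side to NULL at once is what lets the side-level check prove that reduction.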





[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r905640343


##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule to reduce left/right/full join to inner join if possible.
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
+    utils::from_plan,
+};
+use datafusion_expr::{Expr, Operator};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+#[derive(Default)]
+pub struct ReduceOuterJoin;
+
+impl ReduceOuterJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ReduceOuterJoin {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let mut nonnullable_cols: Vec<Column> = vec![];
+
+        reduce_outer_join(self, plan, &mut nonnullable_cols, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "reduce_outer_join"
+    }
+}
+
+/// Attempt to reduce outer joins to inner joins.
+/// for query: select ... from a left join b on ... where b.xx = 100;
+/// If b.xx is null, `b.xx = 100` evaluates to null rather than true, so those rows are filtered out.
+/// Therefore, there is no need to produce null rows for output, and we can use
+/// an inner join instead of a left join.
+///
+/// Generally, an outer join can be reduced to an inner join if the quals from the WHERE clause
+/// cannot return true while any of their inputs are null, and the columns of those quals come from
+/// the nullable side of the outer join.
+fn reduce_outer_join(
+    _optimizer: &ReduceOuterJoin,
+    plan: &LogicalPlan,
+    nonnullable_cols: &mut Vec<Column>,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Filter(Filter { input, predicate }) => match &**input {
+            LogicalPlan::Join(join) => {
+                extract_nonnullable_columns(
+                    predicate,
+                    nonnullable_cols,
+                    join.left.schema(),
+                    join.right.schema(),
+                    true,
+                )?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: predicate.clone(),
+                    input: Arc::new(reduce_outer_join(
+                        _optimizer,
+                        input,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => Ok(LogicalPlan::Filter(Filter {
+                predicate: predicate.clone(),
+                input: Arc::new(reduce_outer_join(
+                    _optimizer,
+                    input,
+                    nonnullable_cols,
+                    _optimizer_config,
+                )?),
+            })),
+        },
+        LogicalPlan::Join(join) => {
+            let mut new_join_type = join.join_type;
+
+            if join.join_type == JoinType::Left
+                || join.join_type == JoinType::Right
+                || join.join_type == JoinType::Full
+            {
+                let mut left_nonnullable = false;
+                let mut right_nonnullable = false;
+                for col in nonnullable_cols.iter_mut() {
+                    if join.left.schema().field_from_column(col).is_ok() {
+                        left_nonnullable = true;
+                    }
+                    if join.right.schema().field_from_column(col).is_ok() {
+                        right_nonnullable = true;
+                    }
+                }
+
+                match join.join_type {
+                    JoinType::Left => {
+                        if right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Right => {
+                        if left_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Full => {
+                        if left_nonnullable && right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        } else if left_nonnullable {
+                            new_join_type = JoinType::Left;
+                        } else if right_nonnullable {
+                            new_join_type = JoinType::Right;
+                        }
+                    }
+                    _ => {}
+                };
+            }
+
+            let left_plan = reduce_outer_join(
+                _optimizer,
+                &join.left,
+                nonnullable_cols,

Review Comment:
   I noticed this problem after pushing the code; I'll use clone() here.





[GitHub] [arrow-datafusion] AssHero commented on pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#issuecomment-1162635636

   Split the test cases into separate test methods. Implementing this as an optimizer rule is in progress.




[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r906177098


##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule to reduce left/right/full join to inner join if possible.
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
+    utils::from_plan,
+};
+use datafusion_expr::{Expr, Operator};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+#[derive(Default)]
+pub struct ReduceOuterJoin;
+
+impl ReduceOuterJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ReduceOuterJoin {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let mut nonnullable_cols: Vec<Column> = vec![];
+
+        reduce_outer_join(self, plan, &mut nonnullable_cols, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "reduce_outer_join"
+    }
+}
+
+/// Attempt to reduce outer joins to inner joins.
+/// For a query like: select ... from a left join b on ... where b.xx = 100;
+/// if b.xx is null, b.xx = 100 is not true, so those null rows are filtered out.
+/// Therefore there is no need to produce null-padded rows for the output, and we
+/// can use an inner join instead of a left join.
+///
+/// Generally, an outer join can be reduced to an inner join if the quals from the
+/// WHERE clause are not true whenever any of their inputs is null, and the columns
+/// of those quals come from the nullable side of the outer join.
+fn reduce_outer_join(
+    _optimizer: &ReduceOuterJoin,
+    plan: &LogicalPlan,
+    nonnullable_cols: &mut Vec<Column>,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Filter(Filter { input, predicate }) => match &**input {
+            LogicalPlan::Join(join) => {
+                extract_nonnullable_columns(
+                    predicate,
+                    nonnullable_cols,
+                    join.left.schema(),
+                    join.right.schema(),
+                    true,
+                )?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: predicate.clone(),
+                    input: Arc::new(reduce_outer_join(
+                        _optimizer,
+                        input,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => Ok(LogicalPlan::Filter(Filter {
+                predicate: predicate.clone(),
+                input: Arc::new(reduce_outer_join(
+                    _optimizer,
+                    input,
+                    nonnullable_cols,
+                    _optimizer_config,
+                )?),
+            })),
+        },
+        LogicalPlan::Join(join) => {
+            let mut new_join_type = join.join_type;
+
+            if join.join_type == JoinType::Left
+                || join.join_type == JoinType::Right
+                || join.join_type == JoinType::Full
+            {
+                let mut left_nonnullable = false;
+                let mut right_nonnullable = false;
+                for col in nonnullable_cols.iter_mut() {
+                    if join.left.schema().field_from_column(col).is_ok() {
+                        left_nonnullable = true;
+                    }
+                    if join.right.schema().field_from_column(col).is_ok() {
+                        right_nonnullable = true;
+                    }
+                }
+
+                match join.join_type {
+                    JoinType::Left => {
+                        if right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Right => {
+                        if left_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Full => {
+                        if left_nonnullable && right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        } else if left_nonnullable {
+                            new_join_type = JoinType::Left;
+                        } else if right_nonnullable {
+                            new_join_type = JoinType::Right;
+                        }
+                    }
+                    _ => {}
+                };
+            }
+
+            let left_plan = reduce_outer_join(
+                _optimizer,
+                &join.left,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+            let right_plan = reduce_outer_join(
+                _optimizer,
+                &join.right,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+
+            Ok(LogicalPlan::Join(Join {
+                left: Arc::new(left_plan),
+                right: Arc::new(right_plan),
+                join_type: new_join_type,
+                join_constraint: join.join_constraint,
+                on: join.on.clone(),
+                filter: join.filter.clone(),
+                schema: join.schema.clone(),
+                null_equals_null: join.null_equals_null,
+            }))
+        }
+        LogicalPlan::Projection(Projection {

Review Comment:
   This handles queries such as:
   select * from (select * from tt1 left join tt2 on c1 = c3) a right join tt3 on a.c2 = tt3.c5 where a.c4 < 100;
   
   The predicate on a.c4 can be used to reduce the nested left join (tt1 left join tt2) to an inner join, but only if we know that a.c4 corresponds to tt2.c4.
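A minimal sketch of the column mapping this requires, under a simplified model where a projection is a list of (output column, input column) pairs that only rename columns. The `Col` type, `col` constructor and `resolve_through_projection` helper are hypothetical illustrations, not DataFusion's API; a real rule would apply this once per projection level and would also have to handle computed expressions.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified qualified column reference: `relation.name`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Col {
    relation: String,
    name: String,
}

fn col(relation: &str, name: &str) -> Col {
    Col {
        relation: relation.to_string(),
        name: name.to_string(),
    }
}

/// Rewrite non-nullable columns expressed in terms of a projection's output
/// (e.g. `a.c4`) into the input columns they come from (e.g. `tt2.c4`).
/// Columns the projection does not produce are returned unchanged.
fn resolve_through_projection(nonnullable: &[Col], projection: &[(Col, Col)]) -> Vec<Col> {
    // Map each output column to the input column it renames.
    let mapping: HashMap<&Col, &Col> = projection.iter().map(|(out, inp)| (out, inp)).collect();
    nonnullable
        .iter()
        .map(|c| match mapping.get(c) {
            Some(inp) => (*inp).clone(),
            None => c.clone(),
        })
        .collect()
}

fn main() {
    // Models `Projection: #tt1.c1, #tt1.c2, #tt2.c3, #tt2.c4, alias=a`.
    let projection = vec![
        (col("a", "c1"), col("tt1", "c1")),
        (col("a", "c2"), col("tt1", "c2")),
        (col("a", "c3"), col("tt2", "c3")),
        (col("a", "c4"), col("tt2", "c4")),
    ];
    // `where a.c4 < 100` marks a.c4 as non-nullable above the projection.
    let resolved = resolve_through_projection(&[col("a", "c4")], &projection);
    println!("{:?}", resolved);
}
```

Once a.c4 is resolved to tt2.c4, the usual check (does the column belong to the nullable side of the join?) can be applied to tt1 left join tt2.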





[GitHub] [arrow-datafusion] andygrove commented on pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#issuecomment-1162448018

   @AssHero Since this is a significant feature could you file an issue for it so that it gets included in the change logs.




[GitHub] [arrow-datafusion] alamb merged pull request #2750: Add optimizer pass to reduce `left`/`right`/`full` joins to `inner` join if possible

Posted by GitBox <gi...@apache.org>.
alamb merged PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750




[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r902102694


##########
datafusion/sql/src/planner.rs:
##########
@@ -2584,6 +2587,230 @@ fn extract_join_keys(
     }
 }
 
+/// Recursively traverses expr; if expr does not return true when
+/// any input is null, treats the columns on both sides as nonnullable columns.
+///
+/// For AND/OR exprs, extracts columns from all sub-exprs and merges them.

Review Comment:
   > I guess here is
   
   For And and Or exprs, we need to extract columns from all sub exprs.
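One conservative way to merge the columns from AND and OR sub-expressions can be sketched with a toy predicate type. `Expr` and `null_rejected_relations` below are hypothetical, not `datafusion_expr::Expr` or the PR's function, and the merge rule is an illustration rather than the PR's exact logic. A null-padded row is dropped by a WHERE filter when the predicate is not TRUE; AND is not TRUE as soon as either side is not TRUE (union), while OR is not TRUE only when both sides are not TRUE (intersection). Because the toy has no NOT, it does not need to distinguish a top-level AND from a nested one.

```rust
use std::collections::BTreeSet;

/// Hypothetical toy predicate type (not `datafusion_expr::Expr`).
#[allow(dead_code)]
enum Expr {
    /// `relation.column <op> literal` for a comparison such as `<`, `=`, `!=`:
    /// a NULL input yields NULL, which is not TRUE.
    Compare(&'static str, &'static str),
    /// `relation.column IS NULL`: TRUE for a NULL input, so it rejects nothing.
    IsNull(&'static str, &'static str),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
}

/// Relations for which a null-padded row makes `expr` not TRUE,
/// i.e. rows that a WHERE filter on `expr` would drop.
fn null_rejected_relations(expr: &Expr) -> BTreeSet<&'static str> {
    match expr {
        Expr::Compare(rel, _) => BTreeSet::from([*rel]),
        Expr::IsNull(_, _) => BTreeSet::new(),
        // AND: not TRUE if either side is not TRUE, so merge by union.
        Expr::And(l, r) => {
            let mut rels = null_rejected_relations(l);
            rels.extend(null_rejected_relations(r));
            rels
        }
        // OR: not TRUE only if both sides are not TRUE, so keep the
        // relations rejected by both branches.
        Expr::Or(l, r) => {
            let left = null_rejected_relations(l);
            let right = null_rejected_relations(r);
            left.intersection(&right).copied().collect()
        }
    }
}

fn main() {
    // Toy version of: t2.t2_int < 10 OR (t1.t1_int > 2 AND t2.t2_name != 'w')
    let pred = Expr::Or(
        Box::new(Expr::Compare("t2", "t2_int")),
        Box::new(Expr::And(
            Box::new(Expr::Compare("t1", "t1_int")),
            Box::new(Expr::Compare("t2", "t2_name")),
        )),
    );
    // t2 is rejected by both OR branches, so `t1 LEFT JOIN t2` could become INNER.
    println!("{:?}", null_rejected_relations(&pred));
}
```

By the same rule, `t1.x < 1 OR t2.y < 1` rejects neither relation, so a left join under such a filter would be left alone.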






[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r905655191


##########
datafusion/core/src/execution/context.rs:
##########
@@ -1229,6 +1230,7 @@ impl SessionState {
         if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
             rules.push(Arc::new(FilterNullJoinKeys::default()));
         }
+        rules.push(Arc::new(ReduceOuterJoin::new()));

Review Comment:
   After FilterPushdown, filters such as (c1 < 100) have already been pushed down to the individual tables. At that point we would have to inspect the plan from the bottom up, which needs more information to reduce the outer joins.
   
   For the query: select * from (select * from tt1 left join tt2 on c1 = c3) a right join tt3 on a.c2 = tt3.c5 where a.c4 < 100;
   the logical plan after FilterPushdown is:
   
     Projection: #a.c1, #a.c2, #a.c3, #a.c4, #tt3.c5, #tt3.c6
       Inner Join: #a.c2 = #tt3.c5
         Projection: #a.c1, #a.c2, #a.c3, #a.c4, alias=a
           Projection: #tt1.c1, #tt1.c2, #tt2.c3, #tt2.c4, alias=a
             Inner Join: #tt1.c1 = #tt2.c3
               TableScan: tt1 projection=Some([c1, c2])
               Filter: #tt2.c4 < Int64(100)
                 TableScan: tt2 projection=Some([c3, c4])
         TableScan: tt3 projection=Some([c5, c6])
   
   We would need to walk down to the TableScan, pick up the filter, and then go back up to the parent plans to reduce the outer joins; I think this is considerably more complex to implement.
   
   Please let me know if I am missing something.





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r905474273


##########
datafusion/core/src/execution/context.rs:
##########
@@ -1229,6 +1230,7 @@ impl SessionState {
         if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
             rules.push(Arc::new(FilterNullJoinKeys::default()));
         }
+        rules.push(Arc::new(ReduceOuterJoin::new()));

Review Comment:
   this pass should probably be applied after FilterPushdown, so that as many predicates as possible are available



##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule to reduce left/right/full join to inner join if possible.
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
+    utils::from_plan,
+};
+use datafusion_expr::{Expr, Operator};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+#[derive(Default)]
+pub struct ReduceOuterJoin;
+
+impl ReduceOuterJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ReduceOuterJoin {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let mut nonnullable_cols: Vec<Column> = vec![];
+
+        reduce_outer_join(self, plan, &mut nonnullable_cols, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "reduce_outer_join"
+    }
+}
+
+/// Attempt to reduce outer joins to inner joins.
+/// For a query like: select ... from a left join b on ... where b.xx = 100;
+/// if b.xx is null, b.xx = 100 is not true, so those null rows are filtered out.
+/// Therefore there is no need to produce null-padded rows for the output, and we
+/// can use an inner join instead of a left join.
+///
+/// Generally, an outer join can be reduced to an inner join if the quals from the
+/// WHERE clause are not true whenever any of their inputs is null, and the columns
+/// of those quals come from the nullable side of the outer join.
+fn reduce_outer_join(
+    _optimizer: &ReduceOuterJoin,
+    plan: &LogicalPlan,
+    nonnullable_cols: &mut Vec<Column>,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Filter(Filter { input, predicate }) => match &**input {
+            LogicalPlan::Join(join) => {
+                extract_nonnullable_columns(
+                    predicate,
+                    nonnullable_cols,
+                    join.left.schema(),
+                    join.right.schema(),
+                    true,
+                )?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: predicate.clone(),
+                    input: Arc::new(reduce_outer_join(
+                        _optimizer,
+                        input,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => Ok(LogicalPlan::Filter(Filter {
+                predicate: predicate.clone(),
+                input: Arc::new(reduce_outer_join(
+                    _optimizer,
+                    input,
+                    nonnullable_cols,
+                    _optimizer_config,
+                )?),
+            })),
+        },
+        LogicalPlan::Join(join) => {
+            let mut new_join_type = join.join_type;
+
+            if join.join_type == JoinType::Left
+                || join.join_type == JoinType::Right
+                || join.join_type == JoinType::Full
+            {
+                let mut left_nonnullable = false;
+                let mut right_nonnullable = false;
+                for col in nonnullable_cols.iter_mut() {

Review Comment:
   why `iter_mut` and not `iter`?
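For context on this question: in the hunk, the loop appears only to look each column up in a schema, so a shared `iter()` (or `iter().any(...)`) would seem to suffice. A minimal, self-contained sketch of the same join-type decision follows, assuming a local `JoinType` enum and caller-supplied membership tests `in_left` / `in_right` as hypothetical stand-ins for DataFusion's `JoinType` and the `field_from_column` lookups.

```rust
/// Local stand-in for the join types handled by the rule.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// Decide the new join type from the non-nullable columns collected above
/// the join. `in_left` / `in_right` report whether a column belongs to the
/// left / right input (standing in for a schema lookup).
fn reduced_join_type<C>(
    join_type: JoinType,
    nonnullable_cols: &[C],
    in_left: impl Fn(&C) -> bool,
    in_right: impl Fn(&C) -> bool,
) -> JoinType {
    // The columns are only read, so a shared borrow is enough.
    let left_nonnullable = nonnullable_cols.iter().any(|c| in_left(c));
    let right_nonnullable = nonnullable_cols.iter().any(|c| in_right(c));
    match join_type {
        // Null-padded right rows are filtered out: LEFT behaves like INNER.
        JoinType::Left if right_nonnullable => JoinType::Inner,
        // Null-padded left rows are filtered out: RIGHT behaves like INNER.
        JoinType::Right if left_nonnullable => JoinType::Inner,
        JoinType::Full => match (left_nonnullable, right_nonnullable) {
            (true, true) => JoinType::Inner,
            (true, false) => JoinType::Left,
            (false, true) => JoinType::Right,
            (false, false) => JoinType::Full,
        },
        other => other,
    }
}

fn main() {
    let cols = ["t2.t2_id"];
    let new_type = reduced_join_type(
        JoinType::Full,
        &cols,
        |c| c.starts_with("t1."),
        |c| c.starts_with("t2."),
    );
    println!("{:?}", new_type);
}
```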



##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -1375,3 +1375,351 @@ async fn hash_join_with_dictionary() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn reduce_left_join_1() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id")?;
+
+    // reduce to inner join
+    let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_id < 100";
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx
+        .create_logical_plan(&("explain ".to_owned() + sql))
+        .expect(&msg);
+    let state = ctx.state();
+    let plan = state.optimize(&plan)?;
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: #t1.t1_id, #t1.t1_name, #t1.t1_int, #t2.t2_id, #t2.t2_name, #t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "    Inner Join: #t1.t1_id = #t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "      Filter: #t1.t1_id < Int64(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "        TableScan: t1 projection=Some([t1_id, t1_name, t1_int]) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "      Filter: #t2.t2_id < Int64(100) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "        TableScan: t2 projection=Some([t2_id, t2_name, t2_int]) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected, actual
+    );
+    let expected = vec![
+        "+-------+---------+--------+-------+---------+--------+",
+        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+        "+-------+---------+--------+-------+---------+--------+",
+        "| 11    | a       | 1      | 11    | z       | 3      |",
+        "| 22    | b       | 2      | 22    | y       | 1      |",
+        "| 44    | d       | 4      | 44    | x       | 3      |",
+        "+-------+---------+--------+-------+---------+--------+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reduce_left_join_2() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id")?;
+
+    // reduce to inner join
+    let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_int < 10 or (t1.t1_int > 2 and t2.t2_name != 'w')";
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx
+        .create_logical_plan(&("explain ".to_owned() + sql))
+        .expect(&msg);
+    let state = ctx.state();
+    let plan = state.optimize(&plan)?;
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: #t1.t1_id, #t1.t1_name, #t1.t1_int, #t2.t2_id, #t2.t2_name, #t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "    Filter: #t2.t2_int < Int64(10) OR #t1.t1_int > Int64(2) AND #t2.t2_name != Utf8(\"w\") [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "      Inner Join: #t1.t1_id = #t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "        TableScan: t1 projection=Some([t1_id, t1_name, t1_int]) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "        TableScan: t2 projection=Some([t2_id, t2_name, t2_int]) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected, actual
+    );
+    let expected = vec![
+        "+-------+---------+--------+-------+---------+--------+",
+        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+        "+-------+---------+--------+-------+---------+--------+",
+        "| 11    | a       | 1      | 11    | z       | 3      |",
+        "| 22    | b       | 2      | 22    | y       | 1      |",
+        "| 44    | d       | 4      | 44    | x       | 3      |",
+        "+-------+---------+--------+-------+---------+--------+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn reduce_left_join_3() -> Result<()> {
+    let ctx = create_join_context("t1_id", "t2_id")?;
+
+    // reduce subquery to inner join
+    let sql = "select * from (select t1.* from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_int < 3) t3 left join t2 on t3.t1_int = t2.t2_int where t3.t1_id < 100";
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx
+        .create_logical_plan(&("explain ".to_owned() + sql))
+        .expect(&msg);
+    let state = ctx.state();
+    let plan = state.optimize(&plan)?;
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: #t3.t1_id, #t3.t1_name, #t3.t1_int, #t2.t2_id, #t2.t2_name, #t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "    Left Join: #t3.t1_int = #t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "      Projection: #t3.t1_id, #t3.t1_name, #t3.t1_int, alias=t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "        Projection: #t1.t1_id, #t1.t1_name, #t1.t1_int, alias=t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "          Inner Join: #t1.t1_id = #t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "            Filter: #t1.t1_id < Int64(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "              TableScan: t1 projection=Some([t1_id, t1_name, t1_int]) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
+        "            Filter: #t2.t2_int < Int64(3) AND #t2.t2_id < Int64(100) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "              TableScan: t2 projection=Some([t2_id, t2_name, t2_int]) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+        "      TableScan: t2 projection=Some([t2_id, t2_name, t2_int]) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected, actual
+    );
+    let expected = vec![
+        "+-------+---------+--------+-------+---------+--------+",
+        "| t1_id | t1_name | t1_int | t2_id | t2_name | t2_int |",
+        "+-------+---------+--------+-------+---------+--------+",
+        "| 22    | b       | 2      |       |         |        |",
+        "+-------+---------+--------+-------+---------+--------+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}

Review Comment:
   Some negative tests might be good, for example checking that
   
   ```
       let sql = "select * from t1 left join t2 on t1.t1_id = t2.t2_id where t2.t2_id IS NULL";
   ```
   
   isn't rewritten to an inner join.
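Why `t2.t2_id IS NULL` must not trigger the rewrite follows from SQL's three-valued logic. A minimal sketch, assuming a nullable value is modelled as `Option<i64>` and a predicate result as `Option<bool>` (NULL is `None`); the helpers `lt`, `is_null` and `keeps` are hypothetical names for illustration. A WHERE filter keeps a row only when its predicate is TRUE.

```rust
/// `value < limit` under SQL semantics: a NULL input yields NULL.
fn lt(value: Option<i64>, limit: i64) -> Option<bool> {
    value.map(|v| v < limit)
}

/// `value IS NULL`: never NULL itself.
fn is_null(value: Option<i64>) -> Option<bool> {
    Some(value.is_none())
}

/// A WHERE filter keeps a row only if the predicate is TRUE.
fn keeps(pred: Option<bool>) -> bool {
    pred == Some(true)
}

fn main() {
    // t2_id of a null-padded row produced by `t1 LEFT JOIN t2` for an unmatched t1 row.
    let padded: Option<i64> = None;
    // `t2.t2_id < 100` drops every null-padded row, so LEFT JOIN can become INNER JOIN.
    println!("t2_id < 100 keeps padded row: {}", keeps(lt(padded, 100)));
    // `t2.t2_id IS NULL` keeps exactly those rows, so the rewrite would change results.
    println!("t2_id IS NULL keeps padded row: {}", keeps(is_null(padded)));
}
```

Running it prints `false` for the comparison and `true` for `IS NULL`, which is why only predicates that are never TRUE on NULL input should mark a column as non-nullable.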



##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule to reduce left/right/full join to inner join if possible.
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
+    utils::from_plan,
+};
+use datafusion_expr::{Expr, Operator};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+#[derive(Default)]
+pub struct ReduceOuterJoin;
+
+impl ReduceOuterJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ReduceOuterJoin {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let mut nonnullable_cols: Vec<Column> = vec![];
+
+        reduce_outer_join(self, plan, &mut nonnullable_cols, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "reduce_outer_join"
+    }
+}
+
+/// Attempt to reduce outer joins to inner joins.
+/// For a query like: select ... from a left join b on ... where b.xx = 100;
+/// if b.xx is null, b.xx = 100 is not true, so those null rows are filtered out.
+/// Therefore there is no need to produce null-padded rows for the output, and we
+/// can use an inner join instead of a left join.
+///
+/// Generally, an outer join can be reduced to an inner join if the quals from the
+/// WHERE clause are not true whenever any of their inputs is null, and the columns
+/// of those quals come from the nullable side of the outer join.
+fn reduce_outer_join(
+    _optimizer: &ReduceOuterJoin,
+    plan: &LogicalPlan,
+    nonnullable_cols: &mut Vec<Column>,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Filter(Filter { input, predicate }) => match &**input {
+            LogicalPlan::Join(join) => {
+                extract_nonnullable_columns(
+                    predicate,
+                    nonnullable_cols,
+                    join.left.schema(),
+                    join.right.schema(),
+                    true,
+                )?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: predicate.clone(),
+                    input: Arc::new(reduce_outer_join(
+                        _optimizer,
+                        input,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => Ok(LogicalPlan::Filter(Filter {
+                predicate: predicate.clone(),
+                input: Arc::new(reduce_outer_join(
+                    _optimizer,
+                    input,
+                    nonnullable_cols,
+                    _optimizer_config,
+                )?),
+            })),
+        },
+        LogicalPlan::Join(join) => {
+            let mut new_join_type = join.join_type;
+
+            if join.join_type == JoinType::Left
+                || join.join_type == JoinType::Right
+                || join.join_type == JoinType::Full
+            {
+                let mut left_nonnullable = false;
+                let mut right_nonnullable = false;
+                for col in nonnullable_cols.iter_mut() {
+                    if join.left.schema().field_from_column(col).is_ok() {
+                        left_nonnullable = true;
+                    }
+                    if join.right.schema().field_from_column(col).is_ok() {
+                        right_nonnullable = true;
+                    }
+                }
+
+                match join.join_type {
+                    JoinType::Left => {
+                        if right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Right => {
+                        if left_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Full => {
+                        if left_nonnullable && right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        } else if left_nonnullable {
+                            new_join_type = JoinType::Left;
+                        } else if right_nonnullable {
+                            new_join_type = JoinType::Right;
+                        }
+                    }
+                    _ => {}
+                };
+            }
+
+            let left_plan = reduce_outer_join(
+                _optimizer,
+                &join.left,
+                nonnullable_cols,

Review Comment:
   I wonder if it is a problem to use the same `nonnullable_cols` here, as the call to `reduce_outer_join` could add new columns if a filter is encountered somewhere down the left side (e.g. in a subquery). Maybe a `clone()` should be passed here (or the length remembered and the vector truncated before calling `reduce_outer_join` on the right side).
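
The length-and-truncate option mentioned above can be sketched outside DataFusion. This is a minimal illustration on a hypothetical toy plan tree (`Plan`, `walk`, and `columns_seen` are made-up names, not DataFusion APIs). Like the rule in the diff, a filter here only ever appends to the shared column list; the `isolate` flag toggles truncating the list back to its length at the join before the right child is visited.

```rust
/// Toy plan tree for illustration only; this is not DataFusion's `LogicalPlan`.
#[derive(Debug)]
enum Plan {
    Scan(String),
    /// A filter whose predicate null-rejects the listed columns.
    Filter(Vec<String>, Box<Plan>),
    Join(Box<Plan>, Box<Plan>),
}

/// Walks the tree top-down, recording the column list each scan would see.
fn walk(plan: &Plan, cols: &mut Vec<String>, isolate: bool, seen: &mut Vec<(String, Vec<String>)>) {
    match plan {
        Plan::Scan(name) => seen.push((name.clone(), cols.clone())),
        Plan::Filter(new_cols, input) => {
            // Like the rule in the diff, a filter appends columns and never removes them.
            cols.extend(new_cols.iter().cloned());
            walk(input, cols, isolate, seen);
        }
        Plan::Join(left, right) => {
            let len = cols.len(); // remember the length on entry to the join
            walk(left, cols, isolate, seen);
            if isolate {
                // Discard anything the left subtree appended before visiting the right.
                cols.truncate(len);
            }
            walk(right, cols, isolate, seen);
        }
    }
}

/// Returns (scan name, columns visible at that scan) in visiting order.
fn columns_seen(plan: &Plan, isolate: bool) -> Vec<(String, Vec<String>)> {
    let mut seen = Vec::new();
    walk(plan, &mut Vec::new(), isolate, &mut seen);
    seen
}
```

For `Join(Filter[l.x](Scan l), Scan r)`, `columns_seen(&plan, false)` reports that scan `r` sees `l.x` (the leak the reviewer describes), while `columns_seen(&plan, true)` reports an empty list for `r`. Truncating avoids a `clone()` per join, but only works if every callee appends rather than rewriting earlier entries in place.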



##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule to reduce left/right/full join to inner join if possible.
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
+    utils::from_plan,
+};
+use datafusion_expr::{Expr, Operator};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+#[derive(Default)]
+pub struct ReduceOuterJoin;
+
+impl ReduceOuterJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ReduceOuterJoin {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let mut nonnullable_cols: Vec<Column> = vec![];
+
+        reduce_outer_join(self, plan, &mut nonnullable_cols, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "reduce_outer_join"
+    }
+}
+
+/// Attempt to reduce outer joins to inner joins.
+/// For the query: select ... from a left join b on ... where b.xx = 100;
+/// if b.xx is null, b.xx = 100 does not evaluate to true, so the filter removes those null rows.
+/// Therefore, there is no need to produce null-padded rows for the output, and we can use
+/// an inner join instead of a left join.
+///
+/// Generally, an outer join can be reduced to an inner join if the quals from WHERE
+/// return false (or null) whenever any of their inputs are null, and the columns of those
+/// quals come from the nullable side of the outer join.
+fn reduce_outer_join(
+    _optimizer: &ReduceOuterJoin,
+    plan: &LogicalPlan,
+    nonnullable_cols: &mut Vec<Column>,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Filter(Filter { input, predicate }) => match &**input {
+            LogicalPlan::Join(join) => {
+                extract_nonnullable_columns(
+                    predicate,
+                    nonnullable_cols,
+                    join.left.schema(),
+                    join.right.schema(),
+                    true,
+                )?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: predicate.clone(),
+                    input: Arc::new(reduce_outer_join(
+                        _optimizer,
+                        input,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => Ok(LogicalPlan::Filter(Filter {
+                predicate: predicate.clone(),
+                input: Arc::new(reduce_outer_join(
+                    _optimizer,
+                    input,
+                    nonnullable_cols,
+                    _optimizer_config,
+                )?),
+            })),
+        },
+        LogicalPlan::Join(join) => {
+            let mut new_join_type = join.join_type;
+
+            if join.join_type == JoinType::Left
+                || join.join_type == JoinType::Right
+                || join.join_type == JoinType::Full
+            {
+                let mut left_nonnullable = false;
+                let mut right_nonnullable = false;
+                for col in nonnullable_cols.iter_mut() {
+                    if join.left.schema().field_from_column(col).is_ok() {
+                        left_nonnullable = true;
+                    }
+                    if join.right.schema().field_from_column(col).is_ok() {
+                        right_nonnullable = true;
+                    }
+                }
+
+                match join.join_type {
+                    JoinType::Left => {
+                        if right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Right => {
+                        if left_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Full => {
+                        if left_nonnullable && right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        } else if left_nonnullable {
+                            new_join_type = JoinType::Left;
+                        } else if right_nonnullable {
+                            new_join_type = JoinType::Right;
+                        }
+                    }
+                    _ => {}
+                };
+            }
+
+            let left_plan = reduce_outer_join(
+                _optimizer,
+                &join.left,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+            let right_plan = reduce_outer_join(
+                _optimizer,
+                &join.right,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+
+            Ok(LogicalPlan::Join(Join {
+                left: Arc::new(left_plan),
+                right: Arc::new(right_plan),
+                join_type: new_join_type,
+                join_constraint: join.join_constraint,
+                on: join.on.clone(),
+                filter: join.filter.clone(),
+                schema: join.schema.clone(),
+                null_equals_null: join.null_equals_null,
+            }))
+        }
+        LogicalPlan::Projection(Projection {
+            input,
+            expr,
+            schema,
+            alias: _,
+        }) => {
+            let projection = schema
+                .fields()
+                .iter()
+                .enumerate()
+                .map(|(i, field)| {
+                    // strip alias, as they should not be part of filters
+                    let expr = match &expr[i] {
+                        Expr::Alias(expr, _) => expr.as_ref().clone(),
+                        expr => expr.clone(),
+                    };
+
+                    (field.qualified_name(), expr)
+                })
+                .collect::<HashMap<_, _>>();
+
+            // re-write all Columns based on this projection
+            for col in nonnullable_cols.iter_mut() {
+                if let Some(Expr::Column(column)) = projection.get(&col.flat_name()) {
+                    *col = column.clone();
+                }
+            }
+
+            // optimize inner
+            let new_input = reduce_outer_join(
+                _optimizer,
+                input,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+
+            from_plan(plan, expr, &[new_input])
+        }
+        _ => {
+            let expr = plan.expressions();
+
+            // apply the optimization to all inputs of the plan
+            let inputs = plan.inputs();
+            let new_inputs = inputs
+                .iter()
+                .map(|plan| {
+                    reduce_outer_join(
+                        _optimizer,
+                        plan,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            from_plan(plan, &expr, &new_inputs)
+        }
+    }
+}
+
+/// Recursively traverses `expr`; if `expr` returns false when
+/// any of its inputs is null, treats the columns of both sides as non-nullable columns.
+///
+/// For AND/OR exprs, extracts columns from all sub-exprs and merges them.
+/// For OR exprs, if one of the sub-exprs returns true, discards all columns from the OR expr.
+/// IS NOT NULL/NOT exprs always return false for NULL input,
+///     so columns are extracted from these exprs.
+/// All other exprs fall through.
+fn extract_nonnullable_columns(

Review Comment:
   What do you think about using `Expr::nullable` instead?  That seems very similar / the same as what you are doing with this function (trying to determine if an expression can be nullable / non nullable)? 
   
   https://github.com/apache/arrow-datafusion/blob/d985c0a3b6d96b02028f3abb6edb361ea72cac14/datafusion/expr/src/expr_schema.rs#L148-L211
   
   If `Expr::nullable` doesn't work correctly I think we should update that
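
The two questions are related but not identical, which may be worth weighing here. Assuming `Expr::nullable` answers "can this expression evaluate to NULL for the given input schema", a predicate such as `b.xx = 100` is nullable precisely *because* it is null-rejecting: when `b.xx` is NULL the comparison is NULL, and WHERE keeps only rows where the predicate is TRUE. The sketch below uses a hypothetical mini expression type (not DataFusion's `Expr`) to contrast the two checks. Its OR handling (keep only columns rejected by both branches) is a conservative choice made for this sketch, not necessarily what `extract_nonnullable_columns` does.

```rust
/// Tiny predicate language for illustration only (not DataFusion's `Expr`).
enum Expr {
    Col(&'static str),
    Lit(i64),
    Eq(Box<Expr>, Box<Expr>),
    Lt(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
    Or(Box<Expr>, Box<Expr>),
    IsNull(Box<Expr>),
    IsNotNull(Box<Expr>),
}

/// "Can this expression evaluate to NULL?" (conservative), given the nullable columns.
fn nullable(e: &Expr, nullable_cols: &[&str]) -> bool {
    match e {
        Expr::Col(c) => nullable_cols.contains(c),
        Expr::Lit(_) => false,
        Expr::Eq(a, b) | Expr::Lt(a, b) | Expr::And(a, b) | Expr::Or(a, b) => {
            nullable(a, nullable_cols) || nullable(b, nullable_cols)
        }
        // IS [NOT] NULL always yields TRUE or FALSE.
        Expr::IsNull(_) | Expr::IsNotNull(_) => false,
    }
}

/// Columns whose NULL makes `e` itself NULL (NULL-strict positions only).
fn strict_columns(e: &Expr) -> Vec<&'static str> {
    match e {
        Expr::Col(c) => vec![*c],
        // Comparisons propagate NULL from either operand.
        Expr::Eq(a, b) | Expr::Lt(a, b) => {
            let mut cols = strict_columns(a);
            cols.extend(strict_columns(b));
            cols
        }
        // Literals, AND/OR (FALSE AND NULL is FALSE) and IS [NOT] NULL are not NULL-strict.
        _ => Vec::new(),
    }
}

/// Columns that, when NULL, keep the predicate from being TRUE, so WHERE drops the row.
fn null_rejected(e: &Expr) -> Vec<&'static str> {
    match e {
        // A comparison with a NULL operand is NULL.
        Expr::Eq(..) | Expr::Lt(..) => strict_columns(e),
        // `x IS NOT NULL` is FALSE exactly when `x` is NULL.
        Expr::IsNotNull(a) => strict_columns(a),
        // AND is not TRUE if either side is not TRUE: take the union.
        Expr::And(a, b) => {
            let mut cols = null_rejected(a);
            cols.extend(null_rejected(b));
            cols
        }
        // OR is not TRUE only if neither side is TRUE: keep columns rejected by both.
        Expr::Or(a, b) => {
            let right = null_rejected(b);
            null_rejected(a).into_iter().filter(|c| right.contains(c)).collect()
        }
        // Bare columns, literals and IS NULL reject nothing in this sketch.
        _ => Vec::new(),
    }
}
```

With `b.xx` nullable, `b.xx = 100` is both nullable and null-rejecting on `b.xx`, whereas `b.xx IS NULL` is non-nullable and rejects nothing, so a nullability answer alone does not seem to tell the rule which columns are safe to use.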



##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+        LogicalPlan::Projection(Projection {

Review Comment:
   I think Sorts can also rewrite column names?




[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r905655191


##########
datafusion/core/src/execution/context.rs:
##########
@@ -1229,6 +1230,7 @@ impl SessionState {
         if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
             rules.push(Arc::new(FilterNullJoinKeys::default()));
         }
+        rules.push(Arc::new(ReduceOuterJoin::new()));

Review Comment:
   After FilterPushdown, filters like (c1 < 100) have already been pushed down to the tables. At that point we would need to check from the bottom up, which needs more information to reduce the outer joins.
   
   For the query: select * from (select * from tt1 left join tt2 on c1 = c3) a right join tt3 on a.c2 = tt3.c5 where a.c4 < 100;
   after FilterPushdown, the logical plan is:
   
   > Projection: #a.c1, #a.c2, #a.c3, #a.c4, #tt3.c5, #tt3.c6
   >   Inner Join: #a.c2 = #tt3.c5
   >     Projection: #a.c1, #a.c2, #a.c3, #a.c4, alias=a
   >       Projection: #tt1.c1, #tt1.c2, #tt2.c3, #tt2.c4, alias=a
   >         Inner Join: #tt1.c1 = #tt2.c3
   >           TableScan: tt1 projection=Some([c1, c2])
   >           Filter: #tt2.c4 < Int64(100)
   >             TableScan: tt2 projection=Some([c3, c4])
   >     TableScan: tt3 projection=Some([c5, c6])
   
   We would need to walk down to the TableScan, collect the filter, and then go back up to the parent plans to reduce the outer joins. I think this is considerably more complicated to implement.
   
   Please let me know if I'm missing something.
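
Once it is known which input of a join has a column whose NULLs are rejected by a predicate above it, the join-type decision itself is small. The sketch below restates the `match join.join_type` block from the diff as a standalone function over a hypothetical local `JoinType` enum (a stand-in, not DataFusion's type).

```rust
/// Local stand-in for the join types handled by the rule (not DataFusion's `JoinType`).
#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// `left_nonnullable` / `right_nonnullable`: a predicate above the join rejects NULLs
/// in some column from that input, so NULL-padded rows for that side are filtered anyway.
fn reduce_join_type(join_type: JoinType, left_nonnullable: bool, right_nonnullable: bool) -> JoinType {
    match join_type {
        // LEFT JOIN pads the right side with NULLs; if those rows are rejected, only matches remain.
        JoinType::Left if right_nonnullable => JoinType::Inner,
        // RIGHT JOIN pads the left side with NULLs.
        JoinType::Right if left_nonnullable => JoinType::Inner,
        // FULL JOIN pads both sides; each rejected side removes one kind of padded row.
        JoinType::Full => match (left_nonnullable, right_nonnullable) {
            (true, true) => JoinType::Inner,
            (true, false) => JoinType::Left,
            (false, true) => JoinType::Right,
            (false, false) => JoinType::Full,
        },
        // Inner joins, and outer joins with no usable predicate, stay as they are.
        other => other,
    }
}
```

For the query in this comment, `a.c4 < 100` rejects NULLs in `a.c4`, which maps to `tt2.c4`: that column is on the left of the outer RIGHT JOIN and on the right of the inner LEFT JOIN, so `reduce_join_type(JoinType::Right, true, false)` and `reduce_join_type(JoinType::Left, false, true)` both give `JoinType::Inner`, consistent with the two Inner Joins in the plan above.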




[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r903138887


##########
datafusion/core/tests/sql/joins.rs:
##########
@@ -1342,3 +1342,458 @@ async fn join_with_hash_unsupported_data_type() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn reduce_left_join() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let t1_schema = Schema::new(vec![
+        Field::new("c1", DataType::Int32, true),
+        Field::new("c2", DataType::Utf8, true),
+        Field::new("c3", DataType::Int64, true),
+    ]);
+    let t1_data = RecordBatch::try_new(
+        Arc::new(t1_schema),
+        vec![
+            Arc::new(Int32Array::from_slice(&[1, 2, 3])),
+            Arc::new(StringArray::from_slice(&["aaa", "bbb", "ccc"])),
+            Arc::new(Int64Array::from_slice(&[100, 200, 300])),
+        ],
+    )?;
+    let t1 = MemTable::try_new(t1_data.schema(), vec![vec![t1_data]])?;
+    ctx.register_table("t1", Arc::new(t1))?;
+
+    let t2_schema = Schema::new(vec![
+        Field::new("c4", DataType::Int32, true),
+        Field::new("c5", DataType::Utf8, true),
+        Field::new("c6", DataType::Int64, true),
+    ]);
+    let t2_data = RecordBatch::try_new(
+        Arc::new(t2_schema),
+        vec![
+            Arc::new(Int32Array::from_slice(&[3, 4, 5])),
+            Arc::new(StringArray::from_slice(&["ccc", "ddd", "eee"])),
+            Arc::new(Int64Array::from_slice(&[300, 400, 500])),
+        ],
+    )?;
+    let t2 = MemTable::try_new(t2_data.schema(), vec![vec![t2_data]])?;
+    ctx.register_table("t2", Arc::new(t2))?;
+
+    // reduce to inner join
+    let sql = "select * from t1 left join t2 on t1.c1 = t2.c4 where t2.c6 < 1000";
+    let msg = format!("Creating logical plan for '{}'", sql);
+    let plan = ctx
+        .create_logical_plan(&("explain ".to_owned() + sql))
+        .expect(&msg);
+    let state = ctx.state();
+    let plan = state.optimize(&plan)?;
+    let expected = vec![
+        "Explain [plan_type:Utf8, plan:Utf8]",
+        "  Projection: #t1.c1, #t1.c2, #t1.c3, #t2.c4, #t2.c5, #t2.c6 [c1:Int32;N, c2:Utf8;N, c3:Int64;N, c4:Int32;N, c5:Utf8;N, c6:Int64;N]",
+        "    Inner Join: #t1.c1 = #t2.c4 [c1:Int32;N, c2:Utf8;N, c3:Int64;N, c4:Int32;N, c5:Utf8;N, c6:Int64;N]",
+        "      TableScan: t1 projection=Some([c1, c2, c3]) [c1:Int32;N, c2:Utf8;N, c3:Int64;N]",
+        "      Filter: #t2.c6 < Int64(1000) [c4:Int32;N, c5:Utf8;N, c6:Int64;N]",
+        "        TableScan: t2 projection=Some([c4, c5, c6]) [c4:Int32;N, c5:Utf8;N, c6:Int64;N]",
+    ];
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+        expected, actual
+    );
+    let expected = vec![
+        "+----+-----+-----+----+-----+-----+",
+        "| c1 | c2  | c3  | c4 | c5  | c6  |",
+        "+----+-----+-----+----+-----+-----+",
+        "| 3  | ccc | 300 | 3  | ccc | 300 |",
+        "+----+-----+-----+----+-----+-----+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_sorted_eq!(expected, &results);
+
+    // could not reduce, use left join

Review Comment:
   Can we split the tests out into separate test methods?
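
Whichever way the tests are split, the first case rests on one equivalence: with `WHERE t2.c6 < 1000`, the LEFT JOIN and the INNER JOIN return the same rows. A plain-Rust simulation on the same t1/t2 data can check that without DataFusion. The nested-loop join and the names `join_t1_t2` and `apply_where` are illustrative only.

```rust
/// (c1, c2, c3) for t1 and (c4, c5, c6) for t2, copied from the test's data.
type Row = (i32, &'static str, i64);

const T1: [Row; 3] = [(1, "aaa", 100), (2, "bbb", 200), (3, "ccc", 300)];
const T2: [Row; 3] = [(3, "ccc", 300), (4, "ddd", 400), (5, "eee", 500)];

/// Nested-loop join on t1.c1 = t2.c4. With `left_outer`, unmatched t1 rows are kept
/// with `None` standing in for the NULL-padded t2 columns, as a LEFT JOIN does.
fn join_t1_t2(left_outer: bool) -> Vec<(Row, Option<Row>)> {
    let mut out = Vec::new();
    for l in T1 {
        let mut matched = false;
        for r in T2 {
            if l.0 == r.0 {
                out.push((l, Some(r)));
                matched = true;
            }
        }
        if left_outer && !matched {
            out.push((l, None));
        }
    }
    out
}

/// WHERE t2.c6 < 1000: for a NULL-padded row the comparison is NULL, so the row is dropped.
fn apply_where(rows: Vec<(Row, Option<Row>)>) -> Vec<(Row, Option<Row>)> {
    rows.into_iter()
        .filter(|(_, t2)| matches!(t2, Some(r) if r.2 < 1000))
        .collect()
}
```

Before the filter the outer join yields three rows (two of them NULL-padded); after it, both `apply_where(join_t1_t2(true))` and `apply_where(join_t1_t2(false))` contain only the `(3, ccc, 300)` pair, matching the expected batch in the test.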




[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r903140029


##########
datafusion/sql/src/planner.rs:
##########
@@ -784,6 +784,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
                 let filter_expr = self.sql_to_rex(predicate_expr, &join_schema, ctes)?;
 
+                // reduce outer joins
+                let plans = reduce_outer_joins(plans, &filter_expr)?;

Review Comment:
   This seems like it should be implemented as an optimization rule rather than in the SQL query planner. That way we would get the same optimizations regardless of whether the query was created from SQL or from the DataFrame API.




[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r906177098


##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+        LogicalPlan::Projection(Projection {

Review Comment:
   This is for queries such as:
   select * from (select * from tt1 left join tt2 on c1 = c3) a right join tt3 on a.c2 = tt3.c5 where a.c4 < 100;
   
   The predicate on a.c4 can be used to reduce the left join to an inner join, but to do so we need to know that a.c4 corresponds to tt2.c4.
   
   I need to think about the case with sort.
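The Projection arm of the rule handles this by mapping each projected field's qualified name back to its input expression and rewriting the collected columns. Below is a minimal sketch of that mapping step. It assumes a hypothetical `Col` type and a plain `HashMap` in place of DataFusion's `Column`/`DFSchema`, and it only tracks bare column references:

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a qualified column; not DataFusion's `Column`.
#[derive(Clone, Debug, PartialEq)]
struct Col {
    relation: String,
    name: String,
}

impl Col {
    fn new(relation: &str, name: &str) -> Self {
        Col { relation: relation.to_string(), name: name.to_string() }
    }

    /// Qualified name such as "a.c4" (analogous to `flat_name` in the rule).
    fn flat_name(&self) -> String {
        format!("{}.{}", self.relation, self.name)
    }
}

/// Rewrites columns referenced above an aliasing projection into the input
/// columns they come from. `projection` maps output qualified names to the
/// input column (only plain column references are tracked).
fn rewrite_through_projection(cols: &mut [Col], projection: &HashMap<String, Col>) {
    for col in cols.iter_mut() {
        if let Some(input_col) = projection.get(&col.flat_name()) {
            *col = input_col.clone();
        }
    }
}

fn main() {
    // Projection: #tt1.c1, #tt1.c2, #tt2.c3, #tt2.c4, alias=a
    let projection: HashMap<String, Col> = vec![
        ("a.c1", Col::new("tt1", "c1")),
        ("a.c2", Col::new("tt1", "c2")),
        ("a.c3", Col::new("tt2", "c3")),
        ("a.c4", Col::new("tt2", "c4")),
    ]
    .into_iter()
    .map(|(k, v)| (k.to_string(), v))
    .collect();

    // `where a.c4 < 100` marks a.c4 as nonnullable above the projection.
    let mut nonnullable = vec![Col::new("a", "c4")];
    rewrite_through_projection(&mut nonnullable, &projection);

    // Below the projection it is tt2.c4, on the nullable side of
    // `tt1 left join tt2`, so that left join can become an inner join.
    assert_eq!(nonnullable, vec![Col::new("tt2", "c4")]);
}
```

Columns that do not go through the projection (for example `tt3.c5`) are left unchanged, so they can still match the outer right join.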



[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r905638161


##########
datafusion/optimizer/src/reduce_outer_join.rs:
##########
@@ -0,0 +1,355 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Optimizer rule to reduce left/right/full join to inner join if possible.
+use crate::{OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFSchema, Result};
+use datafusion_expr::{
+    logical_plan::{Filter, Join, JoinType, LogicalPlan, Projection},
+    utils::from_plan,
+};
+use datafusion_expr::{Expr, Operator};
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+#[derive(Default)]
+pub struct ReduceOuterJoin;
+
+impl ReduceOuterJoin {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for ReduceOuterJoin {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let mut nonnullable_cols: Vec<Column> = vec![];
+
+        reduce_outer_join(self, plan, &mut nonnullable_cols, optimizer_config)
+    }
+
+    fn name(&self) -> &str {
+        "reduce_outer_join"
+    }
+}
+
+/// Attempt to reduce outer joins to inner joins.
+/// For query: select ... from a left join b on ... where b.xx = 100;
+/// if b.xx is null, b.xx = 100 does not evaluate to true, so the filter removes
+/// those null rows. Therefore there is no need to produce null rows for the
+/// output, and we can use an inner join instead of a left join.
+///
+/// Generally, an outer join can be reduced to an inner join if the quals from
+/// the WHERE clause return false (or null) whenever any of their inputs are null,
+/// and the columns of those quals come from the nullable side of the outer join.
+fn reduce_outer_join(
+    _optimizer: &ReduceOuterJoin,
+    plan: &LogicalPlan,
+    nonnullable_cols: &mut Vec<Column>,
+    _optimizer_config: &OptimizerConfig,
+) -> Result<LogicalPlan> {
+    match plan {
+        LogicalPlan::Filter(Filter { input, predicate }) => match &**input {
+            LogicalPlan::Join(join) => {
+                extract_nonnullable_columns(
+                    predicate,
+                    nonnullable_cols,
+                    join.left.schema(),
+                    join.right.schema(),
+                    true,
+                )?;
+                Ok(LogicalPlan::Filter(Filter {
+                    predicate: predicate.clone(),
+                    input: Arc::new(reduce_outer_join(
+                        _optimizer,
+                        input,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )?),
+                }))
+            }
+            _ => Ok(LogicalPlan::Filter(Filter {
+                predicate: predicate.clone(),
+                input: Arc::new(reduce_outer_join(
+                    _optimizer,
+                    input,
+                    nonnullable_cols,
+                    _optimizer_config,
+                )?),
+            })),
+        },
+        LogicalPlan::Join(join) => {
+            let mut new_join_type = join.join_type;
+
+            if join.join_type == JoinType::Left
+                || join.join_type == JoinType::Right
+                || join.join_type == JoinType::Full
+            {
+                let mut left_nonnullable = false;
+                let mut right_nonnullable = false;
+                for col in nonnullable_cols.iter_mut() {
+                    if join.left.schema().field_from_column(col).is_ok() {
+                        left_nonnullable = true;
+                    }
+                    if join.right.schema().field_from_column(col).is_ok() {
+                        right_nonnullable = true;
+                    }
+                }
+
+                match join.join_type {
+                    JoinType::Left => {
+                        if right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Right => {
+                        if left_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        }
+                    }
+                    JoinType::Full => {
+                        if left_nonnullable && right_nonnullable {
+                            new_join_type = JoinType::Inner;
+                        } else if left_nonnullable {
+                            new_join_type = JoinType::Left;
+                        } else if right_nonnullable {
+                            new_join_type = JoinType::Right;
+                        }
+                    }
+                    _ => {}
+                };
+            }
+
+            let left_plan = reduce_outer_join(
+                _optimizer,
+                &join.left,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+            let right_plan = reduce_outer_join(
+                _optimizer,
+                &join.right,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+
+            Ok(LogicalPlan::Join(Join {
+                left: Arc::new(left_plan),
+                right: Arc::new(right_plan),
+                join_type: new_join_type,
+                join_constraint: join.join_constraint,
+                on: join.on.clone(),
+                filter: join.filter.clone(),
+                schema: join.schema.clone(),
+                null_equals_null: join.null_equals_null,
+            }))
+        }
+        LogicalPlan::Projection(Projection {
+            input,
+            expr,
+            schema,
+            alias: _,
+        }) => {
+            let projection = schema
+                .fields()
+                .iter()
+                .enumerate()
+                .map(|(i, field)| {
+                    // strip alias, as they should not be part of filters
+                    let expr = match &expr[i] {
+                        Expr::Alias(expr, _) => expr.as_ref().clone(),
+                        expr => expr.clone(),
+                    };
+
+                    (field.qualified_name(), expr)
+                })
+                .collect::<HashMap<_, _>>();
+
+            // re-write all Columns based on this projection
+            for col in nonnullable_cols.iter_mut() {
+                if let Some(Expr::Column(column)) = projection.get(&col.flat_name()) {
+                    *col = column.clone();
+                }
+            }
+
+            // optimize inner
+            let new_input = reduce_outer_join(
+                _optimizer,
+                input,
+                nonnullable_cols,
+                _optimizer_config,
+            )?;
+
+            from_plan(plan, expr, &[new_input])
+        }
+        _ => {
+            let expr = plan.expressions();
+
+            // apply the optimization to all inputs of the plan
+            let inputs = plan.inputs();
+            let new_inputs = inputs
+                .iter()
+                .map(|plan| {
+                    reduce_outer_join(
+                        _optimizer,
+                        plan,
+                        nonnullable_cols,
+                        _optimizer_config,
+                    )
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            from_plan(plan, &expr, &new_inputs)
+        }
+    }
+}
+
+/// Recursively traverses expr; if expr returns false when
+/// any input is null, treats the columns of both sides as nonnullable columns.
+///
+/// For and/or expr, extracts from all sub exprs and merges the columns.
+/// For or expr, if one of the sub exprs returns true, discards all columns from the or expr.
+/// For IS NOT NULL/NOT expr, the result is never true for NULL input, so
+///     extracts columns from these exprs.
+/// For all other exprs, falls through.
+fn extract_nonnullable_columns(

Review Comment:
   extract_nonnullable_columns collects all columns of the exprs that return false when their inputs are null, and we use these columns to check whether they appear in the null-padded rows that the outer join outputs. If so, those null rows cannot satisfy the conditions, so we can filter them out and reduce the outer join to an inner join. So I think the purpose of extract_nonnullable_columns is not the same as that of Expr::nullable.
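To make the distinction concrete: the question is not whether an expression can produce NULL, but whether a NULL in a column guarantees that the WHERE predicate is not TRUE. Below is a minimal sketch over a hypothetical predicate type (not DataFusion's `Expr`), following the top-level And/Or rules from the doc comment:

```rust
/// Hypothetical, simplified predicate AST; columns are (relation, column).
enum Pred {
    /// `relation.column <cmp> literal`, e.g. `b.c1 = 100`; NULL input gives NULL.
    Cmp(&'static str, &'static str),
    /// `relation.column IS NOT NULL`; NULL input gives FALSE.
    IsNotNull(&'static str, &'static str),
    And(Box<Pred>, Box<Pred>),
    Or(Box<Pred>, Box<Pred>),
    /// Anything not analyzed, e.g. `b.c1 IS NULL` (TRUE for NULL input).
    Other,
}

type Col = (&'static str, &'static str);

/// Collects columns for which a NULL value guarantees that the top-level
/// WHERE predicate is not TRUE, so the filter drops the row.
fn null_rejected_columns(pred: &Pred, out: &mut Vec<Col>) {
    match pred {
        Pred::Cmp(rel, col) | Pred::IsNotNull(rel, col) => out.push((*rel, *col)),
        Pred::And(l, r) => {
            // Either conjunct failing rejects the row: take columns from both.
            null_rejected_columns(l, out);
            null_rejected_columns(r, out);
        }
        Pred::Or(l, r) => {
            // Both disjuncts must fail: keep a left column only if the right
            // side also has a column from the same relation.
            let (mut lc, mut rc) = (Vec::new(), Vec::new());
            null_rejected_columns(l, &mut lc);
            null_rejected_columns(r, &mut rc);
            for c in lc {
                if rc.iter().any(|other| other.0 == c.0) {
                    out.push(c);
                }
            }
        }
        Pred::Other => {}
    }
}

fn cmp(rel: &'static str, col: &'static str) -> Box<Pred> {
    Box::new(Pred::Cmp(rel, col))
}

fn main() {
    // where b.c1 = 100 or b.c2 > 5: a NULL-padded b row fails both branches.
    let mut cols = Vec::new();
    null_rejected_columns(&Pred::Or(cmp("b", "c1"), cmp("b", "c2")), &mut cols);
    assert_eq!(cols, vec![("b", "c1")]);

    // where a.c1 = 1 or b.c2 > 5: a NULL-padded b row can still pass via a.c1.
    let mut cols = Vec::new();
    null_rejected_columns(&Pred::Or(cmp("a", "c1"), cmp("b", "c2")), &mut cols);
    assert!(cols.is_empty());
}
```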



[GitHub] [arrow-datafusion] alamb commented on pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#issuecomment-1168005581

   There is a logical conflict in this PR with https://github.com/apache/arrow-datafusion/pull/2789
   
   I took the liberty of fixing the conflicts in d0f1f8365
   


[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r902103709


##########
datafusion/sql/src/planner.rs:
##########
@@ -2584,6 +2587,230 @@ fn extract_join_keys(
     }
 }
 
+/// Recursively traverses expr; if expr returns false when
+/// any input is null, treats the columns of both sides as nonnullable columns.
+///
+/// For and/or expr, extracts from all sub exprs and merges the columns.
+/// For or expr, if one of the sub exprs returns true, discards all columns from the or expr.
+/// For IS NOT NULL/NOT expr, the result is never true for NULL input, so
+///     extracts columns from these exprs.
+/// For all other exprs, falls through.
+fn extract_nonnullable_columns(
+    expr: &Expr,
+    columns: &mut Vec<Column>,
+    top_level: bool,
+) -> Result<()> {
+    match expr {
+        Expr::Column(col) => {
+            columns.push(col.clone());
+            Ok(())
+        }
+        Expr::BinaryExpr { left, op, right } => match op {
+            // If one of the inputs is null for these operators, the result is null, which a filter treats as false.
+            Operator::Eq
+            | Operator::NotEq
+            | Operator::Lt
+            | Operator::LtEq
+            | Operator::Gt
+            | Operator::GtEq => {
+                extract_nonnullable_columns(left, columns, false)?;
+                extract_nonnullable_columns(right, columns, false)
+            }
+            Operator::And => {
+                if !top_level {
+                    return Ok(());
+                }
+                extract_nonnullable_columns(left, columns, top_level)?;
+                extract_nonnullable_columns(right, columns, top_level)
+            }
+            Operator::Or => {
+                let mut left_nonnullable_cols: Vec<Column> = vec![];
+                let mut right_nonnullable_cols: Vec<Column> = vec![];
+
+                extract_nonnullable_columns(left, &mut left_nonnullable_cols, top_level)?;
+                extract_nonnullable_columns(
+                    right,
+                    &mut right_nonnullable_cols,
+                    top_level,
+                )?;
+
+                // for query: select *** from a left join b where b.c1 ... or b.c2 ...
+                // this can be reduced to inner join.
+                // for query: select *** from a left join b where a.c1 ... or b.c2 ...
+                // this can not be reduced.
+                // If columns of the same relation exist in both sub exprs, the columns of this
+                // relation can be added to the nonnullable columns.
+                if !left_nonnullable_cols.is_empty() && !right_nonnullable_cols.is_empty()
+                {
+                    for left_col in &left_nonnullable_cols {
+                        for right_col in &right_nonnullable_cols {
+                            if let (Some(l_rel), Some(r_rel)) =
+                                (&left_col.relation, &right_col.relation)
+                            {
+                                if l_rel == r_rel {
+                                    columns.push(left_col.clone());
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                }
+                Ok(())
+            }
+            _ => Ok(()),
+        },
+        Expr::Not(arg) => extract_nonnullable_columns(arg, columns, false),
+        Expr::IsNotNull(arg) => {
+            if !top_level {
+                return Ok(());
+            }
+            extract_nonnullable_columns(arg, columns, false)
+        }
+        _ => Ok(()),
+    }
+}
+
+/// try to reduce one outer join to inner join
+fn reduce_outer_join(
+    plan: &LogicalPlan,
+    nonnullable_cols: &Vec<Column>,
+) -> Result<Option<LogicalPlan>> {

Review Comment:
   > How about changing this function to modify the plan in place, like
   > 
   > ```rust
   > fn reduce_outer_join(mut plan: LogicalPlan, ...) -> Result<LogicalPlan>;
   > ```
   > 
   > This might simplify [the match](https://github.com/apache/arrow-datafusion/pull/2750/files#diff-c01a34949db6258aa1593f011ecf90f62cbde406acd5cdbf8b9b60b970ace1cfR2717-R2770)
   
   An in-place update is better; I'll look into it later.
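Below is a minimal sketch of the suggested shape. It assumes a hypothetical two-variant `Plan` (not DataFusion's `LogicalPlan`) and uses relation names in place of columns. The plan is taken by value, its join types are rewritten through a `&mut` walk, and the same plan is returned, so no `Option` is needed. The join-type decisions mirror the match on `join.join_type` in the rule:

```rust
/// Hypothetical, simplified join type (not DataFusion's `JoinType`).
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinType { Inner, Left, Right, Full }

/// Hypothetical, simplified plan node (not DataFusion's `LogicalPlan`).
#[derive(Debug)]
enum Plan {
    Scan { relation: &'static str },
    Join { left: Box<Plan>, right: Box<Plan>, join_type: JoinType },
}

impl Plan {
    /// Relations produced by this subtree.
    fn relations(&self) -> Vec<&'static str> {
        match self {
            Plan::Scan { relation } => vec![*relation],
            Plan::Join { left, right, .. } => {
                let mut rels = left.relations();
                rels.extend(right.relations());
                rels
            }
        }
    }
}

/// Takes ownership, rewrites join types in place, and hands the same plan back.
fn reduce_outer_join(mut plan: Plan, nonnullable: &[&str]) -> Plan {
    reduce_in_place(&mut plan, nonnullable);
    plan
}

fn reduce_in_place(plan: &mut Plan, nonnullable: &[&str]) {
    if let Plan::Join { left, right, join_type } = plan {
        let left_nn = left.relations().iter().any(|r| nonnullable.contains(r));
        let right_nn = right.relations().iter().any(|r| nonnullable.contains(r));
        *join_type = match (*join_type, left_nn, right_nn) {
            (JoinType::Left, _, true) | (JoinType::Right, true, _) => JoinType::Inner,
            (JoinType::Full, true, true) => JoinType::Inner,
            (JoinType::Full, true, false) => JoinType::Left,
            (JoinType::Full, false, true) => JoinType::Right,
            (jt, _, _) => jt,
        };
        reduce_in_place(left, nonnullable);
        reduce_in_place(right, nonnullable);
    }
}

fn main() {
    let plan = Plan::Join {
        left: Box::new(Plan::Scan { relation: "tt1" }),
        right: Box::new(Plan::Scan { relation: "tt2" }),
        join_type: JoinType::Left,
    };
    // `where tt2.c4 < 100` makes tt2 nonnullable.
    let plan = reduce_outer_join(plan, &["tt2"]);
    if let Plan::Join { join_type, .. } = &plan {
        assert_eq!(*join_type, JoinType::Inner);
    }
    println!("{:?}", plan);
}
```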



[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r902105760


##########
datafusion/sql/src/planner.rs:
##########
@@ -784,6 +784,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
                 let filter_expr = self.sql_to_rex(predicate_expr, &join_schema, ctes)?;
 
+                // reduce outer joins
+                let plans = reduce_outer_joins(plans, &filter_expr)?;

Review Comment:
   > I'm not sure whether this should be done here or somewhere else, such as in the optimize phase. Other parts are great ❤️ cc @alamb
   
   At this point we have both the joins and the quals from the WHERE clause, which makes the reduction possible. Please give me your suggestions, thanks!



[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#issuecomment-1159637532

   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2750?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2750](https://codecov.io/gh/apache/arrow-datafusion/pull/2750?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (0208a7d) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/0289bfe6a98bdae371eee29d3f257b173ddb4437?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (0289bfe) will **increase** coverage by `0.03%`.
   > The diff coverage is `90.18%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #2750      +/-   ##
   ==========================================
   + Coverage   84.95%   84.99%   +0.03%     
   ==========================================
     Files         271      271              
     Lines       48049    48365     +316     
   ==========================================
   + Hits        40821    41106     +285     
   - Misses       7228     7259      +31     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/2750?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [datafusion/sql/src/planner.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2750/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9zcWwvc3JjL3BsYW5uZXIucnM=) | `80.97% <73.04%> (-0.43%)` | :arrow_down: |
   | [datafusion/core/tests/sql/joins.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2750/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9jb3JlL3Rlc3RzL3NxbC9qb2lucy5ycw==) | `99.36% <100.00%> (+0.29%)` | :arrow_up: |
   | [datafusion/expr/src/logical\_plan/plan.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/2750/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9leHByL3NyYy9sb2dpY2FsX3BsYW4vcGxhbi5ycw==) | `73.91% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2750?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/2750?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [0289bfe...0208a7d](https://codecov.io/gh/apache/arrow-datafusion/pull/2750?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r902105760


##########
datafusion/sql/src/planner.rs:
##########
@@ -784,6 +784,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
                 let filter_expr = self.sql_to_rex(predicate_expr, &join_schema, ctes)?;
 
+                // reduce outer joins
+                let plans = reduce_outer_joins(plans, &filter_expr)?;

Review Comment:
   > I'm not sure whether this should be done here or somewhere else, such as in the optimize phase. Other parts are great ❤️ cc @alamb
   
   At this point we have both the joins and the quals from the WHERE clause, which makes the reduction possible.



[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r902103396


##########
datafusion/sql/src/planner.rs:
##########
@@ -2584,6 +2587,230 @@ fn extract_join_keys(
     }
 }
 
+/// Recursively traverses expr; if expr returns false when
+/// any input is null, treats the columns of both sides as nonnullable columns.
+///
+/// For and/or expr, extracts from all sub exprs and merges the columns.
+/// For or expr, if one of the sub exprs returns true, discards all columns from the or expr.
+/// For IS NOT NULL/NOT expr, the result is never true for NULL input, so
+///     extracts columns from these exprs.
+/// For all other exprs, falls through.
+fn extract_nonnullable_columns(
+    expr: &Expr,
+    columns: &mut Vec<Column>,
+    top_level: bool,
+) -> Result<()> {
+    match expr {
+        Expr::Column(col) => {
+            columns.push(col.clone());
+            Ok(())
+        }
+        Expr::BinaryExpr { left, op, right } => match op {
+            // If one of the inputs is null for these operators, the result is null, which a filter treats as false.
+            Operator::Eq
+            | Operator::NotEq
+            | Operator::Lt
+            | Operator::LtEq
+            | Operator::Gt
+            | Operator::GtEq => {
+                extract_nonnullable_columns(left, columns, false)?;
+                extract_nonnullable_columns(right, columns, false)
+            }
+            Operator::And => {
+                if !top_level {

Review Comment:
   > Will something like `Not(And(a > 10, b < 10))` hit this check and get ignored?
   
   Yes. When not at the top level, we can handle an And expr the same way as an Or expr; this is better.
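By De Morgan's law, `NOT (x AND y)` equals `NOT x OR NOT y`. That is why an And below the top level (for example, under a Not) can be handled like an Or. Below is a small sketch of SQL three-valued logic that checks both halves of that argument for `Not(And(a > 10, b < 10))`. It uses `Option<bool>` for a nullable boolean, and the helper names are hypothetical, not DataFusion APIs:

```rust
/// SQL three-valued boolean: Some(true), Some(false), or None for NULL.
type SqlBool = Option<bool>;

/// SQL AND: FALSE dominates, then NULL.
fn and(l: SqlBool, r: SqlBool) -> SqlBool {
    match (l, r) {
        (Some(false), _) | (_, Some(false)) => Some(false),
        (Some(true), Some(true)) => Some(true),
        _ => None,
    }
}

/// SQL NOT: NULL stays NULL.
fn not(v: SqlBool) -> SqlBool {
    v.map(|b| !b)
}

/// Comparisons with a literal; a NULL input yields NULL.
fn gt(x: Option<i64>, lit: i64) -> SqlBool {
    x.map(|v| v > lit)
}

fn lt(x: Option<i64>, lit: i64) -> SqlBool {
    x.map(|v| v < lit)
}

/// A WHERE clause keeps a row only when the predicate is exactly TRUE.
fn passes_filter(v: SqlBool) -> bool {
    v == Some(true)
}

/// NOT (a > 10 AND b < 10)
fn predicate(a: Option<i64>, b: Option<i64>) -> SqlBool {
    not(and(gt(a, 10), lt(b, 10)))
}

fn main() {
    // Only `a` is NULL: the row with b = 20 still passes, so `a` alone must not
    // be reported as nonnullable (treating this And as top-level would be wrong).
    assert!(passes_filter(predicate(None, Some(20))));
    // `a` and `b` both NULL (e.g. both from the NULL-padded relation): rejected,
    // so columns of a relation present in both operands are safe, as for Or.
    assert!(!passes_filter(predicate(None, None)));
}
```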



[GitHub] [arrow-datafusion] waynexia commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
waynexia commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r901869070


##########
datafusion/sql/src/planner.rs:
##########
@@ -2584,6 +2587,230 @@ fn extract_join_keys(
     }
 }
 
+/// Recursively traverses expr; if expr returns false when
+/// any input is null, treats the columns of both sides as nonnullable columns.
+///
+/// For and/or expr, extracts from all sub exprs and merges the columns.
+/// For or expr, if one of the sub exprs returns true, discards all columns from the or expr.
+/// For IS NOT NULL/NOT expr, the result is never true for NULL input, so
+///     extracts columns from these exprs.
+/// For all other exprs, falls through.
+fn extract_nonnullable_columns(
+    expr: &Expr,
+    columns: &mut Vec<Column>,
+    top_level: bool,
+) -> Result<()> {
+    match expr {
+        Expr::Column(col) => {
+            columns.push(col.clone());
+            Ok(())
+        }
+        Expr::BinaryExpr { left, op, right } => match op {
+            // If one of the inputs is null for these operators, the result is null, which a filter treats as false.
+            Operator::Eq
+            | Operator::NotEq
+            | Operator::Lt
+            | Operator::LtEq
+            | Operator::Gt
+            | Operator::GtEq => {
+                extract_nonnullable_columns(left, columns, false)?;
+                extract_nonnullable_columns(right, columns, false)
+            }
+            Operator::And => {
+                if !top_level {
+                    return Ok(());
+                }
+                extract_nonnullable_columns(left, columns, top_level)?;
+                extract_nonnullable_columns(right, columns, top_level)
+            }
+            Operator::Or => {
+                let mut left_nonnullable_cols: Vec<Column> = vec![];
+                let mut right_nonnullable_cols: Vec<Column> = vec![];
+
+                extract_nonnullable_columns(left, &mut left_nonnullable_cols, top_level)?;
+                extract_nonnullable_columns(
+                    right,
+                    &mut right_nonnullable_cols,
+                    top_level,
+                )?;
+
+                // for query: select *** from a left join b where b.c1 ... or b.c2 ...
+                // this can be reduced to inner join.
+                // for query: select *** from a left join b where a.c1 ... or b.c2 ...
+                // this can not be reduced.
+                // If columns of the same relation exist in both sub exprs, the columns of this
+                // relation can be added to the nonnullable columns.
+                if !left_nonnullable_cols.is_empty() && !right_nonnullable_cols.is_empty()
+                {
+                    for left_col in &left_nonnullable_cols {
+                        for right_col in &right_nonnullable_cols {
+                            if let (Some(l_rel), Some(r_rel)) =
+                                (&left_col.relation, &right_col.relation)
+                            {
+                                if l_rel == r_rel {
+                                    columns.push(left_col.clone());
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                }
+                Ok(())
+            }
+            _ => Ok(()),
+        },
+        Expr::Not(arg) => extract_nonnullable_columns(arg, columns, false),
+        Expr::IsNotNull(arg) => {
+            if !top_level {
+                return Ok(());
+            }
+            extract_nonnullable_columns(arg, columns, false)
+        }
+        _ => Ok(()),
+    }
+}
+
+/// try to reduce one outer join to inner join
+fn reduce_outer_join(
+    plan: &LogicalPlan,
+    nonnullable_cols: &Vec<Column>,
+) -> Result<Option<LogicalPlan>> {

Review Comment:
   How about changing this function to modify the plan in place, like
   ```rust
   fn reduce_outer_join(mut plan: LogicalPlan, ...) -> Result<LogicalPlan>;
   ```
   This might simplify [the match](https://github.com/apache/arrow-datafusion/pull/2750/files#diff-c01a34949db6258aa1593f011ecf90f62cbde406acd5cdbf8b9b60b970ace1cfR2717-R2770)
   



##########
datafusion/sql/src/planner.rs:
##########
@@ -2584,6 +2587,230 @@ fn extract_join_keys(
     }
 }
 
+/// Recursively traverses expr; if expr returns false when
+/// any input is null, treats the columns of both sides as nonnullable columns.
+///
+/// For and/or expr, extracts from all sub exprs and merges the columns.

Review Comment:
   I guess this should be:
   ```suggestion
   /// For and expr, extracts from all sub exprs and merges the columns.
   ```



##########
datafusion/sql/src/planner.rs:
##########
@@ -784,6 +784,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
                 let filter_expr = self.sql_to_rex(predicate_expr, &join_schema, ctes)?;
 
+                // reduce outer joins
+                let plans = reduce_outer_joins(plans, &filter_expr)?;

Review Comment:
   I'm not sure whether this should be done here or somewhere else, such as in the optimize phase. Other parts are great :heart: cc @alamb  



[GitHub] [arrow-datafusion] AssHero commented on a diff in pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on code in PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#discussion_r905655191


##########
datafusion/core/src/execution/context.rs:
##########
@@ -1229,6 +1230,7 @@ impl SessionState {
         if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
             rules.push(Arc::new(FilterNullJoinKeys::default()));
         }
+        rules.push(Arc::new(ReduceOuterJoin::new()));

Review Comment:
   After FilterPushdown, filters such as (c1 < 100) have been pushed down to the tables. At that point we would need to check from the bottom up, and this requires more information to reduce the outer joins.
   
   For the query: select * from (select * from tt1 left join tt2 on c1 = c3) a right join tt3 on a.c2 = tt3.c5 where a.c4 < 100;
   after FilterPushdown, the logical plan is:
   
     Projection: #a.c1, #a.c2, #a.c3, #a.c4, #tt3.c5, #tt3.c6
       Inner Join: #a.c2 = #tt3.c5
         Projection: #a.c1, #a.c2, #a.c3, #a.c4, alias=a
           Projection: #tt1.c1, #tt1.c2, #tt2.c3, #tt2.c4, alias=a
             Inner Join: #tt1.c1 = #tt2.c3
               TableScan: tt1 projection=Some([c1, c2])
               Filter: #tt2.c4 < Int64(100)
                 TableScan: tt2 projection=Some([c3, c4])
         TableScan: tt3 projection=Some([c5, c6])
   
   We would need to walk down to the TableScan, pick up the filter, and then go back up to the parent plans to reduce the outer joins. I think that is considerably more complicated to implement.
   
   Please let me know if I'm missing something.
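Walking up from the `TableScan` can be avoided by running the reduction before filter pushdown. The pass then carries null-rejection facts top-down, from each `Filter` into the joins beneath it. Below is a minimal Rust sketch of such a top-down pass. Its assumptions:

- The `Plan` enum and `reduce_top_down` are hypothetical.
- Projections and aliases are left out, and column names are assumed unique across tables.
- Each filter's NULL-rejected columns are assumed to be precomputed from its predicate.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// Hypothetical mini logical plan (no projections or aliases).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan { columns: Vec<String> },
    /// `null_rejected`: columns whose NULL values this filter removes,
    /// assumed to be precomputed from the predicate.
    Filter { null_rejected: Vec<String>, input: Box<Plan> },
    Join { join_type: JoinType, left: Box<Plan>, right: Box<Plan> },
}

fn output_columns(plan: &Plan) -> Vec<String> {
    match plan {
        Plan::Scan { columns } => columns.clone(),
        Plan::Filter { input, .. } => output_columns(input),
        Plan::Join { left, right, .. } => {
            let mut cols = output_columns(left);
            cols.extend(output_columns(right));
            cols
        }
    }
}

/// True if an ancestor filter rejects NULLs in any output column of `plan`.
fn side_rejects_nulls(plan: &Plan, non_null: &HashSet<String>) -> bool {
    output_columns(plan).iter().any(|c| non_null.contains(c))
}

/// Top-down pass: `non_null` holds columns that some ancestor filter
/// already forces to be non-NULL.
pub fn reduce_top_down(plan: &Plan, non_null: &HashSet<String>) -> Plan {
    match plan {
        Plan::Scan { .. } => plan.clone(),
        Plan::Filter { null_rejected, input } => {
            // Facts from this filter hold for everything beneath it.
            let mut below = non_null.clone();
            below.extend(null_rejected.iter().cloned());
            Plan::Filter {
                null_rejected: null_rejected.clone(),
                input: Box::new(reduce_top_down(input, &below)),
            }
        }
        Plan::Join { join_type, left, right } => {
            let rejects_left = side_rejects_nulls(left, non_null);
            let rejects_right = side_rejects_nulls(right, non_null);
            let reduced = match (*join_type, rejects_left, rejects_right) {
                (JoinType::Left, _, true) | (JoinType::Right, true, _) => JoinType::Inner,
                (JoinType::Full, true, true) => JoinType::Inner,
                (JoinType::Full, true, false) => JoinType::Left,
                (JoinType::Full, false, true) => JoinType::Right,
                (jt, _, _) => jt,
            };
            // Column values pass through a join unchanged, so the same
            // non-NULL facts still apply to each child.
            Plan::Join {
                join_type: reduced,
                left: Box::new(reduce_top_down(left, non_null)),
                right: Box::new(reduce_top_down(right, non_null)),
            }
        }
    }
}
```

Apply this to the query above with a filter that rejects NULLs in `c4`. Both the outer RIGHT join and the nested LEFT join become inner joins, which matches the two `Inner Join` nodes in the plan shown. A filter on `c6` (a tt3 column) leaves both joins unchanged.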





[GitHub] [arrow-datafusion] AssHero commented on pull request #2750: try to reduce left/right/full join to inner join

Posted by GitBox <gi...@apache.org>.
AssHero commented on PR #2750:
URL: https://github.com/apache/arrow-datafusion/pull/2750#issuecomment-1164558228

   Now implementing reduce outer join as an optimizer rule. Further improvements are in progress.
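Once the reduction is a registered rule, as in the `rules.push(Arc::new(ReduceOuterJoin::new()))` line in the context.rs diff, its order relative to filter pushdown matters. After a LEFT join becomes an inner join, a pushdown pass may move a filter on the right-hand columns down to that table. It must not do so while the join is still LEFT, because that would change the results.

The sketch below shows that interaction with two toy rules. `Rule`, `ReduceLeftJoin`, `PushDownFilter` and `optimize` are hypothetical and are not DataFusion's `OptimizerRule` API. Both rules only inspect the root of the plan.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    Inner,
    Left,
}

/// Hypothetical mini plan; every filter is one NULL-rejecting predicate
/// on a single column (e.g. `c4 < 100`).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan { columns: Vec<String> },
    Filter { column: String, input: Box<Plan> },
    Join { join_type: JoinType, left: Box<Plan>, right: Box<Plan> },
}

fn output_columns(plan: &Plan) -> HashSet<String> {
    match plan {
        Plan::Scan { columns } => columns.iter().cloned().collect(),
        Plan::Filter { input, .. } => output_columns(input),
        Plan::Join { left, right, .. } => {
            let mut cols = output_columns(left);
            cols.extend(output_columns(right));
            cols
        }
    }
}

/// Hypothetical rule interface (not DataFusion's `OptimizerRule`).
pub trait Rule {
    fn apply(&self, plan: Plan) -> Plan;
}

/// A filter directly above a LEFT join that rejects NULLs from the right
/// side turns the join into an inner join.
pub struct ReduceLeftJoin;

impl Rule for ReduceLeftJoin {
    fn apply(&self, plan: Plan) -> Plan {
        match plan {
            Plan::Filter { column, input } => match *input {
                Plan::Join { join_type: JoinType::Left, left, right }
                    if output_columns(&right).contains(&column) =>
                {
                    let join = Plan::Join { join_type: JoinType::Inner, left, right };
                    Plan::Filter { column, input: Box::new(join) }
                }
                other => Plan::Filter { column, input: Box::new(other) },
            },
            other => other,
        }
    }
}

/// Pushes a root filter into the join input that owns its column, but never
/// into the NULL-supplying (right) side of a LEFT join.
pub struct PushDownFilter;

impl Rule for PushDownFilter {
    fn apply(&self, plan: Plan) -> Plan {
        let (column, input) = match plan {
            Plan::Filter { column, input } => (column, input),
            other => return other,
        };
        match *input {
            Plan::Join { join_type, left, right } => {
                if output_columns(&left).contains(&column) {
                    let left = Box::new(Plan::Filter { column, input: left });
                    Plan::Join { join_type, left, right }
                } else if join_type == JoinType::Inner && output_columns(&right).contains(&column) {
                    let right = Box::new(Plan::Filter { column, input: right });
                    Plan::Join { join_type, left, right }
                } else {
                    // Leave the filter above the join.
                    let join = Plan::Join { join_type, left, right };
                    Plan::Filter { column, input: Box::new(join) }
                }
            }
            other => Plan::Filter { column, input: Box::new(other) },
        }
    }
}

/// Applies the rules in registration order.
pub fn optimize(plan: Plan, rules: &[Box<dyn Rule>]) -> Plan {
    rules.iter().fold(plan, |p, rule| rule.apply(p))
}
```

Suppose `ReduceLeftJoin` runs first on a `c4` filter above a LEFT join of scans `(c1, c2)` and `(c3, c4)`. `optimize` then returns an inner join whose right input is the `c4` filter over the `(c3, c4)` scan. This echoes the `Filter: #tt2.c4 < Int64(100)` placement in the plan above. With `PushDownFilter` alone, the plan comes back unchanged.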

