Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/04/19 04:23:44 UTC

[GitHub] [arrow-datafusion] wangxiaoying opened a new issue, #2271: Inner join incorrectly pushdown predicate with OR operation

wangxiaoying opened a new issue, #2271:
URL: https://github.com/apache/arrow-datafusion/issues/2271

   **Describe the bug**
   Currently, an inner join can return incorrect results when its `ON` predicate contains an `OR`, because the filters on both sides of the `OR` are pushed down below the join as if they were combined with `AND`.
   
   **To Reproduce**
   ```rust
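   // Imports assumed for the DataFusion release current when this issue was filed;
   // the example also needs the `tokio` crate.
   use std::sync::Arc;
   
   use datafusion::arrow::array::{Int64Array, StringArray};
   use datafusion::arrow::datatypes::{DataType, Field, Schema};
   use datafusion::arrow::record_batch::RecordBatch;
   use datafusion::datasource::MemTable;
   use datafusion::prelude::SessionContext;
   
   fn main() {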
       let ctx = SessionContext::new();
   
       let id64_array = Int64Array::from(vec![Some(1), Some(2), Some(0), Some(3), Some(4)]);
       let str_array = StringArray::from(vec!["1", "2", "3", "4", "5"]);
       let schema = Schema::new(vec![
           Field::new("test_int", DataType::Int64, true),
           Field::new("str", DataType::Utf8, false),
       ]);
   
       let batch = RecordBatch::try_new(
           Arc::new(schema),
           vec![Arc::new(id64_array), Arc::new(str_array)],
       )
       .unwrap();
   
       let db1 = MemTable::try_new(batch.schema(), vec![vec![batch]]).unwrap();
       ctx.register_table("t1", Arc::new(db1)).unwrap();
   
       let sql = "select * from t1 inner join t1 tmp on t1.test_int = tmp.test_int and t1.str = '3' OR tmp.str = '1000'";
   
       let rt = Arc::new(tokio::runtime::Runtime::new().expect("Failed to create runtime"));
       let df = rt.block_on(ctx.sql(sql)).unwrap();
       rt.block_on(df.limit(5).unwrap().explain(false, false).unwrap().show())
           .unwrap();
       rt.block_on(df.limit(5).unwrap().show()).unwrap();
       let num_rows = rt
           .block_on(df.collect())
           .unwrap()
           .into_iter()
           .map(|rb| rb.num_rows())
           .sum::<usize>();
       println!("Final # rows: {}", num_rows);
   }
   ```
   
   Result is
   ```
   +---------------+------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                                                           |
   +---------------+------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Limit: 5                                                                                                                                       |
   |               |   Projection: #t1.test_int, #t1.str, #tmp.test_int, #tmp.str                                                                                   |
   |               |     Inner Join: #t1.test_int = #tmp.test_int                                                                                                   |
   |               |       Filter: #t1.str = Utf8("3")                                                                                                              |
   |               |         TableScan: t1 projection=Some([0, 1])                                                                                                  |
   |               |       Filter: #tmp.str = Utf8("1000")                                                                                                          |
   |               |         SubqueryAlias: tmp                                                                                                                     |
   |               |           TableScan: t1 projection=Some([0, 1])                                                                                                |
   | physical_plan | GlobalLimitExec: limit=5                                                                                                                       |
   |               |   CoalescePartitionsExec                                                                                                                       |
   |               |     LocalLimitExec: limit=5                                                                                                                    |
   |               |       ProjectionExec: expr=[test_int@0 as test_int, str@1 as str, test_int@2 as test_int, str@3 as str]                                        |
   |               |         CoalesceBatchesExec: target_batch_size=4096                                                                                            |
   |               |           HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "test_int", index: 0 }, Column { name: "test_int", index: 0 })] |
   |               |             CoalesceBatchesExec: target_batch_size=4096                                                                                        |
   |               |               RepartitionExec: partitioning=Hash([Column { name: "test_int", index: 0 }], 8)                                                   |
   |               |                 CoalesceBatchesExec: target_batch_size=4096                                                                                    |
   |               |                   FilterExec: str@1 = 3                                                                                                        |
   |               |                     RepartitionExec: partitioning=RoundRobinBatch(8)                                                                           |
   |               |                       MemoryExec: partitions=1, partition_sizes=[1]                                                                            |
   |               |             CoalesceBatchesExec: target_batch_size=4096                                                                                        |
   |               |               RepartitionExec: partitioning=Hash([Column { name: "test_int", index: 0 }], 8)                                                   |
   |               |                 CoalesceBatchesExec: target_batch_size=4096                                                                                    |
   |               |                   FilterExec: str@1 = 1000                                                                                                     |
   |               |                     RepartitionExec: partitioning=RoundRobinBatch(8)                                                                           |
   |               |                       MemoryExec: partitions=1, partition_sizes=[1]                                                                            |
   |               |                                                                                                                                                |
   +---------------+------------------------------------------------------------------------------------------------------------------------------------------------+
   ++
   ++
   Final # rows: 0
   ```
   
   **Expected behavior**
   
   The result should look like
   ```
   +---------------+-------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                  |
   +---------------+-------------------------------------------------------------------------------------------------------+
   | logical_plan  | Limit: 5                                                                                              |
   |               |   Projection: #t1.test_int, #t1.str, #tmp.test_int, #tmp.str                                          |
   |               |     Filter: #t1.test_int = #tmp.test_int AND #t1.str = Utf8("3") OR #tmp.str = Utf8("1000")           |
   |               |       CrossJoin:                                                                                      |
   |               |         TableScan: t1 projection=Some([0, 1])                                                         |
   |               |         SubqueryAlias: tmp                                                                            |
   |               |           TableScan: t1 projection=Some([0, 1])                                                       |
   | physical_plan | GlobalLimitExec: limit=5                                                                              |
   |               |   CoalescePartitionsExec                                                                              |
   |               |     ProjectionExec: expr=[test_int@0 as test_int, str@1 as str, test_int@2 as test_int, str@3 as str] |
   |               |       CoalesceBatchesExec: target_batch_size=4096                                                     |
   |               |         FilterExec: test_int@0 = test_int@2 AND str@1 = 3 OR str@3 = 1000                             |
   |               |           CrossJoinExec                                                                               |
   |               |             RepartitionExec: partitioning=RoundRobinBatch(8)                                          |
   |               |               MemoryExec: partitions=1, partition_sizes=[1]                                           |
   |               |             RepartitionExec: partitioning=RoundRobinBatch(8)                                          |
   |               |               MemoryExec: partitions=1, partition_sizes=[1]                                           |
   |               |                                                                                                       |
   +---------------+-------------------------------------------------------------------------------------------------------+
   +----------+-----+----------+-----+
   | test_int | str | test_int | str |
   +----------+-----+----------+-----+
   | 0        | 3   | 0        | 3   |
   +----------+-----+----------+-----+
   Final # rows: 1
   ```
   
   Here the predicates `Filter: #t1.str = Utf8("3")` and `Filter: #tmp.str = Utf8("1000")` should not be pushed below the join, because the `ON` condition combines them with `OR`.
   
   I created a similar table in Postgres and ran a similar query; here is the correct result:
   <img width="1082" alt="image" src="https://user-images.githubusercontent.com/5569610/163919342-c8d23585-03f2-4ce2-a5ac-e043c56282b1.png">
   
   **Additional context**
   These two queries should output the same result:
   q1: `select * from t1 cross join t1 tmp where t1.test_int = tmp.test_int and t1.str = '3' OR tmp.str = '1000'`
   q2: `select * from t1 inner join t1 tmp on t1.test_int = tmp.test_int and t1.str = '3' OR tmp.str = '1000'`
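   
   The difference between the two plans can be reproduced without DataFusion. The following is a minimal sketch in plain Rust, assuming the five rows from the reproduction; the function names `t1`, `count_reference` and `count_pushed_down` are hypothetical and exist only for illustration. It evaluates the predicate once as written (`AND` binds tighter than `OR`) and once the way the incorrect plan does, by filtering each side first.
   
   ```rust
   // Illustrative only: plain Rust, no DataFusion. All names are hypothetical.
   
   /// (test_int, str) rows of table t1, as in the reproduction.
   fn t1() -> Vec<(Option<i64>, &'static str)> {
       vec![
           (Some(1), "1"),
           (Some(2), "2"),
           (Some(0), "3"),
           (Some(3), "4"),
           (Some(4), "5"),
       ]
   }
   
   /// Reference semantics: evaluate the whole predicate on every pair of rows.
   /// AND binds tighter than OR, so the predicate reads
   /// `(t1.test_int = tmp.test_int AND t1.str = '3') OR tmp.str = '1000'`.
   fn count_reference() -> usize {
       let (left, right) = (t1(), t1());
       let mut n = 0;
       for (l_int, l_str) in &left {
           for (r_int, r_str) in &right {
               // A comparison involving NULL is never true; Option models that.
               let keys_equal = matches!((l_int, r_int), (Some(a), Some(b)) if a == b);
               if (keys_equal && *l_str == "3") || *r_str == "1000" {
                   n += 1;
               }
           }
       }
       n
   }
   
   /// The incorrect plan: filter each side first, then join on the key.
   fn count_pushed_down() -> usize {
       let left: Vec<_> = t1().into_iter().filter(|(_, s)| *s == "3").collect();
       let right: Vec<_> = t1().into_iter().filter(|(_, s)| *s == "1000").collect();
       let mut n = 0;
       for (l_int, _) in &left {
           for (r_int, _) in &right {
               if matches!((l_int, r_int), (Some(a), Some(b)) if a == b) {
                   n += 1;
               }
           }
       }
       n
   }
   ```
   
   With these rows, `count_reference()` yields 1 and `count_pushed_down()` yields 0, the same discrepancy as between the expected and actual outputs above.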


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jackwener commented on issue #2271: Inner join incorrectly pushdown predicate with OR operation

Posted by GitBox <gi...@apache.org>.
jackwener commented on issue #2271:
URL: https://github.com/apache/arrow-datafusion/issues/2271#issuecomment-1102600707

   😅 The `optimizer` of DataFusion is strange. We should put the `convert outer join to inner/right/left join` logic in an optimizer `rule`.
   
   We can infer `is not null` from the `on clause` or the `where clause`, and an `is not null` condition allows an `outer join` to be converted.
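   
   As a rough illustration of that inference, here is a sketch in plain Rust with hypothetical toy data and helper names (`inner_join`, `left_join`, `where_right_val_gt_10`, `sample`); none of it is DataFusion code. A `where` filter on a right-side column can never be true for the NULL-padded rows a left join adds, so filtering the left join gives the same rows as filtering the inner join.
   
   ```rust
   // Illustrative only: hypothetical toy model, not DataFusion code.
   type Row = (i64, i64); // (key, val)
   type Joined = (Row, Option<Row>); // right side is None when NULL-padded
   
   fn inner_join(l: &[Row], r: &[Row]) -> Vec<Joined> {
       let mut out = Vec::new();
       for &lr in l {
           for &rr in r {
               if lr.0 == rr.0 {
                   out.push((lr, Some(rr)));
               }
           }
       }
       out
   }
   
   fn left_join(l: &[Row], r: &[Row]) -> Vec<Joined> {
       let mut out = Vec::new();
       for &lr in l {
           let mut matched = false;
           for &rr in r {
               if lr.0 == rr.0 {
                   out.push((lr, Some(rr)));
                   matched = true;
               }
           }
           if !matched {
               // Unmatched left rows are kept, with a NULL right side.
               out.push((lr, None));
           }
       }
       out
   }
   
   /// `WHERE r.val > 10`: never true when the right side is NULL,
   /// which implies `r.val IS NOT NULL`.
   fn where_right_val_gt_10(rows: Vec<Joined>) -> Vec<Joined> {
       rows.into_iter()
           .filter(|(_, r)| matches!(r, Some((_, v)) if *v > 10))
           .collect()
   }
   
   fn sample() -> (Vec<Row>, Vec<Row>) {
       (vec![(1, 100), (2, 200), (3, 300)], vec![(1, 11), (2, 5)])
   }
   ```
   
   On the sample data the left join has one NULL-padded row (key 3), and the filter removes it, so the filtered left join and the filtered inner join contain the same rows.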
   
   Currently:
   - `convert join` with an `on clause` happens in `parse_join()` in `plan_from_tables()`
   - `convert join` with a `where clause` happens in `plan_selection()`
   
   This bug occurs because `parse_join()` uses the columns from `extract_join_keys` directly, which is wrong.
   
   I have explained this in detail in https://github.com/apache/incubator-doris/pull/8910#issuecomment-1096159553
   
   The easy fix is to consider only `AND`.
   
   I hope this information is helpful to whoever wants to fix this.
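   
   One way to read "consider only `AND`" is sketched below. This is a simplified model, not DataFusion's actual `Expr` or planner code: the `Expr`, `Side` and `JoinPlan` types and the `split_conjuncts` and `plan_join` functions are hypothetical. The predicate is split only across top-level `AND`s; join keys and per-side filters are taken only from those conjuncts, and anything containing an `OR` stays as a residual filter above the join.
   
   ```rust
   // Illustrative only: hypothetical mini expression type, not DataFusion's `Expr`.
   #[derive(Debug, Clone, Copy, PartialEq)]
   enum Side {
       Left,
       Right,
   }
   
   #[derive(Debug, Clone, PartialEq)]
   enum Expr {
       /// `left.column = right.column`
       KeyEq(String, String),
       /// A predicate that reads columns from one side only.
       SideFilter(Side, String),
       And(Box<Expr>, Box<Expr>),
       Or(Box<Expr>, Box<Expr>),
   }
   
   /// Split only across top-level ANDs; an OR is kept as one opaque conjunct.
   fn split_conjuncts(e: &Expr, out: &mut Vec<Expr>) {
       match e {
           Expr::And(l, r) => {
               split_conjuncts(l, out);
               split_conjuncts(r, out);
           }
           other => out.push(other.clone()),
       }
   }
   
   #[derive(Debug, Default, PartialEq)]
   struct JoinPlan {
       keys: Vec<(String, String)>,
       left_filters: Vec<String>,
       right_filters: Vec<String>,
       /// Conjuncts that must be evaluated above the join.
       residual: Vec<Expr>,
   }
   
   fn plan_join(on: &Expr) -> JoinPlan {
       let mut conjuncts = Vec::new();
       split_conjuncts(on, &mut conjuncts);
       let mut plan = JoinPlan::default();
       for c in conjuncts {
           match c {
               Expr::KeyEq(l, r) => plan.keys.push((l, r)),
               Expr::SideFilter(Side::Left, p) => plan.left_filters.push(p),
               Expr::SideFilter(Side::Right, p) => plan.right_filters.push(p),
               other => plan.residual.push(other),
           }
       }
       plan
   }
   
   fn key() -> Expr {
       Expr::KeyEq("t1.test_int".into(), "tmp.test_int".into())
   }
   
   fn left_str() -> Expr {
       Expr::SideFilter(Side::Left, "t1.str = '3'".into())
   }
   
   fn right_str() -> Expr {
       Expr::SideFilter(Side::Right, "tmp.str = '1000'".into())
   }
   
   /// The ON clause from the issue: (key AND t1.str = '3') OR tmp.str = '1000'
   fn issue_predicate() -> Expr {
       Expr::Or(
           Box::new(Expr::And(Box::new(key()), Box::new(left_str()))),
           Box::new(right_str()),
       )
   }
   
   /// A purely conjunctive variant: key AND t1.str = '3' AND tmp.str = '1000'
   fn conjunctive_predicate() -> Expr {
       Expr::And(
           Box::new(Expr::And(Box::new(key()), Box::new(left_str()))),
           Box::new(right_str()),
       )
   }
   ```
   
   In this model, `plan_join(&issue_predicate())` extracts no keys and no pushed-down filters and keeps the whole `OR` as a residual, which corresponds to the cross join plus filter in the expected plan. `plan_join(&conjunctive_predicate())` extracts one key and one filter per side.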
   




[GitHub] [arrow-datafusion] alamb closed issue #2271: Inner join incorrectly pushdown predicate with OR operation

Posted by GitBox <gi...@apache.org>.
alamb closed issue #2271: Inner join incorrectly pushdown predicate with OR operation
URL: https://github.com/apache/arrow-datafusion/issues/2271

