You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/26 13:32:14 UTC

[GitHub] [arrow-datafusion] alamb opened a new issue #187: [Datafusion] Support joins on TimestampMillisecond columns

alamb opened a new issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187


   *Note*: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-11940
   
   Joining DataFrames on a TimestampMillisecond column gives error:
   
   ```
   
   'called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")
   
   arrow/rust/datafusion/src/physical_plan/hash_join.rs:252:30
   
   '
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Dandandan closed issue #187: [Datafusion] Support joins on TimestampMillisecond columns

Posted by GitBox <gi...@apache.org>.
Dandandan closed issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Dandandan closed issue #187: [Datafusion] Support joins on TimestampMillisecond columns

Posted by GitBox <gi...@apache.org>.
Dandandan closed issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on issue #187: [Datafusion] Support joins on TimestampMillisecond columns

Posted by GitBox <gi...@apache.org>.
alamb commented on issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187#issuecomment-926940611


   Thanks for the report @francis-du ! 
   
   This search can help you find the relevant code: https://github.com/apache/arrow-datafusion/search?q=Unsupported+data+type+in+hasher
   
   The two locations that create that error are :
   https://github.com/apache/arrow-datafusion/blob/65483d32f6ee86766bb74988659bb51142a4edff/datafusion/src/physical_plan/hash_join.rs#L789
   
   https://github.com/apache/arrow-datafusion/blob/65483d32f6ee86766bb74988659bb51142a4edff/datafusion/src/physical_plan/hash_utils.rs#L507
   
   To begin debugging I would suggest:
   1. Write a test in sql.rs or exec.rs showing the problem
   2. Figure out which of the two instances are being hit and then add the appropriate fix (I wonder if your data somehow has `Timestamp(Milliseconds, Some(..)` (aka some timezone information?)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] francis-du edited a comment on issue #187: [Datafusion] Support joins on TimestampMillisecond columns

Posted by GitBox <gi...@apache.org>.
francis-du edited a comment on issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187#issuecomment-926895389


   Same issue to me:
   
   When the join condition is c3=c3, there will be no problem, but c1=c1 or c2=c2 will cause panic.
   
   Panic: 
   
   ```sql 
   > select * from (select * from public.simple as a join (select * from public.simple as b) on a.c3=b.c3 where a.c1>0) join (select * from public.simple as d) on a.c2=d.c2 order by a.c1;
   thread 'thread 'tokio-runtime-workertokio-runtime-worker' panicked at '' panicked at 'called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")', ', datafusion/src/physical_plan/hash_join.rsdatafusion/src/physical_plan/hash_join.rs::583583::1414
   
   thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")', thread 'datafusion/src/physical_plan/hash_join.rstokio-runtime-worker:' panicked at '583called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher"):', 14datafusion/src/physical_plan/hash_join.rs
   :583:14
   ++
   ++
   ```
   
   Table info: 
   
   ```sql
   
   > show columns from public.simple;
   +---------------+--------------+------------+-------------+-----------+-------------+
   | table_catalog | table_schema | table_name | column_name | data_type | is_nullable |
   +---------------+--------------+------------+-------------+-----------+-------------+
   | datafusion    | public       | simple     | c1          | Float64   | NO          |
   | datafusion    | public       | simple     | c2          | Float64   | NO          |
   | datafusion    | public       | simple     | c3          | Boolean   | NO          |
   +---------------+--------------+------------+-------------+-----------+-------------+
   ```
   
   Query plan:
   
   ```shell
   
   > explain  select * from (select * from public.simple as a join (select * from public.simple as b) on a.c3=b.c3 where a.c1>0) join (select * from public.simple as d) on a.c2=d.c2 order by a.c1;
   +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                                                                                                                                                         |
   +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Sort: #a.c1 ASC NULLS FIRST                                                                                                                                                                                                                  |
   |               |   Projection: #a.c1, #a.c2, #a.c3, #b.c1, #b.c2, #b.c3, #d.c1, #d.c2, #d.c3                                                                                                                                                                  |
   |               |     Join: #a.c2 = #d.c2                                                                                                                                                                                                                      |
   |               |       Projection: #a.c1, #a.c2, #a.c3, #b.c1, #b.c2, #b.c3                                                                                                                                                                                   |
   |               |         Join: #a.c3 = #b.c3                                                                                                                                                                                                                  |
   |               |           Filter: #a.c1 > Int64(0)                                                                                                                                                                                                           |
   |               |             TableScan: a projection=Some([0, 1, 2])                                                                                                                                                                                          |
   |               |           Projection: #b.c1, #b.c2, #b.c3                                                                                                                                                                                                    |
   |               |             TableScan: b projection=Some([0, 1, 2])                                                                                                                                                                                          |
   |               |       Projection: #d.c1, #d.c2, #d.c3                                                                                                                                                                                                        |
   |               |         TableScan: d projection=Some([0, 1, 2])                                                                                                                                                                                              |
   | physical_plan | SortExec: [c1@0 ASC]                                                                                                                                                                                                                         |
   |               |   CoalescePartitionsExec                                                                                                                                                                                                                     |
   |               |     ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c1@3 as c1, c2@4 as c2, c3@5 as c3, c1@6 as c1, c2@7 as c2, c3@8 as c3]                                                                                                        |
   |               |       CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                            |
   |               |         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c2", index: 1 }, Column { name: "c2", index: 1 })]                                                                                                             |
   |               |           CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                        |
   |               |             RepartitionExec: partitioning=Hash([Column { name: "c2", index: 1 }], 8)                                                                                                                                                         |
   |               |               ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c1@3 as c1, c2@4 as c2, c3@5 as c3]                                                                                                                                  |
   |               |                 CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                  |
   |               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c3", index: 2 }, Column { name: "c3", index: 2 })]                                                                                                   |
   |               |                     CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                              |
   |               |                       RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2 }], 8)                                                                                                                                               |
   |               |                         CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                          |
   |               |                           FilterExec: c1@0 > CAST(0 AS Float64)                                                                                                                                                                              |
   |               |                             RepartitionExec: partitioning=RoundRobinBatch(8)                                                                                                                                                                 |
   |               |                               CsvExec: source=Path(/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv: [/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv]), has_header=false |
   |               |                     CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                              |
   |               |                       RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2 }], 8)                                                                                                                                               |
   |               |                         ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]                                                                                                                                                            |
   |               |                           RepartitionExec: partitioning=RoundRobinBatch(8)                                                                                                                                                                   |
   |               |                             CsvExec: source=Path(/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv: [/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv]), has_header=false   |
   |               |           CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                        |
   |               |             RepartitionExec: partitioning=Hash([Column { name: "c2", index: 1 }], 8)                                                                                                                                                         |
   |               |               ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]                                                                                                                                                                      |
   |               |                 RepartitionExec: partitioning=RoundRobinBatch(8)                                                                                                                                                                             |
   |               |                   CsvExec: source=Path(/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv: [/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv]), has_header=false             |
   +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```
   
   I want to participate in fixing this problem, can you provide some help. 😊


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] francis-du commented on issue #187: [Datafusion] Support joins on TimestampMillisecond columns

Posted by GitBox <gi...@apache.org>.
francis-du commented on issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187#issuecomment-926895389


   Same issue to me:
   
   When the join condition is c3=c3, there will be no problem, but c1=c1 or c2=c2 will cause panic.
   
   Panic: 
   
   ```shell 
   > select * from (select * from public.simple as a join (select * from public.simple as b) on a.c3=b.c3 where a.c1>0) join (select * from public.simple as d) on a.c2=d.c2 order by a.c1;
   thread 'thread 'tokio-runtime-workertokio-runtime-worker' panicked at '' panicked at 'called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")', ', datafusion/src/physical_plan/hash_join.rsdatafusion/src/physical_plan/hash_join.rs::583583::1414
   
   thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")', thread 'datafusion/src/physical_plan/hash_join.rstokio-runtime-worker:' panicked at '583called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher"):', 14datafusion/src/physical_plan/hash_join.rs
   :583:14
   ++
   ++
   ```
   
   Table info: 
   
   ```shell
   
   > show columns from public.simple;
   +---------------+--------------+------------+-------------+-----------+-------------+
   | table_catalog | table_schema | table_name | column_name | data_type | is_nullable |
   +---------------+--------------+------------+-------------+-----------+-------------+
   | datafusion    | public       | simple     | c1          | Float64   | NO          |
   | datafusion    | public       | simple     | c2          | Float64   | NO          |
   | datafusion    | public       | simple     | c3          | Boolean   | NO          |
   +---------------+--------------+------------+-------------+-----------+-------------+
   ```
   
   Query plan:
   
   ```shell
   
   > explain  select * from (select * from public.simple as a join (select * from public.simple as b) on a.c3=b.c3 where a.c1>0) join (select * from public.simple as d) on a.c2=d.c2 order by a.c1;
   +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                                                                                                                                                                                                         |
   +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Sort: #a.c1 ASC NULLS FIRST                                                                                                                                                                                                                  |
   |               |   Projection: #a.c1, #a.c2, #a.c3, #b.c1, #b.c2, #b.c3, #d.c1, #d.c2, #d.c3                                                                                                                                                                  |
   |               |     Join: #a.c2 = #d.c2                                                                                                                                                                                                                      |
   |               |       Projection: #a.c1, #a.c2, #a.c3, #b.c1, #b.c2, #b.c3                                                                                                                                                                                   |
   |               |         Join: #a.c3 = #b.c3                                                                                                                                                                                                                  |
   |               |           Filter: #a.c1 > Int64(0)                                                                                                                                                                                                           |
   |               |             TableScan: a projection=Some([0, 1, 2])                                                                                                                                                                                          |
   |               |           Projection: #b.c1, #b.c2, #b.c3                                                                                                                                                                                                    |
   |               |             TableScan: b projection=Some([0, 1, 2])                                                                                                                                                                                          |
   |               |       Projection: #d.c1, #d.c2, #d.c3                                                                                                                                                                                                        |
   |               |         TableScan: d projection=Some([0, 1, 2])                                                                                                                                                                                              |
   | physical_plan | SortExec: [c1@0 ASC]                                                                                                                                                                                                                         |
   |               |   CoalescePartitionsExec                                                                                                                                                                                                                     |
   |               |     ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c1@3 as c1, c2@4 as c2, c3@5 as c3, c1@6 as c1, c2@7 as c2, c3@8 as c3]                                                                                                        |
   |               |       CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                            |
   |               |         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c2", index: 1 }, Column { name: "c2", index: 1 })]                                                                                                             |
   |               |           CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                        |
   |               |             RepartitionExec: partitioning=Hash([Column { name: "c2", index: 1 }], 8)                                                                                                                                                         |
   |               |               ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c1@3 as c1, c2@4 as c2, c3@5 as c3]                                                                                                                                  |
   |               |                 CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                  |
   |               |                   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c3", index: 2 }, Column { name: "c3", index: 2 })]                                                                                                   |
   |               |                     CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                              |
   |               |                       RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2 }], 8)                                                                                                                                               |
   |               |                         CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                          |
   |               |                           FilterExec: c1@0 > CAST(0 AS Float64)                                                                                                                                                                              |
   |               |                             RepartitionExec: partitioning=RoundRobinBatch(8)                                                                                                                                                                 |
   |               |                               CsvExec: source=Path(/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv: [/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv]), has_header=false |
   |               |                     CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                              |
   |               |                       RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2 }], 8)                                                                                                                                               |
   |               |                         ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]                                                                                                                                                            |
   |               |                           RepartitionExec: partitioning=RoundRobinBatch(8)                                                                                                                                                                   |
   |               |                             CsvExec: source=Path(/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv: [/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv]), has_header=false   |
   |               |           CoalesceBatchesExec: target_batch_size=4096                                                                                                                                                                                        |
   |               |             RepartitionExec: partitioning=Hash([Column { name: "c2", index: 1 }], 8)                                                                                                                                                         |
   |               |               ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]                                                                                                                                                                      |
   |               |                 RepartitionExec: partitioning=RoundRobinBatch(8)                                                                                                                                                                             |
   |               |                   CsvExec: source=Path(/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv: [/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv]), has_header=false             |
   +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   ```
   
   I want to participate in fixing this problem, can you provide some help. 😊


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] francis-du commented on issue #187: [Datafusion] Support joins on TimestampMillisecond columns

Posted by GitBox <gi...@apache.org>.
francis-du commented on issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187#issuecomment-927205291


   @alamb Thanks for your help, I try to fix it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org