You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/26 13:32:14 UTC
[GitHub] [arrow-datafusion] alamb opened a new issue #187: [Datafusion] Support joins on TimestampMillisecond columns
alamb opened a new issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187
*Note*: migrated from original JIRA: https://issues.apache.org/jira/browse/ARROW-11940
Joining DataFrames on a TimestampMillisecond column gives error:
```
'called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")
arrow/rust/datafusion/src/physical_plan/hash_join.rs:252:30
'
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow-datafusion] Dandandan closed issue #187: [Datafusion] Support joins on TimestampMillisecond columns
Posted by GitBox <gi...@apache.org>.
Dandandan closed issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow-datafusion] Dandandan closed issue #187: [Datafusion] Support joins on TimestampMillisecond columns
Posted by GitBox <gi...@apache.org>.
Dandandan closed issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow-datafusion] alamb commented on issue #187: [Datafusion] Support joins on TimestampMillisecond columns
Posted by GitBox <gi...@apache.org>.
alamb commented on issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187#issuecomment-926940611
Thanks for the report @francis-du !
This search can help you find the relevant code: https://github.com/apache/arrow-datafusion/search?q=Unsupported+data+type+in+hasher
The two locations that create that error are :
https://github.com/apache/arrow-datafusion/blob/65483d32f6ee86766bb74988659bb51142a4edff/datafusion/src/physical_plan/hash_join.rs#L789
https://github.com/apache/arrow-datafusion/blob/65483d32f6ee86766bb74988659bb51142a4edff/datafusion/src/physical_plan/hash_utils.rs#L507
To begin debugging I would suggest:
1. Write a test in sql.rs or exec.rs showing the problem
2. Figure out which of the two instances are being hit and then add the appropriate fix (I wonder if your data somehow has `Timestamp(Milliseconds, Some(..)` (aka some timezone information?)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow-datafusion] francis-du edited a comment on issue #187: [Datafusion] Support joins on TimestampMillisecond columns
Posted by GitBox <gi...@apache.org>.
francis-du edited a comment on issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187#issuecomment-926895389
Same issue to me:
When the join condition is c3=c3, there will be no problem, but c1=c1 or c2=c2 will cause panic.
Panic:
```sql
> select * from (select * from public.simple as a join (select * from public.simple as b) on a.c3=b.c3 where a.c1>0) join (select * from public.simple as d) on a.c2=d.c2 order by a.c1;
thread 'thread 'tokio-runtime-workertokio-runtime-worker' panicked at '' panicked at 'called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")', ', datafusion/src/physical_plan/hash_join.rsdatafusion/src/physical_plan/hash_join.rs::583583::1414
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")', thread 'datafusion/src/physical_plan/hash_join.rstokio-runtime-worker:' panicked at '583called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher"):', 14datafusion/src/physical_plan/hash_join.rs
:583:14
++
++
```
Table info:
```sql
> show columns from public.simple;
+---------------+--------------+------------+-------------+-----------+-------------+
| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |
+---------------+--------------+------------+-------------+-----------+-------------+
| datafusion | public | simple | c1 | Float64 | NO |
| datafusion | public | simple | c2 | Float64 | NO |
| datafusion | public | simple | c3 | Boolean | NO |
+---------------+--------------+------------+-------------+-----------+-------------+
```
Query plan:
```shell
> explain select * from (select * from public.simple as a join (select * from public.simple as b) on a.c3=b.c3 where a.c1>0) join (select * from public.simple as d) on a.c2=d.c2 order by a.c1;
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: #a.c1 ASC NULLS FIRST |
| | Projection: #a.c1, #a.c2, #a.c3, #b.c1, #b.c2, #b.c3, #d.c1, #d.c2, #d.c3 |
| | Join: #a.c2 = #d.c2 |
| | Projection: #a.c1, #a.c2, #a.c3, #b.c1, #b.c2, #b.c3 |
| | Join: #a.c3 = #b.c3 |
| | Filter: #a.c1 > Int64(0) |
| | TableScan: a projection=Some([0, 1, 2]) |
| | Projection: #b.c1, #b.c2, #b.c3 |
| | TableScan: b projection=Some([0, 1, 2]) |
| | Projection: #d.c1, #d.c2, #d.c3 |
| | TableScan: d projection=Some([0, 1, 2]) |
| physical_plan | SortExec: [c1@0 ASC] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c1@3 as c1, c2@4 as c2, c3@5 as c3, c1@6 as c1, c2@7 as c2, c3@8 as c3] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c2", index: 1 }, Column { name: "c2", index: 1 })] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | RepartitionExec: partitioning=Hash([Column { name: "c2", index: 1 }], 8) |
| | ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c1@3 as c1, c2@4 as c2, c3@5 as c3] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c3", index: 2 }, Column { name: "c3", index: 2 })] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2 }], 8) |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: c1@0 > CAST(0 AS Float64) |
| | RepartitionExec: partitioning=RoundRobinBatch(8) |
| | CsvExec: source=Path(/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv: [/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv]), has_header=false |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2 }], 8) |
| | ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3] |
| | RepartitionExec: partitioning=RoundRobinBatch(8) |
| | CsvExec: source=Path(/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv: [/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv]), has_header=false |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | RepartitionExec: partitioning=Hash([Column { name: "c2", index: 1 }], 8) |
| | ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3] |
| | RepartitionExec: partitioning=RoundRobinBatch(8) |
| | CsvExec: source=Path(/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv: [/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv]), has_header=false |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
I want to participate in fixing this problem, can you provide some help. 😊
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow-datafusion] francis-du commented on issue #187: [Datafusion] Support joins on TimestampMillisecond columns
Posted by GitBox <gi...@apache.org>.
francis-du commented on issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187#issuecomment-926895389
Same issue to me:
When the join condition is c3=c3, there will be no problem, but c1=c1 or c2=c2 will cause panic.
Panic:
```shell
> select * from (select * from public.simple as a join (select * from public.simple as b) on a.c3=b.c3 where a.c1>0) join (select * from public.simple as d) on a.c2=d.c2 order by a.c1;
thread 'thread 'tokio-runtime-workertokio-runtime-worker' panicked at '' panicked at 'called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")', ', datafusion/src/physical_plan/hash_join.rsdatafusion/src/physical_plan/hash_join.rs::583583::1414
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher")', thread 'datafusion/src/physical_plan/hash_join.rstokio-runtime-worker:' panicked at '583called `Result::unwrap()` on an `Err` value: Internal("Unsupported data type in hasher"):', 14datafusion/src/physical_plan/hash_join.rs
:583:14
++
++
```
Table info:
```shell
> show columns from public.simple;
+---------------+--------------+------------+-------------+-----------+-------------+
| table_catalog | table_schema | table_name | column_name | data_type | is_nullable |
+---------------+--------------+------------+-------------+-----------+-------------+
| datafusion | public | simple | c1 | Float64 | NO |
| datafusion | public | simple | c2 | Float64 | NO |
| datafusion | public | simple | c3 | Boolean | NO |
+---------------+--------------+------------+-------------+-----------+-------------+
```
Query plan:
```shell
> explain select * from (select * from public.simple as a join (select * from public.simple as b) on a.c3=b.c3 where a.c1>0) join (select * from public.simple as d) on a.c2=d.c2 order by a.c1;
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: #a.c1 ASC NULLS FIRST |
| | Projection: #a.c1, #a.c2, #a.c3, #b.c1, #b.c2, #b.c3, #d.c1, #d.c2, #d.c3 |
| | Join: #a.c2 = #d.c2 |
| | Projection: #a.c1, #a.c2, #a.c3, #b.c1, #b.c2, #b.c3 |
| | Join: #a.c3 = #b.c3 |
| | Filter: #a.c1 > Int64(0) |
| | TableScan: a projection=Some([0, 1, 2]) |
| | Projection: #b.c1, #b.c2, #b.c3 |
| | TableScan: b projection=Some([0, 1, 2]) |
| | Projection: #d.c1, #d.c2, #d.c3 |
| | TableScan: d projection=Some([0, 1, 2]) |
| physical_plan | SortExec: [c1@0 ASC] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c1@3 as c1, c2@4 as c2, c3@5 as c3, c1@6 as c1, c2@7 as c2, c3@8 as c3] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c2", index: 1 }, Column { name: "c2", index: 1 })] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | RepartitionExec: partitioning=Hash([Column { name: "c2", index: 1 }], 8) |
| | ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3, c1@3 as c1, c2@4 as c2, c3@5 as c3] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "c3", index: 2 }, Column { name: "c3", index: 2 })] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2 }], 8) |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: c1@0 > CAST(0 AS Float64) |
| | RepartitionExec: partitioning=RoundRobinBatch(8) |
| | CsvExec: source=Path(/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv: [/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv]), has_header=false |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | RepartitionExec: partitioning=Hash([Column { name: "c3", index: 2 }], 8) |
| | ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3] |
| | RepartitionExec: partitioning=RoundRobinBatch(8) |
| | CsvExec: source=Path(/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv: [/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv]), has_header=false |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | RepartitionExec: partitioning=Hash([Column { name: "c2", index: 1 }], 8) |
| | ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3] |
| | RepartitionExec: partitioning=RoundRobinBatch(8) |
| | CsvExec: source=Path(/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv: [/Users/francisdu/Code/Rust/arrow-datafusion/datafusion/tests/aggregate_simple.csv]), has_header=false |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
I want to participate in fixing this problem, can you provide some help. 😊
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow-datafusion] francis-du commented on issue #187: [Datafusion] Support joins on TimestampMillisecond columns
Posted by GitBox <gi...@apache.org>.
francis-du commented on issue #187:
URL: https://github.com/apache/arrow-datafusion/issues/187#issuecomment-927205291
@alamb Thanks for your help, I try to fix it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org