Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/04 07:44:00 UTC
[GitHub] [arrow] jorgecarleitao opened a new pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part
jorgecarleitao opened a new pull request #8748:
URL: https://github.com/apache/arrow/pull/8748
This is a draft proposal to address a limitation in which the join currently recomputes the left (build) side for every part of the right (probe) side.
@alamb and @andygrove, I am not sure whether this is right from the execution's point of view. The idea here is to leverage the notion that, as long as it has all the information it requires, a node can always return all of its streams simply by iterating over them. We can scrap this if it does not feel right.
Note that with this change, `Merge` no longer calls `Plan::execute` inside a `spawn`. I am not sure whether this has non-trivial consequences.
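To make the limitation concrete, here is a minimal sketch in plain Rust with no DataFusion types. Rows are hypothetical `(key, payload)` tuples, the hash table is a `std::collections::HashMap`, and a counter records how often the build side is hashed. Rebuilding per right part hashes the left side once per part; building once and probing every part hashes it exactly once, with the same join output.

```rust
use std::collections::HashMap;

/// Hypothetical row type: (join key, payload), standing in for a RecordBatch row.
type Row = (i64, &'static str);
type Table = HashMap<i64, Vec<&'static str>>;
type Joined = Vec<(&'static str, &'static str)>;

/// Hashes the build (left) side and bumps `builds` so the work can be counted.
fn build(left: &[Row], builds: &mut usize) -> Table {
    *builds += 1;
    let mut table = Table::new();
    for &(key, payload) in left {
        table.entry(key).or_default().push(payload);
    }
    table
}

/// Probes one right (probe-side) part against an already built table.
fn probe(table: &Table, part: &[Row]) -> Joined {
    let mut out = Vec::new();
    for &(key, right) in part {
        if let Some(lefts) = table.get(&key) {
            for &left in lefts {
                out.push((left, right));
            }
        }
    }
    out
}

/// Behaviour described in the PR: the left side is re-hashed for every right part.
fn join_rebuild_each_part(left: &[Row], right_parts: &[Vec<Row>], builds: &mut usize) -> Joined {
    let mut out = Vec::new();
    for part in right_parts {
        let table = build(left, builds);
        out.extend(probe(&table, part));
    }
    out
}

/// Goal of the PR: hash the left side once, then probe every right part.
fn join_build_once(left: &[Row], right_parts: &[Vec<Row>], builds: &mut usize) -> Joined {
    let table = build(left, builds);
    let mut out = Vec::new();
    for part in right_parts {
        out.extend(probe(&table, part));
    }
    out
}

fn main() {
    let left: Vec<Row> = vec![(1, "a"), (2, "b")];
    let right_parts: Vec<Vec<Row>> = vec![vec![(1, "x")], vec![(2, "y"), (3, "z")], vec![(1, "w")]];
    let (mut per_part, mut once) = (0, 0);
    let a = join_rebuild_each_part(&left, &right_parts, &mut per_part);
    let b = join_build_once(&left, &right_parts, &mut once);
    assert_eq!(a, b); // same join output either way
    println!("left side hashed {} times vs {} time", per_part, once); // 3 vs 1
}
```

The cost difference grows with the number of right parts, since the per-part variant repeats the whole build for each one.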
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [arrow] jorgecarleitao closed pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part
Posted by GitBox <gi...@apache.org>.
jorgecarleitao closed pull request #8748:
URL: https://github.com/apache/arrow/pull/8748
----------------------------------------------------------------
[GitHub] [arrow] jorgecarleitao commented on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part
Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-747387849
Closing this as it is not the right approach.
----------------------------------------------------------------
[GitHub] [arrow] alamb commented on a change in pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part
Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#discussion_r541908029
##########
File path: rust/datafusion/src/physical_plan/mod.rs
##########
@@ -80,8 +80,9 @@ pub trait ExecutionPlan: Debug + Send + Sync {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>;
- /// creates an iterator
- async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream>;
+ /// creates a vector of streams of [RecordBatch]es, each stream corresponding
+ /// to a part of the partition.
+ async fn execute(&self) -> Result<Vec<SendableRecordBatchStream>>;
Review comment:
I am sorry, I have only just started looking at this PR more carefully. It seems the proposal is to change `execute` so that it returns a `Vec` of streams rather than a single stream. I think this is reasonable, as it gives physical plan implementations more flexibility when reading their inputs (they can always merge the streams into one if they choose to).
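A minimal, synchronous sketch of the proposed signature, using plain iterators in place of `SendableRecordBatchStream`. `ExecutionPlanSketch`, `MemoryExec`, `Batch`, `BatchStream`, and `merge` are hypothetical names for illustration, not DataFusion APIs. A node returns one stream per part, and a consumer that only wants a single stream can merge them itself.

```rust
/// Hypothetical stand-in for a RecordBatch: just a vector of values.
type Batch = Vec<i32>;
/// Synchronous stand-in for SendableRecordBatchStream.
type BatchStream = Box<dyn Iterator<Item = Batch> + Send>;

/// Sketch of the proposed shape: a single call returns every part's stream.
trait ExecutionPlanSketch {
    fn execute(&self) -> Vec<BatchStream>;
}

/// Toy leaf node whose data is already split into parts.
struct MemoryExec {
    parts: Vec<Vec<Batch>>,
}

impl ExecutionPlanSketch for MemoryExec {
    fn execute(&self) -> Vec<BatchStream> {
        self.parts
            .iter()
            .cloned()
            .map(|part| Box::new(part.into_iter()) as BatchStream)
            .collect()
    }
}

/// A consumer that prefers a single stream can concatenate the parts itself.
fn merge(streams: Vec<BatchStream>) -> BatchStream {
    Box::new(streams.into_iter().flatten())
}

fn main() {
    let plan = MemoryExec { parts: vec![vec![vec![1, 2]], vec![vec![3], vec![4, 5]]] };
    let streams = plan.execute();
    println!("{} streams", streams.len()); // 2 streams
    let all: Vec<Batch> = merge(streams).collect();
    println!("{:?}", all); // [[1, 2], [3], [4, 5]]
}
```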
##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -143,13 +143,10 @@ impl ExecutionPlan for HashJoinExec {
self.right.output_partitioning()
}
- async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+ async fn execute(&self) -> Result<Vec<SendableRecordBatchStream>> {
Review comment:
Ah, I had missed this detail in the earlier versions of this PR: each invocation of `execute` for the probe side needed to call `execute` on the build side.
An alternative implementation would be to store the hash table on `self` (in an `Arc<Mutex<..>>`) and reuse it on subsequent invocations.
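The alternative above could look roughly like the following standard-library sketch. A hypothetical `HashJoinSketch` keeps its build-side table in an `Arc<Mutex<Option<Arc<..>>>>`, builds it on the first call and hands out clones of the inner `Arc` afterwards. Here `execute` takes a slice of right-side rows rather than a partition index, and the `builds` counter exists only to show that the table is hashed once.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

type Row = (i64, String);
type Table = HashMap<i64, Vec<String>>;

/// Hypothetical join node that hashes its build side at most once and shares it.
struct HashJoinSketch {
    left: Vec<Row>,
    /// Built on the first `execute` call, then reused by every later call.
    build_cache: Arc<Mutex<Option<Arc<Table>>>>,
    /// How many times the build side was actually hashed (illustration only).
    builds: Mutex<usize>,
}

impl HashJoinSketch {
    fn new(left: Vec<Row>) -> Self {
        Self { left, build_cache: Arc::new(Mutex::new(None)), builds: Mutex::new(0) }
    }

    /// Returns the cached table, building it under the lock if it does not exist yet.
    fn table(&self) -> Arc<Table> {
        let mut cache = self.build_cache.lock().unwrap();
        if let Some(table) = &*cache {
            return Arc::clone(table);
        }
        let mut table = Table::new();
        for (key, payload) in &self.left {
            table.entry(*key).or_default().push(payload.clone());
        }
        *self.builds.lock().unwrap() += 1;
        let table = Arc::new(table);
        *cache = Some(Arc::clone(&table));
        table
    }

    /// Per-partition execution: probe one right part against the shared table.
    fn execute(&self, right_part: &[Row]) -> Vec<(String, String)> {
        let table = self.table();
        let mut out = Vec::new();
        for (key, right) in right_part {
            if let Some(lefts) = table.get(key) {
                for left in lefts {
                    out.push((left.clone(), right.clone()));
                }
            }
        }
        out
    }
}

fn main() {
    let join = HashJoinSketch::new(vec![(1, "a".to_string()), (2, "b".to_string())]);
    let parts: Vec<Vec<Row>> = vec![
        vec![(1, "x".to_string())],
        vec![(2, "y".to_string()), (3, "z".to_string())],
        vec![(1, "w".to_string())],
    ];
    // Run the right parts concurrently; the lock ensures only one thread hashes.
    let join_ref = &join;
    let results: Vec<Vec<(String, String)>> = thread::scope(|s| {
        let handles: Vec<_> = parts
            .iter()
            .map(|part| s.spawn(move || join_ref.execute(part)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    println!("{:?}", results);
    println!("build side hashed {} time(s)", *join.builds.lock().unwrap()); // 1
}
```

Holding the lock while building means concurrent partitions wait for the first build instead of each hashing the left side. The trade-off compared with the PR's approach is that the node now carries mutable state across `execute` calls.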
I like the pattern of this PR where each `execute` method is invoked once and then the async streams are used to coordinate the threading / execution.
👍
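The pattern of invoking `execute` once and letting the streams coordinate the work can be sketched with standard-library threads and a channel in place of async tasks. A hypothetical `merge_concurrently` drives each part's stream on its own thread (standing in for a `spawn`) and forwards batches into a single receiver. `Batch` and `BatchStream` are simplified stand-ins, not DataFusion types.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-ins for a RecordBatch and a record batch stream.
type Batch = Vec<i32>;
type BatchStream = Box<dyn Iterator<Item = Batch> + Send>;

/// Drives every input stream on its own thread and forwards batches into one channel.
fn merge_concurrently(streams: Vec<BatchStream>) -> mpsc::Receiver<Batch> {
    let (tx, rx) = mpsc::channel();
    for stream in streams {
        let tx = tx.clone();
        thread::spawn(move || {
            for batch in stream {
                // Stop producing if the receiver has been dropped.
                if tx.send(batch).is_err() {
                    break;
                }
            }
        });
    }
    // Drop the original sender so the receiver ends once all producers finish.
    drop(tx);
    rx
}

fn main() {
    let streams: Vec<BatchStream> = vec![
        Box::new(vec![vec![1, 2], vec![3]].into_iter()),
        Box::new(vec![vec![4]].into_iter()),
    ];
    let mut batches: Vec<Batch> = merge_concurrently(streams).into_iter().collect();
    // Arrival order depends on thread scheduling, so sort before printing.
    batches.sort();
    println!("{:?}", batches); // [[1, 2], [3], [4]]
}
```

Batches from different parts arrive in whatever order the threads produce them, which is why the example sorts before printing.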
----------------------------------------------------------------
[GitHub] [arrow] andygrove edited a comment on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part
Posted by GitBox <gi...@apache.org>.
andygrove edited a comment on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-744065261
I am also seeing a slowdown.
Master:
```
Running benchmarks with the following options: BenchmarkOpt { query: 12, debug: true, iterations: 1, concurrency: 12, batch_size: 4096, path: "/mnt/tpch/tbl-sf1/", file_format: "tbl", mem_table: false }
Logical plan:
Sort: #l_shipmode ASC NULLS FIRST
Aggregate: groupBy=[[#l_shipmode]], aggr=[[SUM(CASE WHEN #o_orderpriority Eq Utf8("1-URGENT") Or #o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS high_line_count, SUM(CASE WHEN #o_orderpriority NotEq Utf8("1-URGENT") And #o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS low_line_count]]
Filter: #l_shipmode Eq Utf8("MAIL") Or #l_shipmode Eq Utf8("SHIP") And #l_commitdate Lt #l_receiptdate And #l_shipdate Lt #l_commitdate And #l_receiptdate GtEq Utf8("1994-01-01") And #l_receiptdate Lt Utf8("1995-01-01")
Join: l_orderkey = o_orderkey
TableScan: lineitem projection=None
TableScan: orders projection=None
Optimized logical plan:
Sort: #l_shipmode ASC NULLS FIRST
Aggregate: groupBy=[[#l_shipmode]], aggr=[[SUM(CASE WHEN #o_orderpriority Eq Utf8("1-URGENT") Or #o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS high_line_count, SUM(CASE WHEN #o_orderpriority NotEq Utf8("1-URGENT") And #o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS low_line_count]]
Join: l_orderkey = o_orderkey
Filter: #l_shipmode Eq Utf8("MAIL") Or #l_shipmode Eq Utf8("SHIP") And #l_commitdate Lt #l_receiptdate And #l_shipdate Lt #l_commitdate And #l_receiptdate GtEq Utf8("1994-01-01") And #l_receiptdate Lt Utf8("1995-01-01")
TableScan: lineitem projection=Some([0, 10, 11, 12, 14])
TableScan: orders projection=Some([0, 5])
+------------+-----------------+----------------+
| l_shipmode | high_line_count | low_line_count |
+------------+-----------------+----------------+
| MAIL | 6202 | 9324 |
| SHIP | 6200 | 9262 |
+------------+-----------------+----------------+
Query 12 iteration 0 took 6708 ms
```
This PR
```
Running benchmarks with the following options: BenchmarkOpt { query: 12, debug: true, iterations: 1, concurrency: 12, batch_size: 4096, path: "/mnt/tpch/tbl-sf1/", file_format: "tbl", mem_table: false }
Logical plan:
Aggregate: groupBy=[[#l_shipmode]], aggr=[[SUM(Int32(1)) AS high_line_count, SUM(Int32(0)) AS low_line_count]]
Join: l_orderkey = o_orderkey
Filter: #l_receiptdate Lt Utf8("1995-01-01")
Filter: #l_receiptdate GtEq Utf8("1994-01-01")
Filter: #l_shipdate Lt #l_commitdate
Filter: #l_commitdate Lt #l_receiptdate
Filter: #l_shipmode Eq Utf8("MAIL") Or #l_shipmode Eq Utf8("SHIP")
TableScan: lineitem projection=None
TableScan: orders projection=None
Optimized logical plan:
Aggregate: groupBy=[[#l_shipmode]], aggr=[[SUM(Int32(1)) AS high_line_count, SUM(Int32(0)) AS low_line_count]]
Join: l_orderkey = o_orderkey
TableScan: lineitem projection=Some([0, 10, 11, 12, 14])
TableScan: orders projection=Some([0])
a774ae7f3b83dd3127cf40709f9ac9d5c8d98e25
+------------+-----------------+----------------+
| l_shipmode | high_line_count | low_line_count |
+------------+-----------------+----------------+
| MAIL | 857399 | 0 |
| TRUCK | 856997 | 0 |
| SHIP | 858036 | 0 |
| REG AIR | 856867 | 0 |
| AIR | 858103 | 0 |
| FOB | 857323 | 0 |
| RAIL | 856484 | 0 |
+------------+-----------------+----------------+
Query 12 iteration 0 took 277062 ms
```
Note that the results are incorrect as well. It looks like the filters are not being applied correctly.
Edit: The filters are being dropped from the optimized plan.
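For comparison, the five stacked `Filter` nodes in this PR's unoptimized logical plan are together equivalent to one conjunction of the same predicates, so an optimizer may merge or push them down but must not discard them. A minimal sketch, assuming a hypothetical `LineItem` row that holds only the columns those filters read, with dates as `YYYY-MM-DD` strings (the plan compares them as `Utf8`) and made-up sample rows. The explicit parentheses around the ship-mode `||` matter, because in Rust `&&` binds tighter than `||`.

```rust
/// Hypothetical row with only the lineitem columns the filters read.
/// Dates are "YYYY-MM-DD" strings, which order correctly as text.
#[derive(Debug, PartialEq)]
struct LineItem {
    shipmode: &'static str,
    shipdate: &'static str,
    commitdate: &'static str,
    receiptdate: &'static str,
}

/// The five predicates as separate, stacked filter stages, bottom-up as in the plan.
fn stacked_filters(rows: &[LineItem]) -> Vec<&LineItem> {
    rows.iter()
        .filter(|r| r.shipmode == "MAIL" || r.shipmode == "SHIP")
        .filter(|r| r.commitdate < r.receiptdate)
        .filter(|r| r.shipdate < r.commitdate)
        .filter(|r| r.receiptdate >= "1994-01-01")
        .filter(|r| r.receiptdate < "1995-01-01")
        .collect()
}

/// The same predicates combined into a single conjunction.
fn conjunction(rows: &[LineItem]) -> Vec<&LineItem> {
    rows.iter()
        .filter(|r| {
            (r.shipmode == "MAIL" || r.shipmode == "SHIP")
                && r.commitdate < r.receiptdate
                && r.shipdate < r.commitdate
                && r.receiptdate >= "1994-01-01"
                && r.receiptdate < "1995-01-01"
        })
        .collect()
}

fn row(shipmode: &'static str, shipdate: &'static str, commitdate: &'static str, receiptdate: &'static str) -> LineItem {
    LineItem { shipmode, shipdate, commitdate, receiptdate }
}

/// Made-up rows for illustration, not TPC-H data.
fn sample_rows() -> Vec<LineItem> {
    vec![
        row("MAIL", "1994-03-01", "1994-03-05", "1994-03-10"),  // passes every predicate
        row("TRUCK", "1994-03-01", "1994-03-05", "1994-03-10"), // wrong ship mode
        row("SHIP", "1994-05-01", "1994-05-10", "1994-05-03"),  // received before commit date
        row("SHIP", "1995-02-01", "1995-02-02", "1995-02-03"),  // received after 1994
        row("SHIP", "1994-07-01", "1994-07-02", "1994-07-03"),  // passes every predicate
    ]
}

fn main() {
    let rows = sample_rows();
    let kept = conjunction(&rows);
    assert_eq!(stacked_filters(&rows), kept);
    // Dropping the filters altogether would keep all 5 rows.
    println!("kept {} of {} rows", kept.len(), rows.len()); // kept 2 of 5 rows
}
```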
----------------------------------------------------------------
[GitHub] [arrow] Dandandan edited a comment on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part
Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-744040750
@jorgecarleitao
I checked out the branch locally and ran it; unfortunately, the implementation causes a ~40x slowdown on a query that includes a join (I double-checked that I was running in release mode):
Query 12 iteration 0 took 51340 ms
Query 12 iteration 1 took 53788 ms
Query 12 iteration 2 took 53361 ms
(Time on master is ~1200-1300ms)
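As a quick check of the ~40x figure: the three timings above average to about 52.8 s, which is roughly 40.6x to 44.0x the ~1200-1300 ms reported for master. The snippet below just redoes that arithmetic; `mean_ms` is a helper introduced for illustration.

```rust
/// Mean of a slice of timings in milliseconds.
fn mean_ms(samples: &[f64]) -> f64 {
    samples.iter().sum::<f64>() / samples.len() as f64
}

fn main() {
    // Timings reported above for this PR's branch (ms).
    let pr = [51340.0, 53788.0, 53361.0];
    let pr_mean = mean_ms(&pr);
    // Master was reported as roughly 1200-1300 ms.
    let (master_lo, master_hi) = (1200.0, 1300.0);
    println!("mean on this PR: {:.0} ms", pr_mean); // 52830 ms
    println!("slowdown: {:.1}x to {:.1}x", pr_mean / master_hi, pr_mean / master_lo); // 40.6x to 44.0x
}
```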
----------------------------------------------------------------
[GitHub] [arrow] andygrove commented on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part
Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-741841804
Thanks @jorgecarleitao I will review this PR in the next day or two.
----------------------------------------------------------------
[GitHub] [arrow] github-actions[bot] commented on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-732331435
https://issues.apache.org/jira/browse/ARROW-10703
----------------------------------------------------------------
[GitHub] [arrow] jorgecarleitao commented on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part
Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-744222703
Well, clearly this does not work :)
I do not understand why, but this is not ready to merge.
FWIW, IMO this issue (not the PR) is a priority for the 3.0.0 release.
----------------------------------------------------------------
[GitHub] [arrow] Dandandan commented on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part
Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-740564846
It would be nice if we could continue with this work. @alamb @andygrove, any idea how this fits into the current design, or are there alternatives?
----------------------------------------------------------------