Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/04 07:44:00 UTC

[GitHub] [arrow] jorgecarleitao opened a new pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part

jorgecarleitao opened a new pull request #8748:
URL: https://github.com/apache/arrow/pull/8748


   This is a draft proposal to address a limitation in which the join currently computes the left (build) side once for every part of the right (probe) side.
   
   @alamb and @andygrove, I am not sure whether this is right from the execution point of view. The idea here is to leverage the notion that, as long as a node has all the information it requires, it can always return all of its streams simply by iterating over them. We can scrap this if it does not feel right.
   
   Note that this makes `Merge` not call `Plan::execute` inside of a `spawn`. I am not sure whether this has non-trivial consequences.
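To make the limitation concrete, here is a minimal synchronous sketch. It is not the DataFusion code: `BuildSide`, `execute_part`, and `execute_all` are hypothetical names, plain vectors stand in for record batch streams, and a linear `contains` stands in for the hash lookup. It only shows that a per-part `execute` collects the build side once per probe part, while a single `execute` that returns every part can collect it once.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for the build (left) side of a join.
struct BuildSide {
    rows: Vec<i64>,
    /// Counts how many times the build side was collected.
    collected: Cell<usize>,
}

impl BuildSide {
    fn collect(&self) -> Vec<i64> {
        self.collected.set(self.collected.get() + 1);
        self.rows.clone()
    }
}

/// Per-part shape: every call re-collects the build side.
fn execute_part(build: &BuildSide, probe_part: &[i64]) -> Vec<i64> {
    let left = build.collect();
    probe_part.iter().copied().filter(|k| left.contains(k)).collect()
}

/// Single-call shape: collect once, then produce the output for every part.
fn execute_all(build: &BuildSide, probe_parts: &[Vec<i64>]) -> Vec<Vec<i64>> {
    let left = build.collect();
    probe_parts
        .iter()
        .map(|part| part.iter().copied().filter(|k| left.contains(k)).collect())
        .collect()
}
```

With three probe parts, calling `execute_part` once per part bumps the `collected` counter to 3, whereas one call to `execute_all` leaves it at 1 and yields the same rows.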


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao closed pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part

Posted by GitBox <gi...@apache.org>.
jorgecarleitao closed pull request #8748:
URL: https://github.com/apache/arrow/pull/8748


   





[GitHub] [arrow] jorgecarleitao commented on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-747387849


   Closing this as it is not the right approach.





[GitHub] [arrow] alamb commented on a change in pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#discussion_r541908029



##########
File path: rust/datafusion/src/physical_plan/mod.rs
##########
@@ -80,8 +80,9 @@ pub trait ExecutionPlan: Debug + Send + Sync {
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
 
-    /// creates an iterator
-    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream>;
+    /// creates a vector of streams of [RecordBatch]es, each stream corresponding
+    /// to a part of the partition.
+    async fn execute(&self) -> Result<Vec<SendableRecordBatchStream>>;

Review comment:
       I am sorry, I have only just started looking at this PR more carefully -- it seems like the proposal is to change `execute` so that it returns a `Vec` of streams rather than a single stream. I think this is reasonable, as it allows physical plan implementations more flexibility when reading their inputs (they can always merge the streams into one if they choose to).
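As a rough illustration of that flexibility, the sketch below uses boxed iterators as a synchronous stand-in for `SendableRecordBatchStream`. The names `Batch`, `BatchStream`, `Plan`, `MemorySource`, and `merge` are assumptions made for this example and are not the DataFusion types; the real trait method is `async` and returns a `Result`.

```rust
// Hypothetical simplifications of RecordBatch and SendableRecordBatchStream.
type Batch = Vec<i32>;
type BatchStream = Box<dyn Iterator<Item = Batch> + Send>;

/// Simplified version of the proposed signature:
/// one call returns one stream per output part.
trait Plan {
    fn execute(&self) -> Vec<BatchStream>;
}

/// A source that already holds its data split into parts.
struct MemorySource {
    parts: Vec<Vec<Batch>>,
}

impl Plan for MemorySource {
    fn execute(&self) -> Vec<BatchStream> {
        self.parts
            .iter()
            .cloned()
            .map(|part| Box::new(part.into_iter()) as BatchStream)
            .collect()
    }
}

/// A consumer that wants a single stream can chain the parts together.
fn merge(streams: Vec<BatchStream>) -> BatchStream {
    Box::new(streams.into_iter().flatten())
}
```

A consumer that benefits from parallelism can keep the `Vec` and drive each stream separately, while one that needs a single input calls `merge` first. Note that this `merge` drains the parts one after another; an async merge would typically interleave them.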

##########
File path: rust/datafusion/src/physical_plan/hash_join.rs
##########
@@ -143,13 +143,10 @@ impl ExecutionPlan for HashJoinExec {
         self.right.output_partitioning()
     }
 
-    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+    async fn execute(&self) -> Result<Vec<SendableRecordBatchStream>> {

Review comment:
       Ah, I had missed this detail on the earlier versions of this PR -- that each invocation of execute for the probe side needed to call execute on the build side.
   
   An alternate implementation would be to store the hash table on `self` (in an `Arc<Mutex<..>>`) and reuse it on subsequent invocations.
   
   I like the pattern of this PR where each `execute` method is invoked once and then the async streams are used to coordinate the threading / execution. 
   
   👍 
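For reference, the alternate implementation mentioned above (caching the hash table on `self`) might look roughly like the following. This is a sketch under stated assumptions, not the `HashJoinExec` code: `CachedJoin`, `BuildTable`, and the `builds` counter are hypothetical, the join is synchronous and keyed on a single `i64` column, and the lock is held while building, which real async code would need to handle more carefully.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical build-side table: join key -> build row indices.
type BuildTable = HashMap<i64, Vec<usize>>;

struct CachedJoin {
    build_keys: Vec<i64>,
    /// Built on first use, then shared by later `execute` calls.
    table: Arc<Mutex<Option<Arc<BuildTable>>>>,
    /// Counts how often the build side was actually computed.
    builds: AtomicUsize,
}

impl CachedJoin {
    fn new(build_keys: Vec<i64>) -> Self {
        Self {
            build_keys,
            table: Arc::new(Mutex::new(None)),
            builds: AtomicUsize::new(0),
        }
    }

    /// Returns the cached table, building it only if it is missing.
    fn build_table(&self) -> Arc<BuildTable> {
        let mut guard = self.table.lock().unwrap();
        if let Some(table) = guard.as_ref() {
            return Arc::clone(table);
        }
        self.builds.fetch_add(1, Ordering::SeqCst);
        let mut table = BuildTable::new();
        for (row, key) in self.build_keys.iter().enumerate() {
            table.entry(*key).or_default().push(row);
        }
        let table = Arc::new(table);
        *guard = Some(Arc::clone(&table));
        table
    }

    /// Probes one part; returns (build_row, probe_row) pairs that match.
    fn execute(&self, probe_keys: &[i64]) -> Vec<(usize, usize)> {
        let table = self.build_table();
        let mut out = Vec::new();
        for (probe_row, key) in probe_keys.iter().enumerate() {
            if let Some(rows) = table.get(key) {
                out.extend(rows.iter().map(|build_row| (*build_row, probe_row)));
            }
        }
        out
    }
}
```

Calling `execute` for several probe parts on the same `CachedJoin` builds the table once; later calls only clone the `Arc`.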










[GitHub] [arrow] andygrove edited a comment on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part

Posted by GitBox <gi...@apache.org>.
andygrove edited a comment on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-744065261


   I am also seeing a slowdown.
   Master:
   ```
   Running benchmarks with the following options: BenchmarkOpt { query: 12, debug: true, iterations: 1, concurrency: 12, batch_size: 4096, path: "/mnt/tpch/tbl-sf1/", file_format: "tbl", mem_table: false }
   Logical plan:
   Sort: #l_shipmode ASC NULLS FIRST
     Aggregate: groupBy=[[#l_shipmode]], aggr=[[SUM(CASE WHEN #o_orderpriority Eq Utf8("1-URGENT") Or #o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS high_line_count, SUM(CASE WHEN #o_orderpriority NotEq Utf8("1-URGENT") And #o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS low_line_count]]
       Filter: #l_shipmode Eq Utf8("MAIL") Or #l_shipmode Eq Utf8("SHIP") And #l_commitdate Lt #l_receiptdate And #l_shipdate Lt #l_commitdate And #l_receiptdate GtEq Utf8("1994-01-01") And #l_receiptdate Lt Utf8("1995-01-01")
         Join: l_orderkey = o_orderkey
           TableScan: lineitem projection=None
           TableScan: orders projection=None
   Optimized logical plan:
   Sort: #l_shipmode ASC NULLS FIRST
     Aggregate: groupBy=[[#l_shipmode]], aggr=[[SUM(CASE WHEN #o_orderpriority Eq Utf8("1-URGENT") Or #o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS high_line_count, SUM(CASE WHEN #o_orderpriority NotEq Utf8("1-URGENT") And #o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END) AS low_line_count]]
       Join: l_orderkey = o_orderkey
         Filter: #l_shipmode Eq Utf8("MAIL") Or #l_shipmode Eq Utf8("SHIP") And #l_commitdate Lt #l_receiptdate And #l_shipdate Lt #l_commitdate And #l_receiptdate GtEq Utf8("1994-01-01") And #l_receiptdate Lt Utf8("1995-01-01")
           TableScan: lineitem projection=Some([0, 10, 11, 12, 14])
         TableScan: orders projection=Some([0, 5])
   +------------+-----------------+----------------+
   | l_shipmode | high_line_count | low_line_count |
   +------------+-----------------+----------------+
   | MAIL       | 6202            | 9324           |
   | SHIP       | 6200            | 9262           |
   +------------+-----------------+----------------+
   Query 12 iteration 0 took 6708 ms
   ```
   
   This PR
   
   ```
   Running benchmarks with the following options: BenchmarkOpt { query: 12, debug: true, iterations: 1, concurrency: 12, batch_size: 4096, path: "/mnt/tpch/tbl-sf1/", file_format: "tbl", mem_table: false }
   Logical plan:
   Aggregate: groupBy=[[#l_shipmode]], aggr=[[SUM(Int32(1)) AS high_line_count, SUM(Int32(0)) AS low_line_count]]
     Join: l_orderkey = o_orderkey
       Filter: #l_receiptdate Lt Utf8("1995-01-01")
         Filter: #l_receiptdate GtEq Utf8("1994-01-01")
           Filter: #l_shipdate Lt #l_commitdate
             Filter: #l_commitdate Lt #l_receiptdate
               Filter: #l_shipmode Eq Utf8("MAIL") Or #l_shipmode Eq Utf8("SHIP")
                 TableScan: lineitem projection=None
       TableScan: orders projection=None
   Optimized logical plan:
   Aggregate: groupBy=[[#l_shipmode]], aggr=[[SUM(Int32(1)) AS high_line_count, SUM(Int32(0)) AS low_line_count]]
     Join: l_orderkey = o_orderkey
       TableScan: lineitem projection=Some([0, 10, 11, 12, 14])
       TableScan: orders projection=Some([0])
   a774ae7f3b83dd3127cf40709f9ac9d5c8d98e25+------------+-----------------+----------------+
   | l_shipmode | high_line_count | low_line_count |
   +------------+-----------------+----------------+
   | MAIL       | 857399          | 0              |
   | TRUCK      | 856997          | 0              |
   | SHIP       | 858036          | 0              |
   | REG AIR    | 856867          | 0              |
   | AIR        | 858103          | 0              |
   | FOB        | 857323          | 0              |
   | RAIL       | 856484          | 0              |
   +------------+-----------------+----------------+
   Query 12 iteration 0 took 277062 ms
   ```
   
   Note that the results are incorrect as well. It looks like the filters are not being applied correctly.
   
   Edit: The filters are being dropped from the optimized plan.





[GitHub] [arrow] Dandandan edited a comment on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part

Posted by GitBox <gi...@apache.org>.
Dandandan edited a comment on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-744040750


   @jorgecarleitao 
   I checked out the branch locally and ran it. Unfortunately, the implementation causes a ~40x slowdown on a query that includes a join (I double-checked that I was running in release mode):
   
   Query 12 iteration 0 took 51340 ms
   Query 12 iteration 1 took 53788 ms
   Query 12 iteration 2 took 53361 ms
   
   (Time on master is ~1200-1300ms)
   





[GitHub] [arrow] andygrove commented on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-741841804


   Thanks @jorgecarleitao I will review this PR in the next day or two.





[GitHub] [arrow] github-actions[bot] commented on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-732331435


   https://issues.apache.org/jira/browse/ARROW-10703





[GitHub] [arrow] jorgecarleitao commented on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-744222703


   Well, clearly this does not work :)
   
   I do not understand why, but this is not ready to merge.
   
   FWIW, IMO this issue (not the PR) is a priority for the 3.0.0 release.
   





[GitHub] [arrow] Dandandan commented on pull request #8748: ARROW-10703: [Rust] [DataFusion] Make join not collect on every right part

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #8748:
URL: https://github.com/apache/arrow/pull/8748#issuecomment-740564846


   It would be nice if we could continue with this work. @alamb @andygrove, any idea how this fits into the current design, or are there any alternatives?

