Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/22 05:15:08 UTC

[GitHub] [arrow] jorgecarleitao opened a new pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

jorgecarleitao opened a new pull request #8503:
URL: https://github.com/apache/arrow/pull/8503


   This makes `merge` send batches to a receiver stream as they arrive, thereby removing the need to wait for each thread to finish collecting all its batches.
   
   On micro-benchmarks, performance on non-grouped data improved (a -8% change); on the other aggregate benchmarks, the change is between 0% and +2%.
   
   This is the implementation of an idea fielded by @alamb here: https://github.com/apache/arrow/pull/8473/files#r506304982
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r510134213



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       Good point.
   
   I actually started with a `bounded(1)`, but this (and other values) won't work because we need to `join_all` the threads. Because there is no consumer to retrieve the items from the `receiver`, the threads block on the full channel and can never be joined, so we wait indefinitely on `join_all`.
   
   An alternative is to not `join_all`, but then we risk losing results if the main thread finishes first.
   
   I.e. if we bound the channel, we cannot `join_all` and thus we may lose threads. If we join all, the channel needs to be unbounded so that we can build the stream.
   
   IMO neither is good, as in both cases we are essentially waiting for all threads to finish before returning the stream, which I understand is not what we want.
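   A minimal sketch of the ordering issue, assuming plain `std::thread` and a bounded `std::sync::mpsc::sync_channel` as stand-ins for the Tokio tasks and the `futures` channel used in the PR (the helpers `produce` and `merge_bounded` are hypothetical): a bounded channel does not deadlock as long as the receiver is drained *before* the threads are joined.

   ```rust
   use std::sync::mpsc::sync_channel;
   use std::thread;

   /// Hypothetical stand-in for one input partition: `n` "batches"
   /// (plain integers here instead of `RecordBatch`es).
   fn produce(part: usize, n: usize) -> Vec<usize> {
       (0..n).map(|i| part * 1000 + i).collect()
   }

   /// Merges `partitions` producers through a channel bounded to `capacity`.
   /// Draining happens before joining, so a full channel only pauses a
   /// producer until the consumer catches up instead of blocking the join.
   fn merge_bounded(partitions: usize, batches_per_part: usize, capacity: usize) -> Vec<usize> {
       let (sender, receiver) = sync_channel(capacity);
       let handles: Vec<_> = (0..partitions)
           .map(|part| {
               let sender = sender.clone();
               thread::spawn(move || {
                   for batch in produce(part, batches_per_part) {
                       // Blocks while the channel is full.
                       sender.send(batch).expect("receiver dropped");
                   }
               })
           })
           .collect();
       // Drop the original sender so the receiver ends once every producer is done.
       drop(sender);

       // Consume first...
       let merged: Vec<usize> = receiver.iter().collect();
       // ...then join: every producer has already returned by now.
       for handle in handles {
           handle.join().expect("producer panicked");
       }
       merged
   }
   ```

   With `capacity = 1` and four producers this completes, whereas joining before reading would block forever.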





[GitHub] [arrow] alamb commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r510216029



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       Now that I think about it, I don't think calling `join_all` before returning `MergeStream` is needed here (as that requires all tasks to complete executing before anything can consume the output of this operator).
   
   If we want to pass errors back upstream (i.e. call `join_all` eventually), I wonder if we can put the `handles` into `MergeStream` and call `join_all` after `input` is done.
   
   I am thinking about all of this theoretically, not having actually ever expressed such a thing in Rust / futures, BTW. So thanks for bearing with me!
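   The idea of keeping the handles inside `MergeStream` can be sketched with std threads standing in for Tokio tasks. This is an illustration under that assumption, not DataFusion code: `MergeStream` and `merge` here are hypothetical, the stream is an `Iterator` that owns the receiver and the handles, yields items as they arrive, and only joins the handles (reporting a panicked producer as an error) once the receiver is exhausted.

   ```rust
   use std::sync::mpsc::{sync_channel, Receiver};
   use std::thread::{self, JoinHandle};

   /// Hypothetical analogue of `MergeStream`: owns the receiver *and* the handles.
   struct MergeStream<T> {
       receiver: Receiver<T>,
       handles: Vec<JoinHandle<()>>,
   }

   impl<T> Iterator for MergeStream<T> {
       type Item = Result<T, String>;

       fn next(&mut self) -> Option<Self::Item> {
           match self.receiver.recv() {
               Ok(item) => Some(Ok(item)),
               // Every sender is gone, so the producers are done: join them now
               // and surface a panic as an error instead of losing it.
               Err(_) => {
                   while let Some(handle) = self.handles.pop() {
                       if handle.join().is_err() {
                           return Some(Err("a producer panicked".to_string()));
                       }
                   }
                   None
               }
           }
       }
   }

   /// Spawns one producer per partition and returns immediately,
   /// without waiting for any producer to finish.
   fn merge(partitions: Vec<Vec<i32>>, capacity: usize) -> MergeStream<i32> {
       let (sender, receiver) = sync_channel(capacity);
       let handles: Vec<JoinHandle<()>> = partitions
           .into_iter()
           .map(|batches| {
               let sender = sender.clone();
               thread::spawn(move || {
                   for batch in batches {
                       if sender.send(batch).is_err() {
                           break; // consumer went away; stop early
                       }
                   }
               })
           })
           .collect();
       // The original `sender` is dropped when this function returns.
       MergeStream { receiver, handles }
   }
   ```

   Because `merge` returns before any producer finishes, the consumer sees batches as soon as they are sent, and the final join is cheap since every producer has already dropped its sender.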





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r512384366



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       I think that this is by design: there is no guarantee that a task spawned by Tokio finishes unless we join it (i.e. await its `JoinHandle`). A `tokio::spawn` returns a future like any other future: we need to await it.
   
   At least the problem is well defined: we have a vector (over `parts`) of futures (`spawn`) whose result is a stream (of `record batches`) and we would like to convert this into a stream of record batches. 
   
   The solution IMO is to have a new adapter for this:
   
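   ```rust
   use std::pin::Pin;
   use std::task::{Context, Poll};
   
   use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
   use futures::stream::{FuturesUnordered, SelectAll, Stream, StreamExt};
   use futures::Future;
   
   type BatchStream = Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send>>;
   type PendingStream = Pin<Box<dyn Future<Output = BatchStream> + Send>>;
   type Streams = Vec<PendingStream>;
   
   struct Adapter {
       pending: FuturesUnordered<PendingStream>, // tasks whose stream is not ready yet
       active: SelectAll<BatchStream>,           // streams we are already pulling from
   }
   
   impl Adapter {
       pub fn new(it: Streams) -> Self {
           Self { pending: it.into_iter().collect(), active: SelectAll::new() }
       }
   }
   
   impl Stream for Adapter {
       type Item = ArrowResult<RecordBatch>;
   
       fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
           // poll any of the tasks; if it is done, move its stream into `active` and start
           // pulling from that stream together with the other tasks
           while let Poll::Ready(Some(stream)) = self.pending.poll_next_unpin(cx) {
               self.active.push(stream);
           }
           match self.active.poll_next_unpin(cx) {
               // `active` is exhausted but tasks are still running; `pending` registered the waker
               Poll::Ready(None) if !self.pending.is_empty() => Poll::Pending,
               other => other,
           }
       }
   }
   ```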





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r514788126



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       Super! I will close this one in favor of yours.





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r513696355



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       You are on the right track IMO. Good thinking going for the HashAggregate first.





[GitHub] [arrow] alamb commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r510253390



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       I'll try to give it a look locally this afternoon. I am trying to finish up something for work that I need to focus on first.





[GitHub] [arrow] jorgecarleitao commented on pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#issuecomment-714272285


   I am not sure if this is what we want: we still need to join all the threads in `execute` or we may lose them, which means that the stream will accumulate all the batches instead of streaming them as the threads produce them.
   



[GitHub] [arrow] alamb commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r510252959



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       What experiment failed? removing the calls to `join_all`?
   
   > is that we want to ensure the threads finish
   
   I wonder why we care about the thread "finishing" on some particular schedule. It seems what matter
   
   The situation described in https://stackoverflow.com/q/38957741/931303 I think is about expecting to see a side-effect of thread execution that happens *after* all the results are retrieved from them. 
   
   So in this case, the threads will all hang around, sending results via the channel as long as they have something to send and the channel hasn't been closed.
   
   Once the channel is closed, there is some period of time during which the threads might be running (effectively after completing), but I don't know what adverse effect that might have.
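   The "threads hang around until the channel is closed" behaviour can be observed in a std-only sketch, assuming `std::sync::mpsc` in place of the async channel (`producer_stops_when_closed` is a hypothetical helper): once the receiver is dropped, the producer's next `send` returns an error and it can stop early.

   ```rust
   use std::sync::mpsc::sync_channel;
   use std::thread;

   /// Spawns a producer that would emit `total` batches, reads only `take`
   /// of them (`take <= total`), then drops the receiver. Returns how many
   /// sends the producer attempted before it noticed the closed channel.
   fn producer_stops_when_closed(total: usize, take: usize) -> usize {
       // Capacity 0 makes a rendezvous channel: each `send` waits for a matching `recv`.
       let (sender, receiver) = sync_channel::<usize>(0);
       let producer = thread::spawn(move || {
           let mut attempted = 0;
           for batch in 0..total {
               attempted += 1;
               if sender.send(batch).is_err() {
                   // The receiver is gone: nobody will read further batches, so stop.
                   break;
               }
           }
           attempted
       });
       for _ in 0..take {
           receiver.recv().expect("producer is still sending");
       }
       // Close the channel from the consumer side.
       drop(receiver);
       producer.join().expect("producer panicked")
   }
   ```

   With `take = 3` out of 100, the producer attempts only four sends: three that were received and one that fails because the channel has been closed.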





[GitHub] [arrow] alamb commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r513681082



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       It is fairly astonishing that Rust has made streaming so hard. 
   
   However, I got it to compile and mostly work. Here is a commit (changes to this branch) with the approach that passes most of the arrow and datafusion tests and also doesn't require waiting for all the input threads to complete: https://github.com/apache/arrow/commit/28075970d8d59fa87359ad38470669485f029578
   
   It still does not quite work in all cases (I need to port the normal aggregate over) but I'll finish that up.
   
   I am curious if you have any thoughts about the approach. I'll try to write it up coherently tomorrow morning; now I need to go take care of some other things.





[GitHub] [arrow] alamb commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r513652359



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       I have spent entirely too long on this today -- lol sorry it is taking so long.
   
   I am now looking into the angle that this is related to how HashAggregate is implemented (specifically, it creates a stream and then calls `poll_next_unpin` on it: https://github.com/apache/arrow/blob/master/rust/datafusion/src/physical_plan/hash_aggregate.rs#L399). If the input is not all available immediately, I believe this stream is simply dropped and nothing ever drives the futures forward again.
   
   However, I realize I had a different idea the other day, so this one might turn out to be a dead end too.
   
   I have been trying (locally) to rewrite hash aggregate so it uses a future that was not created during the actual call to `poll_next`. However, I am currently stymied by the need to get pinning correct; my branch refuses to compile. I'll keep posting updates...
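   The failure mode described above (a future or stream created inside `poll_next` and dropped when it returns `Pending`) can be reproduced without DataFusion. In this std-only sketch, `NeedsPolls` is a hypothetical future that becomes ready on its third poll: kept across polls it completes, but rebuilt on every poll it never does, because each new instance starts from scratch.

   ```rust
   use std::future::Future;
   use std::pin::Pin;
   use std::sync::Arc;
   use std::task::{Context, Poll, Wake, Waker};

   /// A waker that does nothing; enough for polling by hand.
   struct NoopWaker;
   impl Wake for NoopWaker {
       fn wake(self: Arc<Self>) {}
   }

   /// Hypothetical future standing in for "the input is not all available
   /// immediately": it needs `remaining` extra polls before it is ready.
   struct NeedsPolls {
       remaining: u32,
   }

   impl Future for NeedsPolls {
       type Output = &'static str;
       fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
           if self.remaining == 0 {
               Poll::Ready("done")
           } else {
               self.remaining -= 1;
               cx.waker().wake_by_ref(); // ask to be polled again
               Poll::Pending
           }
       }
   }

   /// Polls up to `max_polls` times. With `recreate`, a fresh future replaces
   /// the old one before every poll, so any progress made so far is lost.
   fn poll_until_ready(recreate: bool, max_polls: u32) -> Option<&'static str> {
       let waker = Waker::from(Arc::new(NoopWaker));
       let mut cx = Context::from_waker(&waker);
       let mut fut = NeedsPolls { remaining: 2 };
       for _ in 0..max_polls {
           if recreate {
               fut = NeedsPolls { remaining: 2 };
           }
           if let Poll::Ready(v) = Pin::new(&mut fut).poll(&mut cx) {
               return Some(v);
           }
       }
       None
   }
   ```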
   





[GitHub] [arrow] github-actions[bot] commented on pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#issuecomment-714238006


   https://issues.apache.org/jira/browse/ARROW-10366



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r510227802



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       I thought that too!
   
   until this morning, where I opened this PR and many tests failed. :/
   
   The issue is not that we want the errors; it is that we want to ensure the threads finish. If we remove the `join_all`, we end up in the situation described here: https://stackoverflow.com/q/38957741/931303 (I think; thanks for bearing with me also ^_^)
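   One property of `mpsc` channels is relevant to the concern about losing results: the receiver only reports disconnection after *every* sender has been dropped, so draining it to the end already implies that every producer has finished sending. The catch is that the sender held by the spawning code must be dropped too. A std-only sketch, with std threads and `std::sync::mpsc` assumed as stand-ins for Tokio and `futures` (both function names are hypothetical):

   ```rust
   use std::sync::mpsc::{channel, RecvTimeoutError};
   use std::thread;
   use std::time::Duration;

   /// Spawns `workers` detached threads that each send `per_worker` items and
   /// drains the receiver without joining any thread. Nothing is lost: the
   /// iterator only ends after each worker has finished and dropped its clone.
   fn drain_without_join(workers: usize, per_worker: usize) -> usize {
       let (sender, receiver) = channel();
       for w in 0..workers {
           let sender = sender.clone();
           thread::spawn(move || {
               for i in 0..per_worker {
                   sender.send((w, i)).unwrap();
               }
           });
       }
       // Without this drop, the loop below would wait forever on our own sender.
       drop(sender);
       receiver.iter().count()
   }

   /// The pitfall: while the original sender is alive, the receiver never sees
   /// a disconnect, and only a timeout gets us out.
   fn forgotten_sender_times_out() -> bool {
       let (sender, receiver) = channel::<u32>();
       let worker_sender = sender.clone();
       let _worker = thread::spawn(move || worker_sender.send(1).unwrap());
       assert_eq!(receiver.recv().unwrap(), 1);
       let result = receiver.recv_timeout(Duration::from_millis(50));
       drop(sender);
       matches!(result, Err(RecvTimeoutError::Timeout))
   }
   ```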
   





[GitHub] [arrow] alamb commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r512668107



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       I am fairly convinced that the execution is not happening `async`hronously; in particular, I think there is something about this code (or perhaps about how some of the other collection functions work) that means all data must be available *before* the stream future itself is returned.
   
   When I was playing with it, if `poll_next` ever returns `Pending` then `poll_next` is never called again. I think the adapter you outline above is a good idea; I think it also needs to arrange for `cx.waker()` to be woken if anything is added to the underlying stream.
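   What "arranging for `cx.waker()` to be woken" means can be shown with a tiny std-only executor. In this sketch, `block_on`, `Slot`, `RecvFuture` and `send_later` are hypothetical names, not Tokio or `futures` APIs: the future stores the waker before returning `Pending`, and the producer calls `wake` after storing a value, which is what gets `poll` invoked again.

   ```rust
   use std::future::Future;
   use std::pin::Pin;
   use std::sync::{Arc, Mutex};
   use std::task::{Context, Poll, Wake, Waker};
   use std::thread::{self, Thread};
   use std::time::Duration;

   /// Waker that unparks the thread running `block_on`.
   struct ThreadWaker(Thread);
   impl Wake for ThreadWaker {
       fn wake(self: Arc<Self>) {
           self.0.unpark();
       }
   }

   /// Minimal single-future executor: poll, then park until woken.
   fn block_on<F: Future>(fut: F) -> F::Output {
       let mut fut = Box::pin(fut);
       let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
       let mut cx = Context::from_waker(&waker);
       loop {
           if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
               return out;
           }
           thread::park(); // returns once `wake` unparks us (or spuriously)
       }
   }

   /// One-slot "channel": the value plus the waker of whoever is waiting.
   #[derive(Default)]
   struct Slot {
       value: Option<u32>,
       waker: Option<Waker>,
   }

   struct RecvFuture(Arc<Mutex<Slot>>);

   impl Future for RecvFuture {
       type Output = u32;
       fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
           let mut slot = self.0.lock().unwrap();
           match slot.value.take() {
               Some(v) => Poll::Ready(v),
               None => {
                   // Remember the waker *before* returning Pending; otherwise
                   // nothing will ever poll this future again.
                   slot.waker = Some(cx.waker().clone());
                   Poll::Pending
               }
           }
       }
   }

   /// Stores `value` from another thread after a short delay and wakes the waiter.
   fn send_later(slot: Arc<Mutex<Slot>>, value: u32) {
       thread::spawn(move || {
           thread::sleep(Duration::from_millis(20));
           let mut guard = slot.lock().unwrap();
           guard.value = Some(value);
           if let Some(waker) = guard.waker.take() {
               waker.wake();
           }
       });
   }
   ```

   If the line storing the waker is removed, `block_on` would typically park forever after the first `Pending` (barring a spurious wakeup), which matches the symptom of `poll_next` never being called again.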
   
   I have a bunch of ideas in my head, but I have not had time to chase them down yet, sadly :(





[GitHub] [arrow] alamb commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r514557277



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       https://github.com/apache/arrow/pull/8553





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r513654504



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       At this point I am ashamed of the immoral amount of time I spent on this 😭
   
   I am getting convinced that I should have kept the other PR with the iterator. IMO that was easy to reason about (and there was already a 50% performance improvement xD).





[GitHub] [arrow] alamb commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r513697726



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       Here is a commit that passes all the tests: https://github.com/alamb/arrow/commit/01b4d8758192b40494447401f8cb5bd970b38835 (again, based on this branch).
   
   What performance benchmarks were you running? I can see if this change makes any difference.





[GitHub] [arrow] jorgecarleitao commented on pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#issuecomment-714243113


   This is not working with aggregates and I am trying to understand why, so I moved it to draft.



[GitHub] [arrow] alamb commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r510851478



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       FWIW I am looking at this and it is definitely not doing what I expected. I spent a while adding instrumentation (`println!`) and observing what was happening.
   
   From what I can tell, the problem is that nothing ever arranges for `poll_next` to get invoked again, so if the data isn't available on the first invocation, then no results are ever produced.
   
   I tried several different things to get this working (most notably invoking `Waker::wake`), but I couldn't get the runtime to call `poll_next` again.
   
   My current (unproven) theory is that it has something to do with the interaction of `futures::Stream` and `tokio::stream::Stream` (https://docs.rs/tokio/0.2.22/tokio/stream/trait.Stream.html), which I think are similar but not quite the same.
   
   And the mixing of `futures` and `tokio` may be a problem... Maybe.





[GitHub] [arrow] alamb commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r513715706



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       cool - thanks -- will give it a try tomorrow





[GitHub] [arrow] alamb commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r510061337



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       I wonder if picking a fixed channel size might be a good idea. Specifically, given there is s one  That way the producers can't get too far ahead. I think a small fixed number (`number_of_cores`? 10?) might be reasonable to begin with
   
   Given that the actual work of `MergeStream` is pretty light (simply passing the record batches on) this may not be a problem in practice, but it probably depends on what order the tasks are run.
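   The back-pressure a fixed capacity provides can be checked directly. A std-only sketch, assuming `std::sync::mpsc::sync_channel` as a stand-in for a bounded async channel; `channel_capacity` encodes the "number of cores, else 10" heuristic from the comment and, like `max_run_ahead`, is hypothetical:

   ```rust
   use std::sync::mpsc::{sync_channel, TrySendError};
   use std::thread;

   /// Hypothetical capacity heuristic: one slot per core, falling back to 10.
   fn channel_capacity() -> usize {
       thread::available_parallelism().map(|n| n.get()).unwrap_or(10)
   }

   /// Counts how many batches a producer can push while no consumer is reading:
   /// with a bounded channel it is exactly `capacity`, after which it must wait.
   fn max_run_ahead(capacity: usize) -> usize {
       let (sender, _receiver) = sync_channel::<u64>(capacity);
       let mut sent = 0;
       loop {
           match sender.try_send(sent as u64) {
               Ok(()) => sent += 1,
               Err(TrySendError::Full(_)) => return sent,
               Err(TrySendError::Disconnected(_)) => unreachable!("receiver is alive"),
           }
       }
   }
   ```

   A blocking `send` would simply wait at that point, so memory held by in-flight batches stays proportional to the chosen capacity rather than to the input size.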





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r510134213



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       Good point.
   
   I actually started with a `bounded(1)`, but this (and other sizes) won't work because we need to `join_all` the threads. Because nobody consumes items from the `receiver` at that point, the senders block once the channel is full, the threads cannot be joined, and we wait indefinitely on `join_all`.
   
   An alternative is to not `join_all`, but then we risk losing results if the main thread finishes first.
   
   I.e. if we bound the channel, we cannot `join_all`, and thus we may lose results from threads that have not finished. If we do `join_all`, the channel needs to be unbounded so that we can build the stream.
   
   IMO neither is good, as in both cases we are essentially waiting for all threads to finish before returning the stream, which I understand is not what we want.
   
   (I was hoping that some of you would know the solution for this 😄)
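The deadlock described above comes from calling `join_all` before anyone reads from the bounded channel. A minimal std-only sketch of the alternative ordering, with OS threads standing in for Tokio tasks and `sync_channel` for the `futures` channel (both assumptions; `merge_partitions` is a hypothetical name): the consumer drains the receiver while the producers run, the receiver's iterator ends once every sender has been dropped, and only then are the handles joined, so no results are lost.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical merge: one producer thread per partition, all sending into a
/// single bounded channel that the caller drains as batches arrive.
fn merge_partitions(partitions: Vec<Vec<i32>>, bound: usize) -> Vec<i32> {
    let (sender, receiver) = sync_channel::<i32>(bound);
    let handles: Vec<_> = partitions
        .into_iter()
        .map(|part| {
            let sender = sender.clone();
            thread::spawn(move || {
                for batch in part {
                    // Blocks while the channel is full (backpressure).
                    sender.send(batch).expect("receiver dropped");
                }
                // This producer's `sender` is dropped when the closure returns.
            })
        })
        .collect();
    // Drop the original sender so the receiver only waits on the producers.
    drop(sender);

    // Consume *before* joining: the iterator ends once all senders are gone.
    let merged: Vec<i32> = receiver.iter().collect();

    // Joining cannot deadlock now: every producer has finished sending.
    for handle in handles {
        handle.join().expect("producer panicked");
    }
    merged
}
```

Output order depends on thread scheduling, so callers should not rely on it; only the multiset of batches is guaranteed.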







[GitHub] [arrow] alamb commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r514557277



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       I created a real PR: https://github.com/apache/arrow/pull/8553







[GitHub] [arrow] jorgecarleitao closed pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao closed pull request #8503:
URL: https://github.com/apache/arrow/pull/8503


   





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r510567189



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       Thanks @alamb. So, [this](https://github.com/jorgecarleitao/arrow/pull/18) fails with no partitions collected at the end.







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r509907817



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,53 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                // todo: buffer size should be configurable or dependent of metrics
+                let (sender, receiver) = mpsc::channel::<ArrowResult<RecordBatch>>(1);
+
+                // spawn independent tasks whose resulting streams (of batches)
+                // are sent to the channel for consumption.
+                (0..input_partitions).for_each(|part_i| {
                     let input = self.input.clone();
+                    let mut sender = sender.clone();
                     tokio::spawn(async move {

Review comment:
       We need to join the handles of these tasks, or the main thread may finish before the spawned tasks do.







[GitHub] [arrow] alamb commented on pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#issuecomment-715899914


   I spent some more time looking at and thinking about this issue. I have some more ideas I plan to try out tomorrow. I'll keep you posted @jorgecarleitao





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r512384366



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       I think this is by design: there is no guarantee that a task spawned by Tokio finishes unless we await its `JoinHandle`. The handle returned by `tokio::spawn` is a future like any other future: we need to await it.
   
   At least the problem is well defined: we have a vector (over `parts`) of futures (`spawn`) whose result is a stream (of `record batches`) and we would like to convert this into a stream of record batches. 
   
   The solution IMO is to have a new adapter for this:
   
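   ```rust
   use std::future::Future;
   use std::pin::Pin;
   use std::task::{Context, Poll};
   use arrow::error::Result as ArrowResult;
   use arrow::record_batch::RecordBatch;
   use futures::stream::{FuturesUnordered, SelectAll, Stream, StreamExt};
   
   // stream: the stream of records
   type BatchStream = Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send>>;
   // future: the task producing one part's stream (e.g. a `tokio::spawn`, mapped to its output)
   type PartFuture = Pin<Box<dyn Future<Output = BatchStream> + Send>>;
   // vec: one entry per part of the partition
   type Streams = Vec<PartFuture>;
   
   struct Adapter {
       pending: FuturesUnordered<PartFuture>, // tasks whose stream is not ready yet
       ready: SelectAll<BatchStream>,         // streams already being pulled from
   }
   
   impl Adapter {
       pub fn new(it: Streams) -> Self {
           Self { pending: it.into_iter().collect(), ready: SelectAll::new() }
       }
   }
   
   impl Stream for Adapter {
       type Item = ArrowResult<RecordBatch>;
   
       fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
           let this = &mut *self;
           // poll any of the tasks, if it is done, store the stream in `self` and start pulling
           // from the stream together with the other tasks
           while let Poll::Ready(Some(stream)) = this.pending.poll_next_unpin(cx) {
               this.ready.push(stream);
           }
           match this.ready.poll_next_unpin(cx) {
               Poll::Ready(Some(batch)) => Poll::Ready(Some(batch)),
               // no ready stream has items left: done only if no task is still pending
               Poll::Ready(None) if this.pending.is_empty() => Poll::Ready(None),
               _ => Poll::Pending,
           }
       }
   }
   ```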







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8503: ARROW-10366: [Rust] [DataFusion] Remove collect from within threads on merge

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8503:
URL: https://github.com/apache/arrow/pull/8503#discussion_r513699488



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -103,37 +105,56 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions).map(|part_i| {
+                let (sender, receiver) = mpsc::unbounded::<ArrowResult<RecordBatch>>();

Review comment:
       Cool! 
   
   This is what I use:
   
   ```bash
   git checkout master && \
       cargo bench --bench aggregate_query_sql && \
       git checkout merge && \
       cargo bench --bench aggregate_query_sql
   ```



