Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/15 18:56:48 UTC

[GitHub] [arrow] jorgecarleitao opened a new pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

jorgecarleitao opened a new pull request #8473:
URL: https://github.com/apache/arrow/pull/8473


   Recently, we introduced `async` to `execute`. This allowed us to parallelize across partitions, taking the execution of one part (of a partition) as the unit of work. However, a part is often a large task comprising multiple batches and steps.
   
   This PR makes all our execution nodes return a [futures::Stream](https://docs.rs/futures/0.3.6/futures/stream/trait.Stream.html) instead of an `Iterator`. For reference, a `Stream` is an asynchronous iterator: each request for the next item yields a future, which in this case resolves to a `Result<RecordBatch>`.
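   
   To make the shape of the change concrete, here is a minimal, runnable sketch of the new interface, with stand-in types in place of arrow's `RecordBatch` and error types so the example is self-contained:
   
   ```
   use std::pin::Pin;
   use futures::stream::{self, Stream, StreamExt};
   
   type RecordBatch = Vec<i32>; // stand-in for arrow::record_batch::RecordBatch
   type ArrowResult<T> = Result<T, String>; // stand-in for arrow::error::Result
   
   // The new interface shape: a pinned, sendable, async sequence of batch results.
   type SendableRecordBatchStream =
       Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send>>;
   
   fn execute() -> SendableRecordBatchStream {
       // each batch is yielded only when the consumer polls for it
       Box::pin(stream::iter(vec![Ok(vec![1, 2]), Ok(vec![3, 4])]))
   }
   
   #[tokio::main]
   async fn main() -> Result<(), String> {
       let mut batches = execute();
       while let Some(batch) = batches.next().await {
           println!("{:?}", batch?);
       }
       Ok(())
   }
   ```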
   
   This effectively breaks the execution into smaller units of work (each unit is an operation that returns a `Result<RecordBatch>`), allowing each task to chew on smaller bits.
   
   This adds `futures` as a direct dependency of DataFusion (it was previously only a dev-dependency).
   
   This leads to a ~10% regression on the aggregate micro-benchmarks, which IMO is expected given that there is more context switching to handle. However, I expect (hope?) this overhead to be independent of the number of batches and partitions, and to be offset by any async work we perform against our sources (readers) and sinks (writers).
   
   I did not take the time to optimize - the primary goal was to implement the idea, have it compile and pass tests, and have some discussion about it. I expect that we should be able to replace some of our operations with `join_all`, thereby scheduling multiple tasks at once instead of awaiting them one by one; see the sketch below.
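   
   A minimal sketch of the `join_all` idea, assuming a hypothetical `run_partition` helper that stands in for `execute(partition).await` plus draining the resulting stream:
   
   ```
   use futures::future::join_all;
   
   // hypothetical stand-in for executing and draining one partition
   async fn run_partition(partition: usize) -> Result<usize, String> {
       Ok(partition)
   }
   
   async fn run_all(n_partitions: usize) -> Result<Vec<usize>, String> {
       // all partition futures are polled concurrently, instead of awaited one by one
       let tasks = (0..n_partitions).map(run_partition);
       join_all(tasks).await.into_iter().collect()
   }
   ```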
   
   <details>
    <summary>Benchmarks</summary>
   
   Aggregates:
   ```
   aggregate_query_no_group_by 15 12                                                                            
                           time:   [908.71 us 961.16 us 1.0193 ms]
                           change: [+5.9644% +10.567% +15.382%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 7 outliers among 100 measurements (7.00%)
     2 (2.00%) high mild
     5 (5.00%) high severe
   
   aggregate_query_group_by 15 12                                                                            
                           time:   [6.6902 ms 7.0747 ms 7.5420 ms]
                           change: [+4.5521% +10.510% +18.352%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 8 outliers among 100 measurements (8.00%)
     2 (2.00%) high mild
     6 (6.00%) high severe
   
   aggregate_query_group_by_with_filter 15 12                                                                             
                           time:   [2.8901 ms 2.9207 ms 2.9531 ms]
                           change: [-16.357% -8.7619% -2.2536%] (p = 0.01 < 0.05)
                           Performance has improved.
   Found 3 outliers among 100 measurements (3.00%)
     3 (3.00%) high mild
   ```
   
   Math:
   ```
   sqrt_20_9               time:   [6.9844 ms 7.0582 ms 7.1363 ms]                      
                           change: [+0.0557% +1.5625% +3.0408%] (p = 0.05 < 0.05)
                           Change within noise threshold.
   Found 3 outliers among 100 measurements (3.00%)
     2 (2.00%) high mild
     1 (1.00%) high severe
   
   sqrt_20_12              time:   [2.8350 ms 2.9504 ms 3.1204 ms]                        
                           change: [+3.8751% +8.2857% +14.671%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 5 outliers among 100 measurements (5.00%)
     2 (2.00%) high mild
     3 (3.00%) high severe
   
   sqrt_22_12              time:   [14.888 ms 15.242 ms 15.620 ms]                       
                           change: [+7.6388% +10.709% +14.098%] (p = 0.00 < 0.05)
                           Performance has regressed.
   Found 5 outliers among 100 measurements (5.00%)
     3 (3.00%) high mild
     2 (2.00%) high severe
   
   sqrt_22_14              time:   [23.710 ms 23.817 ms 23.953 ms]                       
                           change: [-4.3401% -3.1824% -2.0952%] (p = 0.00 < 0.05)
                           Performance has improved.
   Found 11 outliers among 100 measurements (11.00%)
     5 (5.00%) high mild
     6 (6.00%) high severe
   ```
   </details>
   
   I admit that this is a bit outside my comfort zone, and someone with more experience in `async/await` could be of help.
   
   IMO this would integrate very nicely with ARROW-10307 and ARROW-9275, I _think_ it would also help ARROW-9707, and I _think_ it also opens the possibility of consuming / producing batches from/to sources and sinks via Flight.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#discussion_r506288365



##########
File path: rust/datafusion/src/datasource/memory.rs
##########
@@ -135,6 +134,7 @@ mod tests {
     use super::*;
     use arrow::array::Int32Array;
     use arrow::datatypes::{DataType, Field, Schema};
+    use futures::StreamExt;

Review comment:
       If we are worried about a new dependency on `futures`, we could perhaps upgrade the version of tokio instead and use the `StreamExt` from `tokio` rather than the one from `futures`: https://docs.rs/tokio/0.3.0/tokio/stream/trait.StreamExt.html
   
   
   
   

##########
File path: rust/datafusion/src/physical_plan/common.rs
##########
@@ -31,53 +32,58 @@ use array::{
 };
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::error::Result as ArrowResult;
-use arrow::record_batch::{RecordBatch, RecordBatchReader};
+use arrow::record_batch::RecordBatch;
 use arrow::{
     array::{self, ArrayRef},
     datatypes::Schema,
 };
+use futures::{Stream, TryStreamExt};
 
-/// Iterator over a vector of record batches
-pub struct RecordBatchIterator {
+/// Stream of record batches
+pub struct SizedRecordBatchStream {

Review comment:
       `BufferedRecordBatchStream` might be more descriptive of what this struct does

##########
File path: rust/datafusion/src/physical_plan/filter.rs
##########
@@ -98,72 +102,76 @@ impl ExecutionPlan for FilterExec {
         }
     }
 
-    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchReader> {
-        Ok(Box::new(FilterExecIter {
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(FilterExecStream {
             schema: self.input.schema().clone(),
             predicate: self.predicate.clone(),
             input: self.input.execute(partition).await?,
         }))
     }
 }
 
-/// The FilterExec iterator wraps the input iterator and applies the predicate expression to
+/// The FilterExec stream wraps the input stream and applies the predicate expression to
 /// determine which rows to include in its output batches
-struct FilterExecIter {
+struct FilterExecStream {
     /// Output schema, which is the same as the input schema for this operator
     schema: SchemaRef,
     /// The expression to filter on. This expression must evaluate to a boolean value.
     predicate: Arc<dyn PhysicalExpr>,
     /// The input partition to filter.
-    input: SendableRecordBatchReader,
+    input: SendableRecordBatchStream,
+}
+
+fn batch_filter(
+    batch: &RecordBatch,
+    predicate: &Arc<dyn PhysicalExpr>,
+) -> ArrowResult<RecordBatch> {
+    predicate
+        .evaluate(&batch)
+        .map_err(ExecutionError::into_arrow_external_error)
+        .and_then(|array| {
+            array
+                .as_any()
+                .downcast_ref::<BooleanArray>()
+                .ok_or(
+                    ExecutionError::InternalError(
+                        "Filter predicate evaluated to non-boolean value".to_string(),
+                    )
+                    .into_arrow_external_error(),
+                )
+                // apply predicate to each column
+                .and_then(|predicate| {
+                    batch
+                        .columns()
+                        .iter()
+                        .map(|column| filter(column.as_ref(), predicate))
+                        .collect::<ArrowResult<Vec<_>>>()
+                })
+        })
+        // build RecordBatch
+        .and_then(|columns| RecordBatch::try_new(batch.schema().clone(), columns))
 }
 
-impl Iterator for FilterExecIter {
+impl Stream for FilterExecStream {
     type Item = ArrowResult<RecordBatch>;
 
-    /// Get the next batch
-    fn next(&mut self) -> Option<ArrowResult<RecordBatch>> {
-        match self.input.next() {
-            Some(Ok(batch)) => {
-                // evaluate the filter predicate to get a boolean array indicating which rows
-                // to include in the output
-                Some(
-                    self.predicate
-                        .evaluate(&batch)
-                        .map_err(ExecutionError::into_arrow_external_error)
-                        .and_then(|array| {
-                            array
-                                .as_any()
-                                .downcast_ref::<BooleanArray>()
-                                .ok_or(
-                                    ExecutionError::InternalError(
-                                        "Filter predicate evaluated to non-boolean value"
-                                            .to_string(),
-                                    )
-                                    .into_arrow_external_error(),
-                                )
-                                // apply predicate to each column
-                                .and_then(|predicate| {
-                                    batch
-                                        .columns()
-                                        .iter()
-                                        .map(|column| filter(column.as_ref(), predicate))
-                                        .collect::<ArrowResult<Vec<_>>>()
-                                })
-                        })
-                        // build RecordBatch
-                        .and_then(|columns| {
-                            RecordBatch::try_new(batch.schema().clone(), columns)
-                        }),
-                )
-            }
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.input.poll_next_unpin(cx).map(|x| match x {
+            Some(Ok(batch)) => Some(batch_filter(&batch, &self.predicate)),

Review comment:
       this is nice -- it means the code doesn't have to read all the batches before filtering each one

##########
File path: rust/datafusion/tests/user_defined_plan.rs
##########
@@ -468,51 +469,69 @@ fn accumulate_batch(
         .expect("Column 1 is not revenue");
 
     for row in 0..num_rows {
-        add_row(top_values, customer_id.value(row), revenue.value(row), k);
+        add_row(
+            &mut top_values,
+            customer_id.value(row),
+            revenue.value(row),
+            k,
+        );
     }
-    Ok(())
+    Ok(top_values)
 }
 
-impl Iterator for TopKReader {
+impl Stream for TopKReader {
     type Item = std::result::Result<RecordBatch, ArrowError>;
 
-    /// Reads the next `RecordBatch`.
-    fn next(&mut self) -> Option<Self::Item> {
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
         if self.done {
-            return None;
+            return Poll::Ready(None);
         }
-
-        // Hard coded implementation for sales / customer_id example
-        let mut top_values: BTreeMap<i64, String> = BTreeMap::new();
+        // this aggregates and thus returns a single RecordBatch.
+        self.done = true;
 
         // take this as immutable
-        let k = &self.k;
+        let k = self.k;
+        let schema = self.schema().clone();
 
-        self.input
+        let top_values = self
+            .input
             .as_mut()
-            .into_iter()
-            .map(|batch| accumulate_batch(&batch?, &mut top_values, k))
-            .collect::<Result<()>>()
-            .unwrap();
-
-        // make output by walking over the map backwards (so values are descending)
-        let (revenue, customer): (Vec<i64>, Vec<&String>) =
-            top_values.iter().rev().unzip();
-
-        let customer: Vec<&str> = customer.iter().map(|&s| &**s).collect();
+            // Hard coded implementation for sales / customer_id example as BTree
+            .try_fold(
+                BTreeMap::<i64, String>::new(),
+                move |top_values, batch| async move {
+                    accumulate_batch(&batch, top_values, &k)
+                        .map_err(ExecutionError::into_arrow_external_error)
+                },
+            );
+
+        let top_values = top_values.map(|top_values| match top_values {
+            Ok(top_values) => {
+                // make output by walking over the map backwards (so values are descending)
+                let (revenue, customer): (Vec<i64>, Vec<&String>) =
+                    top_values.iter().rev().unzip();
+
+                let customer: Vec<&str> = customer.iter().map(|&s| &**s).collect();
+                Ok(RecordBatch::try_new(
+                    schema,
+                    vec![
+                        Arc::new(StringArray::from(customer)),
+                        Arc::new(Int64Array::from(revenue)),
+                    ],
+                )?)
+            }
+            Err(e) => Err(e),
+        });
+        let mut top_values = Box::pin(top_values.into_stream());

Review comment:
       this is very cool to see how the user defined code got transformed to a `Stream`

##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -100,27 +103,29 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions)
-                    .map(|part_i| {
-                        let input = self.input.clone();
-                        tokio::spawn(async move {
-                            let it = input.execute(part_i).await?;
-                            common::collect(it)
-                        })
+                let tasks = (0..input_partitions).map(|part_i| {
+                    let input = self.input.clone();
+                    tokio::spawn(async move {
+                        let stream = input.execute(part_i).await?;
+                        common::collect(stream).await

Review comment:
       I am still concerned that these calls to `collect` effectively buffer all the input before producing any output -- this will likely both consume more memory than needed and act as a 'pipeline breaker' (nothing that relies on the output of the `Merge` can run until *all* of the merge inputs have been produced).
   
   We could perhaps use `chain` here to fuse the streams together -- https://docs.rs/tokio/0.3.0/tokio/stream/trait.StreamExt.html#method.chain. Using chain would avoid the need to buffer the intermediate results (aka the collect), but it would also likely cause the input partitions to run one after the other rather than in parallel.
   
   Another thought I had would be to use something fancier like a `channel` that all the substreams write to.
   
   But in any event that could be done in some future PR -- I think this code is better than master, and a step forward.
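   
   A hedged sketch of the `channel` idea mentioned above, with a stand-in `Batch` type rather than DataFusion's actual types: each partition task writes its batches into a bounded `futures` mpsc channel as they are produced, and the merged output is simply the receiving end, which itself implements `Stream`:
   
   ```
   use futures::channel::mpsc;
   use futures::{SinkExt, Stream};
   
   type Batch = Vec<i32>; // stand-in for ArrowResult<RecordBatch>
   
   fn merge_partitions(parts: Vec<Vec<Batch>>) -> impl Stream<Item = Batch> {
       // bounded channel: a full buffer applies backpressure instead of growing without limit
       let (tx, rx) = mpsc::channel(8);
       for part in parts {
           let mut tx = tx.clone();
           tokio::spawn(async move {
               for batch in part {
                   // stop early if the consumer dropped the merged stream
                   if tx.send(batch).await.is_err() {
                       break;
                   }
               }
           });
       }
       // the receiver implements Stream; batches arrive in completion order
       rx
   }
   ```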
   
   

##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -197,24 +197,27 @@ fn read_file(
     Ok(())
 }
 
-struct ParquetIterator {
+struct ParquetStream {
     schema: SchemaRef,
     response_rx: Receiver<Option<ArrowResult<RecordBatch>>>,
 }
 
-impl Iterator for ParquetIterator {
+impl Stream for ParquetStream {
     type Item = ArrowResult<RecordBatch>;
 
-    fn next(&mut self) -> Option<Self::Item> {
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
         match self.response_rx.recv() {

Review comment:
       This is getting closer to an `async` parquet reader 👍

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -505,62 +513,88 @@ fn aggregate_batch(
 
             // 1.3
             match mode {
-                AggregateMode::Partial => accum.update_batch(values),
-                AggregateMode::Final => accum.merge_batch(values),
+                AggregateMode::Partial => {
+                    accum.update_batch(values)?;
+                }
+                AggregateMode::Final => {
+                    accum.merge_batch(values)?;
+                }
             }
+            Ok(accum)
         })
-        .collect::<Result<()>>()
+        .collect::<Result<Vec<_>>>()
 }
 
-impl Iterator for HashAggregateIterator {
+impl Stream for HashAggregateStream {
     type Item = ArrowResult<RecordBatch>;
 
-    fn next(&mut self) -> Option<Self::Item> {
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
         if self.finished {
-            return None;
+            return Poll::Ready(None);
         }
 
         // return single batch
         self.finished = true;
 
-        let mut accumulators = match create_accumulators(&self.aggr_expr) {
+        let accumulators = match create_accumulators(&self.aggr_expr) {
             Ok(e) => e,
-            Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
+            Err(e) => {
+                return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error(
+                    e,
+                ))))
+            }
         };
 
         let expressions = match aggregate_expressions(&self.aggr_expr, &self.mode) {
             Ok(e) => e,
-            Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
+            Err(e) => {
+                return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error(
+                    e,
+                ))))
+            }
         };
+        let expressions = Arc::new(expressions);
 
         let mode = self.mode;
         let schema = self.schema();
 
         // 1 for each batch, update / merge accumulators with the expressions' values
-        match self
+        // future is ready when all batches are computed
+        let future = self
             .input
             .as_mut()
-            .into_iter()
-            .map(|batch| {
-                aggregate_batch(&mode, &batch?, &mut accumulators, &expressions)
-                    .map_err(ExecutionError::into_arrow_external_error)
-            })
-            .collect::<ArrowResult<()>>()
-        {
-            Err(e) => return Some(Err(e)),
-            Ok(_) => {}
-        }
+            .try_fold(
+                // pass the expressions on every fold to handle closures' mutability
+                (accumulators, expressions),

Review comment:
       this is cool -- incrementally accumulating the aggregates in the streams
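   
   For reference, a minimal, self-contained sketch of the `try_fold` pattern used in the hunk above, with stand-in types instead of the DataFusion ones -- each step takes ownership of the accumulator and returns it, which sidesteps the closure-mutability issue mentioned in the diff comments:
   
   ```
   use futures::stream::{self, TryStreamExt};
   
   #[tokio::main]
   async fn main() -> Result<(), String> {
       // a stream of batch results, standing in for the operator's input
       let batches = stream::iter(vec![Ok::<_, String>(vec![1, 2]), Ok(vec![3, 4])]);
       let sum = batches
           .try_fold(0i64, |acc, batch| async move {
               // fold each batch into the accumulator as it arrives
               Ok(acc + batch.iter().map(|&v| v as i64).sum::<i64>())
           })
           .await?;
       assert_eq!(sum, 10);
       Ok(())
   }
   ```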







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#discussion_r507174314



##########
File path: rust/datafusion/src/physical_plan/common.rs
##########
@@ -31,53 +32,58 @@ use array::{
 };
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::error::Result as ArrowResult;
-use arrow::record_batch::{RecordBatch, RecordBatchReader};
+use arrow::record_batch::RecordBatch;
 use arrow::{
     array::{self, ArrayRef},
     datatypes::Schema,
 };
+use futures::{Stream, TryStreamExt};
 
-/// Iterator over a vector of record batches
-pub struct RecordBatchIterator {
+/// Stream of record batches
+pub struct SizedRecordBatchStream {

Review comment:
       I was thinking of reserving `Buffered` for a stream that buffers N entries. This one is already initialized with everything via `new`; it is more like [`iter`](https://docs.rs/futures/0.3.6/futures/stream/fn.iter.html), with a fixed, known length and a known schema. IMO we should avoid using this struct, as it requires us to collect into it somewhere.
   







[GitHub] [arrow] jorgecarleitao closed pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
jorgecarleitao closed pull request #8473:
URL: https://github.com/apache/arrow/pull/8473


   





[GitHub] [arrow] andygrove commented on a change in pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#discussion_r506503915



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -100,27 +103,29 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions)
-                    .map(|part_i| {
-                        let input = self.input.clone();
-                        tokio::spawn(async move {
-                            let it = input.execute(part_i).await?;
-                            common::collect(it)
-                        })
+                let tasks = (0..input_partitions).map(|part_i| {
+                    let input = self.input.clone();
+                    tokio::spawn(async move {
+                        let stream = input.execute(part_i).await?;
+                        common::collect(stream).await

Review comment:
       Yes, collect is the reason we don't see better performance from this PR IMO. I agree that this can be fixed in a separate PR.







[GitHub] [arrow] jorgecarleitao commented on pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#issuecomment-709528557


   FYI @alamb @vertexclique @maxburke








[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#discussion_r508230895



##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -331,72 +337,74 @@ impl GroupedHashAggregateIterator {
 }
 
 type AccumulatorSet = Vec<Box<dyn Accumulator>>;
+type Accumulators = FnvHashMap<Vec<GroupByScalar>, (AccumulatorSet, Box<Vec<u32>>)>;
 
-impl Iterator for GroupedHashAggregateIterator {
+impl Stream for GroupedHashAggregateStream {
     type Item = ArrowResult<RecordBatch>;
 
-    fn next(&mut self) -> Option<Self::Item> {
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
         if self.finished {
-            return None;
+            return Poll::Ready(None);
         }
 
         // return single batch
         self.finished = true;
 
-        let mode = &self.mode;
-        let group_expr = &self.group_expr;
-        let aggr_expr = &self.aggr_expr;
+        let mode = self.mode.clone();
+        let group_expr = self.group_expr.clone();
+        let aggr_expr = self.aggr_expr.clone();
+        let schema = self.schema.clone();
 
         // the expressions to evaluate the batch, one vec of expressions per aggregation
         let aggregate_expressions = match aggregate_expressions(&aggr_expr, &mode) {
             Ok(e) => e,
-            Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
+            Err(e) => {
+                return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error(
+                    e,
+                ))))
+            }
         };
 
         // mapping key -> (set of accumulators, indices of the key in the batch)
         // * the indexes are updated at each row
         // * the accumulators are updated at the end of each batch
         // * the indexes are `clear`ed at the end of each batch
-        let mut accumulators: FnvHashMap<
-            Vec<GroupByScalar>,
-            (AccumulatorSet, Box<Vec<u32>>),
-        > = FnvHashMap::default();
+        //let mut accumulators: Accumulators = FnvHashMap::default();
 
         // iterate over all input batches and update the accumulators
-        match self
-            .input
-            .as_mut()
-            .into_iter()
-            .map(|batch| {
+        let future = self.input.as_mut().try_fold(
+            Accumulators::default(),
+            |accumulators, batch| async {
                 group_aggregate_batch(
                     &mode,
                     &group_expr,
                     &aggr_expr,
-                    &batch?,
-                    &mut accumulators,
+                    batch,
+                    accumulators,
                     &aggregate_expressions,
                 )
                 .map_err(ExecutionError::into_arrow_external_error)
-            })
-            .collect::<ArrowResult<()>>()
-        {
-            Err(e) => return Some(Err(e)),
-            Ok(_) => {}
-        }
+            },
+        );
 
-        Some(
-            create_batch_from_map(
-                &self.mode,
-                &accumulators,
-                self.group_expr.len(),
-                &self.schema,
-            )
-            .map_err(ExecutionError::into_arrow_external_error),
-        )
+        let future = future.map(|accumulators| match accumulators {
+            Ok(accumulators) => {
+                create_batch_from_map(&mode, &accumulators, group_expr.len(), &schema)
+            }
+            Err(e) => Err(e),
+        });

Review comment:
       Good point. Fixed (and below).







[GitHub] [arrow] jorgecarleitao edited a comment on pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
jorgecarleitao edited a comment on pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#issuecomment-710075328


   I am still not entirely happy with this, and I think that we need a further step to make this work well, which is the reason I placed this back into draft.
   
   Essentially, I misinterpreted what a `Stream` is used for. A `stream` of record batches will only return the next batch _after_ the previous one was (async) computed. Therefore, we can't parallelize, as we are blocked by the execution of the previous batch. Consequently, this PR is only useful for situations in which we are reading batches that we may need to wait for.
   
   I think that the solution is to put a `future` as the `Item` of the `Stream`, so that we can collect all those futures and perform `join_all().await`, thereby sending every task to the scheduler.
   
   PR #8480 is the proposal for that. It doubles the performance of the larger projection benchmarks (purely due to scheduling / threading).
   
   Note that each PR solves a different problem: this one is related to the fact that some batches may not be available yet (and thus we should not block), the other one is related to the fact that some operations can be parallelized (and thus we should `spawn`). I think that together they address the main issues.
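   
   A small illustrative sketch of that idea -- not the #8480 code -- where the stream's items are spawned tasks (futures of stand-in batch values) that are collected and then awaited together:
   
   ```
   use futures::{future::join_all, stream, StreamExt};
   
   #[tokio::main]
   async fn main() {
       // a stream whose items are JoinHandles, i.e. futures of batch values
       let handles = stream::iter(0..4).map(|i| tokio::spawn(async move { i * 10 }));
       // draining the stream spawns all the tasks; they then run concurrently
       let handles: Vec<_> = handles.collect().await;
       // await the already-running tasks, preserving order
       let batches: Vec<_> = join_all(handles).await;
       println!("{:?}", batches); // [Ok(0), Ok(10), Ok(20), Ok(30)]
   }
   ```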





[GitHub] [arrow] jorgecarleitao commented on pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#issuecomment-710744576


   I agree with you that #8480 has other issues that we want to avoid. Therefore, I placed this back as ready to review.
   
   Thanks a lot for the comments so far: this concurrency stuff is new to me and thus your guidance is really appreciated here.





[GitHub] [arrow] alamb commented on a change in pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#discussion_r507632793



##########
File path: rust/datafusion/src/physical_plan/common.rs
##########
@@ -31,53 +32,58 @@ use array::{
 };
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::error::Result as ArrowResult;
-use arrow::record_batch::{RecordBatch, RecordBatchReader};
+use arrow::record_batch::RecordBatch;
 use arrow::{
     array::{self, ArrayRef},
     datatypes::Schema,
 };
+use futures::{Stream, TryStreamExt};
 
-/// Iterator over a vector of record batches
-pub struct RecordBatchIterator {
+/// Stream of record batches
+pub struct SizedRecordBatchStream {

Review comment:
       Yeah, I was thinking it would be nice to imply somehow in this struct's name that it should be avoided if possible because, as you say, it requires buffering / collecting up the `RecordBatch`es before producing any results.







[GitHub] [arrow] andygrove commented on pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#issuecomment-709648906


   This is great to see. I ran a quick TPC-H query 1 benchmark against the 100 GB data set with this PR and saw no noticeable difference in performance, so the microbenchmark might not be so relevant here.





[GitHub] [arrow] vertexclique commented on a change in pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
vertexclique commented on a change in pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#discussion_r506357109



##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -331,72 +337,74 @@ impl GroupedHashAggregateIterator {
 }
 
 type AccumulatorSet = Vec<Box<dyn Accumulator>>;
+type Accumulators = FnvHashMap<Vec<GroupByScalar>, (AccumulatorSet, Box<Vec<u32>>)>;
 
-impl Iterator for GroupedHashAggregateIterator {
+impl Stream for GroupedHashAggregateStream {
     type Item = ArrowResult<RecordBatch>;
 
-    fn next(&mut self) -> Option<Self::Item> {
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
         if self.finished {
-            return None;
+            return Poll::Ready(None);
         }
 
         // return single batch
         self.finished = true;
 
-        let mode = &self.mode;
-        let group_expr = &self.group_expr;
-        let aggr_expr = &self.aggr_expr;
+        let mode = self.mode.clone();
+        let group_expr = self.group_expr.clone();
+        let aggr_expr = self.aggr_expr.clone();
+        let schema = self.schema.clone();
 
         // the expressions to evaluate the batch, one vec of expressions per aggregation
         let aggregate_expressions = match aggregate_expressions(&aggr_expr, &mode) {
             Ok(e) => e,
-            Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
+            Err(e) => {
+                return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error(
+                    e,
+                ))))
+            }
         };
 
         // mapping key -> (set of accumulators, indices of the key in the batch)
         // * the indexes are updated at each row
         // * the accumulators are updated at the end of each batch
         // * the indexes are `clear`ed at the end of each batch
-        let mut accumulators: FnvHashMap<
-            Vec<GroupByScalar>,
-            (AccumulatorSet, Box<Vec<u32>>),
-        > = FnvHashMap::default();
+        //let mut accumulators: Accumulators = FnvHashMap::default();
 
         // iterate over all input batches and update the accumulators
-        match self
-            .input
-            .as_mut()
-            .into_iter()
-            .map(|batch| {
+        let future = self.input.as_mut().try_fold(
+            Accumulators::default(),
+            |accumulators, batch| async {
                 group_aggregate_batch(
                     &mode,
                     &group_expr,
                     &aggr_expr,
-                    &batch?,
-                    &mut accumulators,
+                    batch,
+                    accumulators,
                     &aggregate_expressions,
                 )
                 .map_err(ExecutionError::into_arrow_external_error)
-            })
-            .collect::<ArrowResult<()>>()
-        {
-            Err(e) => return Some(Err(e)),
-            Ok(_) => {}
-        }
+            },
+        );
 
-        Some(
-            create_batch_from_map(
-                &self.mode,
-                &accumulators,
-                self.group_expr.len(),
-                &self.schema,
-            )
-            .map_err(ExecutionError::into_arrow_external_error),
-        )
+        let future = future.map(|accumulators| match accumulators {
+            Ok(accumulators) => {
+                create_batch_from_map(&mode, &accumulators, group_expr.len(), &schema)
+            }
+            Err(e) => Err(e),
+        });

Review comment:
       Can you use `map` instead of pattern matching? The error escalation looks redundant.
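   
   The suggested simplification, sketched with plain `Result`s and a hypothetical `create` function (here via `and_then`, since the inner closure itself returns a `Result`):
   
   ```
   fn create(v: i32) -> Result<i32, String> {
       Ok(v * 2)
   }
   
   fn main() {
       let r: Result<i32, String> = Ok(21);
       // `match r { Ok(v) => create(v), Err(e) => Err(e) }` collapses to:
       assert_eq!(r.and_then(create), Ok(42));
   }
   ```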

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -505,62 +513,88 @@ fn aggregate_batch(
 
             // 1.3
             match mode {
-                AggregateMode::Partial => accum.update_batch(values),
-                AggregateMode::Final => accum.merge_batch(values),
+                AggregateMode::Partial => {
+                    accum.update_batch(values)?;
+                }
+                AggregateMode::Final => {
+                    accum.merge_batch(values)?;
+                }
             }
+            Ok(accum)
         })
-        .collect::<Result<()>>()
+        .collect::<Result<Vec<_>>>()
 }
 
-impl Iterator for HashAggregateIterator {
+impl Stream for HashAggregateStream {
     type Item = ArrowResult<RecordBatch>;
 
-    fn next(&mut self) -> Option<Self::Item> {
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
         if self.finished {
-            return None;
+            return Poll::Ready(None);
         }
 
         // return single batch
         self.finished = true;
 
-        let mut accumulators = match create_accumulators(&self.aggr_expr) {
+        let accumulators = match create_accumulators(&self.aggr_expr) {
             Ok(e) => e,
-            Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
+            Err(e) => {
+                return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error(
+                    e,
+                ))))
+            }
         };
 
         let expressions = match aggregate_expressions(&self.aggr_expr, &self.mode) {
             Ok(e) => e,
-            Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
+            Err(e) => {
+                return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error(
+                    e,
+                ))))
+            }
         };
+        let expressions = Arc::new(expressions);
 
         let mode = self.mode;
         let schema = self.schema();
 
         // 1 for each batch, update / merge accumulators with the expressions' values
-        match self
+        // future is ready when all batches are computed
+        let future = self
             .input
             .as_mut()
-            .into_iter()
-            .map(|batch| {
-                aggregate_batch(&mode, &batch?, &mut accumulators, &expressions)
-                    .map_err(ExecutionError::into_arrow_external_error)
-            })
-            .collect::<ArrowResult<()>>()
-        {
-            Err(e) => return Some(Err(e)),
-            Ok(_) => {}
-        }
+            .try_fold(
+                // pass the expressions on every fold to handle closures' mutability
+                (accumulators, expressions),
+                |(acc, expr), batch| async move {
+                    aggregate_batch(&mode, &batch, acc, &expr)
+                        .map_err(ExecutionError::into_arrow_external_error)
+                        .map(|agg| (agg, expr))
+                },
+            )
+            // pick the accumulators (disregard the expressions)
+            .map(|e| e.map(|e| e.0));
+
+        let future = future.map(|b| {
+            match b {
+                Err(e) => return Err(e),
+                Ok(acc) => {
+                    // 2 convert values to a record batch

Review comment:
       2 is `to`, I presume. Right :)?

##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -505,62 +513,88 @@ fn aggregate_batch(
 
             // 1.3
             match mode {
-                AggregateMode::Partial => accum.update_batch(values),
-                AggregateMode::Final => accum.merge_batch(values),
+                AggregateMode::Partial => {
+                    accum.update_batch(values)?;
+                }
+                AggregateMode::Final => {
+                    accum.merge_batch(values)?;
+                }
             }
+            Ok(accum)
         })
-        .collect::<Result<()>>()
+        .collect::<Result<Vec<_>>>()
 }
 
-impl Iterator for HashAggregateIterator {
+impl Stream for HashAggregateStream {
     type Item = ArrowResult<RecordBatch>;
 
-    fn next(&mut self) -> Option<Self::Item> {
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
         if self.finished {
-            return None;
+            return Poll::Ready(None);
         }
 
         // return single batch
         self.finished = true;
 
-        let mut accumulators = match create_accumulators(&self.aggr_expr) {
+        let accumulators = match create_accumulators(&self.aggr_expr) {
             Ok(e) => e,
-            Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
+            Err(e) => {
+                return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error(
+                    e,
+                ))))
+            }
         };
 
         let expressions = match aggregate_expressions(&self.aggr_expr, &self.mode) {
             Ok(e) => e,
-            Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
+            Err(e) => {
+                return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error(
+                    e,
+                ))))
+            }
         };
+        let expressions = Arc::new(expressions);
 
         let mode = self.mode;
         let schema = self.schema();
 
         // 1 for each batch, update / merge accumulators with the expressions' values
-        match self
+        // future is ready when all batches are computed
+        let future = self
             .input
             .as_mut()
-            .into_iter()
-            .map(|batch| {
-                aggregate_batch(&mode, &batch?, &mut accumulators, &expressions)
-                    .map_err(ExecutionError::into_arrow_external_error)
-            })
-            .collect::<ArrowResult<()>>()
-        {
-            Err(e) => return Some(Err(e)),
-            Ok(_) => {}
-        }
+            .try_fold(
+                // pass the expressions on every fold to handle closures' mutability
+                (accumulators, expressions),
+                |(acc, expr), batch| async move {
+                    aggregate_batch(&mode, &batch, acc, &expr)
+                        .map_err(ExecutionError::into_arrow_external_error)
+                        .map(|agg| (agg, expr))
+                },
+            )
+            // pick the accumulators (disregard the expressions)
+            .map(|e| e.map(|e| e.0));
+
+        let future = future.map(|b| {
+            match b {
+                Err(e) => return Err(e),
+                Ok(acc) => {
+                    // 2 convert values to a record batch
+                    finalize_aggregation(&acc, &mode)
+                        .map_err(ExecutionError::into_arrow_external_error)
+                        .and_then(|columns| RecordBatch::try_new(schema.clone(), columns))
+                }
+            }
+        });

Review comment:
       Same map simplification here probably.

##########
File path: rust/datafusion/src/datasource/memory.rs
##########
@@ -135,6 +134,7 @@ mod tests {
     use super::*;
     use arrow::array::Int32Array;
     use arrow::datatypes::{DataType, Field, Schema};
+    use futures::StreamExt;

Review comment:
       Please don't do that. tokio-specific interfaces are not compatible with other APIs in the ecosystem. This is a good implementation which is adaptable to tokio, async-std, bastion and others.







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#discussion_r506791355



##########
File path: rust/datafusion/src/physical_plan/hash_aggregate.rs
##########
@@ -505,62 +513,88 @@ fn aggregate_batch(
 
             // 1.3
             match mode {
-                AggregateMode::Partial => accum.update_batch(values),
-                AggregateMode::Final => accum.merge_batch(values),
+                AggregateMode::Partial => {
+                    accum.update_batch(values)?;
+                }
+                AggregateMode::Final => {
+                    accum.merge_batch(values)?;
+                }
             }
+            Ok(accum)
         })
-        .collect::<Result<()>>()
+        .collect::<Result<Vec<_>>>()
 }
 
-impl Iterator for HashAggregateIterator {
+impl Stream for HashAggregateStream {
     type Item = ArrowResult<RecordBatch>;
 
-    fn next(&mut self) -> Option<Self::Item> {
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
         if self.finished {
-            return None;
+            return Poll::Ready(None);
         }
 
         // return single batch
         self.finished = true;
 
-        let mut accumulators = match create_accumulators(&self.aggr_expr) {
+        let accumulators = match create_accumulators(&self.aggr_expr) {
             Ok(e) => e,
-            Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
+            Err(e) => {
+                return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error(
+                    e,
+                ))))
+            }
         };
 
         let expressions = match aggregate_expressions(&self.aggr_expr, &self.mode) {
             Ok(e) => e,
-            Err(e) => return Some(Err(ExecutionError::into_arrow_external_error(e))),
+            Err(e) => {
+                return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error(
+                    e,
+                ))))
+            }
         };
+        let expressions = Arc::new(expressions);
 
         let mode = self.mode;
         let schema = self.schema();
 
         // 1 for each batch, update / merge accumulators with the expressions' values
-        match self
+        // future is ready when all batches are computed
+        let future = self
             .input
             .as_mut()
-            .into_iter()
-            .map(|batch| {
-                aggregate_batch(&mode, &batch?, &mut accumulators, &expressions)
-                    .map_err(ExecutionError::into_arrow_external_error)
-            })
-            .collect::<ArrowResult<()>>()
-        {
-            Err(e) => return Some(Err(e)),
-            Ok(_) => {}
-        }
+            .try_fold(
+                // pass the expressions on every fold to handle closures' mutability
+                (accumulators, expressions),
+                |(acc, expr), batch| async move {
+                    aggregate_batch(&mode, &batch, acc, &expr)
+                        .map_err(ExecutionError::into_arrow_external_error)
+                        .map(|agg| (agg, expr))
+                },
+            )
+            // pick the accumulators (disregard the expressions)
+            .map(|e| e.map(|e| e.0));
+
+        let future = future.map(|b| {
+            match b {
+                Err(e) => return Err(e),
+                Ok(acc) => {
+                    // 2 convert values to a record batch

Review comment:
       it is `2.`, referring to item number two in the numbered list above.







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#discussion_r509884815



##########
File path: rust/datafusion/src/physical_plan/merge.rs
##########
@@ -100,27 +103,29 @@ impl ExecutionPlan for MergeExec {
                 self.input.execute(0).await
             }
             _ => {
-                let tasks = (0..input_partitions)
-                    .map(|part_i| {
-                        let input = self.input.clone();
-                        tokio::spawn(async move {
-                            let it = input.execute(part_i).await?;
-                            common::collect(it)
-                        })
+                let tasks = (0..input_partitions).map(|part_i| {
+                    let input = self.input.clone();
+                    tokio::spawn(async move {
+                        let stream = input.execute(part_i).await?;
+                        common::collect(stream).await

Review comment:
       PR here: https://github.com/apache/arrow/pull/8503







[GitHub] [arrow] alamb commented on pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#issuecomment-710096379


   > A stream of record batches will only return the next batch after the previous one was (async) computed. Therefore, we can't parallelize, as we are blocked by the execution of the previous batch. Therefore, this PR is only useful for situations on which we are reading batches that we may need to wait for.
   
   @jorgecarleitao -- I personally think leaving the interface to be `Stream<RecordBatch>` is the way to go.
   
   Among other things, this interface allows "backpressure", so that we don't end up buffering intermediate results if one operator (e.g. filter) can produce record batches faster than its consumer (e.g. aggregate) can process them.
   
   If we want to add additional parallelism with a tunable cost of additional buffering, we could keep the interface of this PR and add an operator like "Buffer". Buffer would calculate and store some number of `RecordBatch`es, perhaps in a channel, as separate tasks, in advance of when a batch is requested.
   
   Then the user of DataFusion could control how much more buffering they were willing to accept in order to get more parallelism. 
   
   I think an approach like https://github.com/apache/arrow/pull/8480 is cool, but the choice of the aggregate, for example, to compute the input as fast as possible in new tasks, even if the aggregate
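   
   A hedged sketch of such a hypothetical `buffer` operator (illustrative names, not an actual DataFusion API): a spawned task eagerly pulls items from the input into a bounded channel, so that up to `capacity` batches are computed ahead of demand:
   
   ```
   use futures::channel::mpsc;
   use futures::{SinkExt, Stream, StreamExt};
   
   fn buffer<S>(input: S, capacity: usize) -> impl Stream<Item = S::Item>
   where
       S: Stream + Send + 'static,
       S::Item: Send + 'static,
   {
       let (mut tx, rx) = mpsc::channel(capacity);
       tokio::spawn(async move {
           futures::pin_mut!(input);
           while let Some(item) = input.next().await {
               // send waits (asynchronously) once the channel is full: bounded memory
               if tx.send(item).await.is_err() {
                   break; // consumer dropped the buffered stream
               }
           }
       });
       rx
   }
   ```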
   
   








[GitHub] [arrow] andygrove edited a comment on pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
andygrove edited a comment on pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#issuecomment-710090984


   > A `stream` of record batches will only return the next batch _after_ the previous one was (async) computed. Therefore, we can't parallelize, as we are blocked by the execution of the previous batch.
   
   That sounds like exactly what we need. We compute partitions in parallel and each partition produces an ordered set of batches. In some cases, it is critical that the order is deterministic.
   
   The benefit of this PR is that we don't need to wait for each operator to complete before the next one starts (once we remove the use of `collect` everywhere).








[GitHub] [arrow] github-actions[bot] commented on pull request #8473: ARROW-10320 [Rust] [DataFusion] Migrated from batch iterators to batch streams.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8473:
URL: https://github.com/apache/arrow/pull/8473#issuecomment-709533249


   https://issues.apache.org/jira/browse/ARROW-10320

