Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/29 17:59:34 UTC

[GitHub] [arrow] andygrove opened a new pull request #9043: [Rust] [DataFusion] Implement coalesce batches operator

andygrove opened a new pull request #9043:
URL: https://github.com/apache/arrow/pull/9043


   This PR introduces a new `CoalesceBatchesExec` physical operator that combines small input batches into larger output batches. The physical optimizer wraps filters in this operator because highly selective filters can produce many small batches. Small batches hurt performance in some cases (especially joins) because we lose some of the benefits of vectorization when, for example, batches contain a single row.
   
   For TPC-H q12 at SF=100 and 8 partitions, this provides the following speedups:
   
   | Batch Size | Master | This PR |
   | --- | --- | --- |
   | 16384 | 183.1 s | 41.1 s |
   | 32768 | 59.4 s | 27.0 s |
   | 65536 | 27.5 s | 19.3 s | 
   | 131072 | 18.4 s | 15.7 s | 
   
   Note that the new `CoalesceBatchesExec` uses `MutableArrayData`, which still suffers from some kind of exponential slowdown as the number of batches increases, so we should be able to optimize this further. At least we are now using `MutableArrayData` to combine smaller numbers of batches.
   
   Even if we fix the slowdown in `MutableArrayData`, we would still want `CoalesceBatchesExec` to help avoid tiny batches.
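
The buffering behaviour described above can be sketched without Arrow. This is a minimal illustration, not the PR's implementation: a batch is modelled as a plain `Vec<u32>` of rows, and the names `Batch`, `Coalescer`, and `coalesce` are made up for this example. It also assumes that leftover rows are flushed once the input ends.

```rust
/// Stand-in for an Arrow `RecordBatch` (assumption): a batch is just a vector of rows.
type Batch = Vec<u32>;

/// Buffers small batches and emits combined batches of at least `target` rows.
struct Coalescer {
    target: usize,
    buffer: Vec<Batch>,
    buffered_rows: usize,
}

impl Coalescer {
    fn new(target: usize) -> Self {
        Self { target, buffer: Vec::new(), buffered_rows: 0 }
    }

    /// Feed one input batch; returns an output batch once enough rows are buffered.
    fn push(&mut self, batch: Batch) -> Option<Batch> {
        if batch.len() >= self.target && self.buffer.is_empty() {
            // Already large enough and nothing pending: pass it through unchanged.
            return Some(batch);
        }
        if batch.is_empty() {
            // Discard empty batches.
            return None;
        }
        self.buffered_rows += batch.len();
        self.buffer.push(batch);
        if self.buffered_rows >= self.target {
            self.flush()
        } else {
            None
        }
    }

    /// Emit whatever is buffered, in arrival order (assumed to run at end of input).
    fn flush(&mut self) -> Option<Batch> {
        if self.buffer.is_empty() {
            return None;
        }
        self.buffered_rows = 0;
        Some(self.buffer.drain(..).flatten().collect())
    }
}

/// Run a whole input sequence through the coalescer.
fn coalesce(input: Vec<Batch>, target: usize) -> Vec<Batch> {
    let mut c = Coalescer::new(target);
    let mut out: Vec<Batch> = input.into_iter().filter_map(|b| c.push(b)).collect();
    out.extend(c.flush());
    out
}
```

Because batches are only ever appended to the buffer and drained front to back, rows leave in the same order they arrived within a partition.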


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] Dandandan commented on a change in pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#discussion_r549804455



##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -110,6 +111,16 @@ impl DefaultPhysicalPlanner {
             // leaf node, children cannot be replaced
             Ok(plan.clone())
         } else {
+            // wrap filter in coalesce batches
+            let plan = if plan.as_any().downcast_ref::<FilterExec>().is_some() {
+                let target_batch_size = ctx_state.config.batch_size;
+                Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))

Review comment:
       I guess this could at some point become part of a cost-based optimization that takes the number of rows and the selectivity of the filters into account?
   Would it also make sense to wrap joins in `CoalesceBatchesExec`, since a join can also reduce or increase the size of the batches? E.g. what is (or would be) the effect on TPC-H query 5?







[GitHub] [arrow] andygrove commented on a change in pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#discussion_r549808136



##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -110,6 +111,16 @@ impl DefaultPhysicalPlanner {
             // leaf node, children cannot be replaced
             Ok(plan.clone())
         } else {
+            // wrap filter in coalesce batches
+            let plan = if plan.as_any().downcast_ref::<FilterExec>().is_some() {
+                let target_batch_size = ctx_state.config.batch_size;
+                Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))

Review comment:
       I filed https://issues.apache.org/jira/browse/ARROW-11068 to wrap join output and also to make this mechanism more generic.
   
   Rather than hard-code a list of operators that need to be wrapped, we should find a more generic mechanism so that plans can declare if their input and/or output batches should be coalesced (similar to how we handle partitioning) and this would allow custom operators outside of DataFusion to benefit from this optimization.
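
One possible shape for such a mechanism is sketched below. Everything here is hypothetical and only illustrates the idea: `CoalesceHint`, `PlanNode`, `output_coalesce_hint`, and `wrap_if_requested` are invented names and are not part of DataFusion.

```rust
/// Hypothetical: whether an operator wants its output batches coalesced.
#[derive(Debug, Clone, Copy, PartialEq)]
enum CoalesceHint {
    /// Output batch sizes are fine as they are.
    None,
    /// Output may contain many small batches; aim for at least this many rows.
    TargetRows(usize),
}

/// Hypothetical stand-in for a physical plan node.
trait PlanNode {
    fn name(&self) -> String;

    /// Operators opt in by overriding this; the default asks for nothing.
    fn output_coalesce_hint(&self) -> CoalesceHint {
        CoalesceHint::None
    }
}

struct Scan;
struct Filter {
    input: Box<dyn PlanNode>,
    batch_size: usize,
}
struct Coalesce {
    input: Box<dyn PlanNode>,
    target: usize,
}

impl PlanNode for Scan {
    fn name(&self) -> String {
        "Scan".to_string()
    }
}

impl PlanNode for Filter {
    fn name(&self) -> String {
        format!("Filter({})", self.input.name())
    }
    // A filter may emit tiny batches, so it declares a target size.
    fn output_coalesce_hint(&self) -> CoalesceHint {
        CoalesceHint::TargetRows(self.batch_size)
    }
}

impl PlanNode for Coalesce {
    fn name(&self) -> String {
        format!("Coalesce[{}]({})", self.target, self.input.name())
    }
}

/// Planner step: wrap any node that declares a hint, without knowing its concrete type.
fn wrap_if_requested(node: Box<dyn PlanNode>) -> Box<dyn PlanNode> {
    match node.output_coalesce_hint() {
        CoalesceHint::TargetRows(target) => Box::new(Coalesce { input: node, target }),
        CoalesceHint::None => node,
    }
}
```

With a declaration like this, the planner needs no downcast to a specific operator type, so a custom operator defined outside the crate could opt in by overriding a single method.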







[GitHub] [arrow] Dandandan commented on a change in pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#discussion_r549861337



##########
File path: rust/datafusion/src/physical_plan/mod.rs
##########
@@ -104,6 +104,26 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
     }
 }
 
+/// Execute the [ExecutionPlan] and collect the results in memory
+pub async fn collect_partitioned(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Vec<Vec<RecordBatch>>> {
+    match plan.output_partitioning().partition_count() {
+        0 => Ok(vec![]),
+        1 => {
+            let it = plan.execute(0).await?;
+            Ok(vec![common::collect(it).await?])
+        }
+        _ => {
+            let mut partitions = vec![];
+            for i in 0..plan.output_partitioning().partition_count() {

Review comment:
       Could we bind the partition count to a variable and reuse it in the code?
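
A rough sketch of that suggestion, using a synchronous stand-in instead of the real async `ExecutionPlan` (the `Plan` struct and its methods are assumptions made up for this example):

```rust
/// Hypothetical stand-in for a plan whose partitions can be executed by index.
struct Plan {
    partitions: Vec<Vec<u32>>,
}

impl Plan {
    fn partition_count(&self) -> usize {
        self.partitions.len()
    }

    fn execute(&self, i: usize) -> Vec<u32> {
        self.partitions[i].clone()
    }
}

/// Collect every partition's output, keeping partitions separate.
fn collect_partitioned(plan: &Plan) -> Vec<Vec<u32>> {
    // Evaluate the partition count once and reuse the binding.
    let partition_count = plan.partition_count();
    (0..partition_count).map(|i| plan.execute(i)).collect()
}
```

In this simplified form the zero- and one-partition cases fall out of the range iteration, so no separate `match` arms are needed; whether that also holds for the async version in the PR is not something this sketch shows.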







[GitHub] [arrow] andygrove commented on a change in pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#discussion_r550216625



##########
File path: rust/datafusion/src/physical_plan/coalesce_batches.rs
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+//! vectorized processing by upstream operators.
+
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+};
+
+use arrow::compute::kernels::concat::concat;
+use arrow::datatypes::SchemaRef;
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use futures::stream::{Stream, StreamExt};
+use log::debug;
+
+/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+/// vectorized processing by upstream operators.
+#[derive(Debug)]
+pub struct CoalesceBatchesExec {
+    /// The input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+}
+
+impl CoalesceBatchesExec {
+    /// Create a new CoalesceBatchesExec
+    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
+        Self {
+            input,
+            target_batch_size,
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for CoalesceBatchesExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        // The coalesce batches operator does not make any changes to the schema of its input
+        self.input.schema()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        // The coalesce batches operator does not make any changes to the partitioning of its input
+        self.input.output_partitioning()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(CoalesceBatchesExec::new(
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "CoalesceBatchesExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(CoalesceBatchesStream {
+            input: self.input.execute(partition).await?,
+            schema: self.input.schema().clone(),
+            target_batch_size: self.target_batch_size.clone(),
+            buffer: Vec::new(),
+            buffered_rows: 0,
+        }))
+    }
+}
+
+struct CoalesceBatchesStream {
+    /// The input plan
+    input: SendableRecordBatchStream,
+    /// The input schema
+    schema: SchemaRef,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+    /// Buffered batches
+    buffer: Vec<RecordBatch>,
+    /// Buffered row count
+    buffered_rows: usize,
+}
+
+impl Stream for CoalesceBatchesStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        loop {
+            let input_batch = self.input.poll_next_unpin(cx);
+            match input_batch {
+                Poll::Ready(x) => match x {
+                    Some(Ok(ref batch)) => {
+                        if batch.num_rows() >= self.target_batch_size
+                            && self.buffer.is_empty()
+                        {
+                            return Poll::Ready(Some(Ok(batch.clone())));

Review comment:
       Why would this change the ordering within a single partition? The intent was to produce the rows in the same order they are received.







[GitHub] [arrow] andygrove commented on a change in pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#discussion_r550217442



##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -110,6 +111,16 @@ impl DefaultPhysicalPlanner {
             // leaf node, children cannot be replaced
             Ok(plan.clone())
         } else {
+            // wrap filter in coalesce batches
+            let plan = if plan.as_any().downcast_ref::<FilterExec>().is_some() {
+                let target_batch_size = ctx_state.config.batch_size;

Review comment:
       I actually wanted a separate config for this, but I would like to add it once we have https://issues.apache.org/jira/browse/ARROW-11059 (which I would like to try to get in for 3.0.0).
   
   I think changing it to half the batch size for now could make sense. I will push that change to this PR.







[GitHub] [arrow] github-actions[bot] commented on pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#issuecomment-752188671


   https://issues.apache.org/jira/browse/ARROW-11058





[GitHub] [arrow] Dandandan commented on pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#issuecomment-752197412


   LGTM 👍  😎 very nice speedup, good to see it also works on the bigger batch sizes (so it's not only because of the "problematic" join). I think it is a nice building block which we can decide to use in optimizations.
   We should still look at improving the join implementation, although this hides the problem quite a bit.





[GitHub] [arrow] andygrove commented on a change in pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#discussion_r549891877



##########
File path: rust/datafusion/src/physical_plan/coalesce_batches.rs
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+//! vectorized processing by upstream operators.
+
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+};
+
+use arrow::compute::kernels::concat::concat;
+use arrow::datatypes::SchemaRef;
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use futures::stream::{Stream, StreamExt};
+use log::debug;
+
+/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+/// vectorized processing by upstream operators.
+#[derive(Debug)]
+pub struct CoalesceBatchesExec {
+    /// The input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+}
+
+impl CoalesceBatchesExec {
+    /// Create a new CoalesceBatchesExec
+    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
+        Self {
+            input,
+            target_batch_size,
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for CoalesceBatchesExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        // The coalesce batches operator does not make any changes to the schema of its input
+        self.input.schema()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        // The coalesce batches operator does not make any changes to the partitioning of its input
+        self.input.output_partitioning()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(CoalesceBatchesExec::new(
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "CoalesceBatchesExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(CoalesceBatchesStream {
+            input: self.input.execute(partition).await?,
+            schema: self.input.schema().clone(),
+            target_batch_size: self.target_batch_size.clone(),
+            buffer: Vec::new(),
+            buffered_rows: 0,
+        }))
+    }
+}
+
+struct CoalesceBatchesStream {
+    /// The input plan
+    input: SendableRecordBatchStream,
+    /// The input schema
+    schema: SchemaRef,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+    /// Buffered batches
+    buffer: Vec<RecordBatch>,
+    /// Buffered row count
+    buffered_rows: usize,
+}
+
+impl Stream for CoalesceBatchesStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        loop {
+            let input_batch = self.input.poll_next_unpin(cx);
+            match input_batch {
+                Poll::Ready(x) => match x {
+                    Some(Ok(ref batch)) => {
+                        if batch.num_rows() >= self.target_batch_size
+                            && self.buffer.is_empty()
+                        {
+                            return Poll::Ready(Some(Ok(batch.clone())));
+                        } else if batch.num_rows() == 0 {
+                            // discard empty batches
+                        } else {
+                            // add to the buffered batches
+                            self.buffer.push(batch.clone());
+                            self.buffered_rows += batch.num_rows();
+                            // check to see if we have enough batches yet
+                            if self.buffered_rows >= self.target_batch_size {

Review comment:
       Currently, we use partitioning as the unit of parallelism (which I think makes sense) and we recently added the repartition operator which can increase or decrease parallelism.
   
   I'm not sure if we will need the ability to split batches. The only use case I can think of right now would be if we had kernels that had limits on the size of batches that they could process for some reason.
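
For reference, the splitting direction would be the mirror image of coalescing. A minimal sketch, with a batch modelled as a slice of rows rather than an Arrow `RecordBatch`, and with `split_batch` and `max_rows` as made-up names:

```rust
/// Split one batch (modelled as a slice of rows) into pieces of at most `max_rows` rows.
fn split_batch(batch: &[u32], max_rows: usize) -> Vec<Vec<u32>> {
    // `chunks` panics on a chunk size of zero, so reject it with a clear message.
    assert!(max_rows > 0, "max_rows must be positive");
    batch.chunks(max_rows).map(|chunk| chunk.to_vec()).collect()
}
```

This copies rows for simplicity. A real implementation over Arrow arrays would presumably slice without copying, but that is outside what this sketch demonstrates.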







[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#discussion_r549810301



##########
File path: rust/datafusion/src/physical_plan/coalesce_batches.rs
##########
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+//! vectorized processing by upstream operators.
+
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+};
+
+use arrow::array::{make_array, MutableArrayData};
+use arrow::datatypes::SchemaRef;
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use futures::stream::{Stream, StreamExt};
+use log::debug;
+
+/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+/// vectorized processing by upstream operators.
+#[derive(Debug)]
+pub struct CoalesceBatchesExec {
+    /// The input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+}
+
+impl CoalesceBatchesExec {
+    /// Create a new CoalesceBatchesExec
+    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
+        Self {
+            input,
+            target_batch_size,
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for CoalesceBatchesExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        // The coalesce batches operator does not make any changes to the schema of its input
+        self.input.schema()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        // The coalesce batches operator does not make any changes to the partitioning of its input
+        self.input.output_partitioning()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(CoalesceBatchesExec::new(
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "CoalesceBatchesExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(CoalesceBatchesStream {
+            input: self.input.execute(partition).await?,
+            schema: self.input.schema().clone(),
+            target_batch_size: self.target_batch_size.clone(),
+            buffer: Vec::new(),
+            buffered_rows: 0,
+        }))
+    }
+}
+
+struct CoalesceBatchesStream {
+    /// The input plan
+    input: SendableRecordBatchStream,
+    /// The input schema
+    schema: SchemaRef,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+    /// Buffered batches
+    buffer: Vec<RecordBatch>,
+    /// Buffered row count
+    buffered_rows: usize,
+}
+
+impl Stream for CoalesceBatchesStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        loop {
+            let input_batch = self.input.poll_next_unpin(cx);
+            match input_batch {
+                Poll::Ready(x) => match x {
+                    Some(Ok(ref batch)) => {
+                        if batch.num_rows() >= self.target_batch_size
+                            && self.buffer.is_empty()
+                        {
+                            return Poll::Ready(Some(Ok(batch.clone())));
+                        } else {
+                            // add to the buffered batches (if non-empty)
+                            if batch.num_rows() > 0 {
+                                self.buffer.push(batch.clone());
+                                self.buffered_rows += batch.num_rows();
+                            }
+                            // check to see if we have enough batches yet
+                            if self.buffered_rows >= self.target_batch_size {
+                                // combine the batches and return
+                                let mut arrays =
+                                    Vec::with_capacity(self.schema.fields().len());
+                                for i in 0..self.schema.fields().len() {
+                                    let source_arrays = self
+                                        .buffer
+                                        .iter()
+                                        .map(|batch| batch.column(i).data_ref().as_ref())
+                                        .collect();
+                                    let mut array_data = MutableArrayData::new(
+                                        source_arrays,
+                                        true,
+                                        self.buffered_rows,
+                                    );
+                                    for j in 0..self.buffer.len() {
+                                        array_data.extend(
+                                            j,
+                                            0,
+                                            self.buffer[j].num_rows(),
+                                        );
+                                    }
+                                    let data = array_data.freeze();
+                                    arrays.push(make_array(Arc::new(data)));

Review comment:
       I think that this operation is equivalent to `arrow::compute::kernels::concat::concat`, which we could use instead for simplicity (and so that we benefit if we later optimize `concat`).
   
   Note that this operation is also done in the sort node, where we merge all batches from all partitions in a single batch.
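
The equivalence can be illustrated with plain vectors standing in for Arrow arrays. This is an analogy only: it uses the standard library's slice `concat`, not the Arrow kernel, and the function names are invented for the example.

```rust
/// Manual approach, mirroring the loop in the diff: pre-size the output,
/// then append each buffered chunk in order.
fn combine_manually(chunks: &[Vec<i64>]) -> Vec<i64> {
    let total: usize = chunks.iter().map(|c| c.len()).sum();
    let mut out = Vec::with_capacity(total);
    for chunk in chunks {
        out.extend_from_slice(chunk);
    }
    out
}

/// The same result from a single library call.
fn combine_with_concat(chunks: &[Vec<i64>]) -> Vec<i64> {
    chunks.concat()
}
```

Delegating to one shared routine means every caller (here, the coalesce operator and the sort node) picks up any later optimization of that routine for free.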
   







[GitHub] [arrow] andygrove commented on a change in pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#discussion_r549804742



##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -110,6 +111,16 @@ impl DefaultPhysicalPlanner {
             // leaf node, children cannot be replaced
             Ok(plan.clone())
         } else {
+            // wrap filter in coalesce batches
+            let plan = if plan.as_any().downcast_ref::<FilterExec>().is_some() {
+                let target_batch_size = ctx_state.config.batch_size;
+                Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))

Review comment:
       Very good points. I did not think about join output.







[GitHub] [arrow] alamb commented on a change in pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#discussion_r550271524



##########
File path: rust/datafusion/src/physical_plan/coalesce_batches.rs
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+//! vectorized processing by upstream operators.
+
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+};
+
+use arrow::compute::kernels::concat::concat;
+use arrow::datatypes::SchemaRef;
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use futures::stream::{Stream, StreamExt};
+use log::debug;
+
+/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+/// vectorized processing by upstream operators.
+#[derive(Debug)]
+pub struct CoalesceBatchesExec {
+    /// The input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+}
+
+impl CoalesceBatchesExec {
+    /// Create a new CoalesceBatchesExec
+    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
+        Self {
+            input,
+            target_batch_size,
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for CoalesceBatchesExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        // The coalesce batches operator does not make any changes to the schema of its input
+        self.input.schema()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        // The coalesce batches operator does not make any changes to the partitioning of its input
+        self.input.output_partitioning()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(CoalesceBatchesExec::new(
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "CoalesceBatchesExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(CoalesceBatchesStream {
+            input: self.input.execute(partition).await?,
+            schema: self.input.schema().clone(),
+            target_batch_size: self.target_batch_size.clone(),
+            buffer: Vec::new(),
+            buffered_rows: 0,
+        }))
+    }
+}
+
+struct CoalesceBatchesStream {
+    /// The input plan
+    input: SendableRecordBatchStream,
+    /// The input schema
+    schema: SchemaRef,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+    /// Buffered batches
+    buffer: Vec<RecordBatch>,
+    /// Buffered row count
+    buffered_rows: usize,
+}
+
+impl Stream for CoalesceBatchesStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        loop {
+            let input_batch = self.input.poll_next_unpin(cx);
+            match input_batch {
+                Poll::Ready(x) => match x {
+                    Some(Ok(ref batch)) => {
+                        if batch.num_rows() >= self.target_batch_size
+                            && self.buffer.is_empty()
+                        {
+                            return Poll::Ready(Some(Ok(batch.clone())));

Review comment:
       Whoops -- that was my mistake -- I didn't see the condition of `self.buffer.is_empty()` -- in that case I agree that the rows remain ordered
   
   I guess I was thinking ahead to an operator that only copies data when needed rather than always. Too fancy. Sorry for the noise
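
A minimal sketch of why the `self.buffer.is_empty()` condition matters for ordering. It is not the PR's code: batches are modeled as plain `Vec<i32>` rather than `RecordBatch`, and the function name `coalesce_order` and the flag `require_empty_buffer` are hypothetical, introduced only for illustration.

```rust
/// Returns the rows in the order they would be emitted.
/// With `require_empty_buffer = true`, a large batch is passed through only
/// when nothing is buffered, which mirrors the condition in the diff above.
fn coalesce_order(input: &[Vec<i32>], target: usize, require_empty_buffer: bool) -> Vec<i32> {
    let mut emitted = Vec::new();
    let mut buffer: Vec<i32> = Vec::new();
    for batch in input {
        let can_pass_through =
            batch.len() >= target && (buffer.is_empty() || !require_empty_buffer);
        if can_pass_through {
            // Large batch goes straight to the output.
            emitted.extend_from_slice(batch);
        } else {
            // Small batch (or a large one arriving behind buffered rows) is buffered.
            buffer.extend_from_slice(batch);
            if buffer.len() >= target {
                emitted.append(&mut buffer);
            }
        }
    }
    // Flush whatever is left once the input is exhausted.
    emitted.append(&mut buffer);
    emitted
}
```

With input `[[1], [2, 3, 4]]` and a target of 3, the guarded version emits the rows in input order, while the unguarded version lets the large batch overtake the buffered row.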







[GitHub] [arrow] andygrove commented on pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#issuecomment-752185807


   @jorgecarleitao @alamb @Dandandan fyi  





[GitHub] [arrow] andygrove commented on pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#issuecomment-752200447


   > We should still look at improving the join implementation, although this hides the problem quite a bit.
   
   https://issues.apache.org/jira/browse/ARROW-11030 is the tracking issue for the MutableArrayData performance issue
   
   





[GitHub] [arrow] Dandandan commented on pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
Dandandan commented on pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#issuecomment-752253432


   Looks great. I added one question about something that could be done in the future and one style comment. But in general LGTM, awesome!





[GitHub] [arrow] alamb commented on a change in pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#discussion_r550163355



##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -110,6 +111,16 @@ impl DefaultPhysicalPlanner {
             // leaf node, children cannot be replaced
             Ok(plan.clone())
         } else {
+            // wrap filter in coalesce batches
+            let plan = if plan.as_any().downcast_ref::<FilterExec>().is_some() {
+                let target_batch_size = ctx_state.config.batch_size;

Review comment:
       I wonder if a heuristic like `config.batch_size / 2` might be faster -- by setting it to `batch_size` we'll end up copying data if even a single row from a batch is filtered out.
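
A rough sketch of this trade-off, assuming the buffering logic shown in the `coalesce_batches.rs` diff and counting only row counts. The function `rows_copied` is hypothetical and treats every row that passes through the buffer as copied once.

```rust
/// Counts how many rows would be buffered (and therefore copied when the
/// buffer is concatenated) for a sequence of input batch sizes.
fn rows_copied(batch_sizes: &[usize], target: usize) -> usize {
    let mut copied = 0;
    let mut buffered = 0;
    for &rows in batch_sizes {
        if rows == 0 {
            continue; // empty batches are discarded
        }
        if buffered == 0 && rows >= target {
            continue; // passed through unchanged, no copy
        }
        buffered += rows;
        copied += rows;
        if buffered >= target {
            buffered = 0; // buffer is concatenated and emitted
        }
    }
    copied
}
```

For four batches of 99 rows each (one row filtered out of a 100-row batch), a target of 100 buffers every row, while a target of 50 passes every batch through untouched.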

##########
File path: rust/datafusion/src/physical_plan/coalesce_batches.rs
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+//! vectorized processing by upstream operators.
+
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+};
+
+use arrow::compute::kernels::concat::concat;
+use arrow::datatypes::SchemaRef;
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use futures::stream::{Stream, StreamExt};
+use log::debug;
+
+/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+/// vectorized processing by upstream operators.
+#[derive(Debug)]
+pub struct CoalesceBatchesExec {
+    /// The input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+}
+
+impl CoalesceBatchesExec {
+    /// Create a new CoalesceBatchesExec
+    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
+        Self {
+            input,
+            target_batch_size,
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for CoalesceBatchesExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        // The coalesce batches operator does not make any changes to the schema of its input
+        self.input.schema()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        // The coalesce batches operator does not make any changes to the partitioning of its input
+        self.input.output_partitioning()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(CoalesceBatchesExec::new(
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "CoalesceBatchesExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(CoalesceBatchesStream {
+            input: self.input.execute(partition).await?,
+            schema: self.input.schema().clone(),
+            target_batch_size: self.target_batch_size.clone(),
+            buffer: Vec::new(),
+            buffered_rows: 0,
+        }))
+    }
+}
+
+struct CoalesceBatchesStream {
+    /// The input plan
+    input: SendableRecordBatchStream,
+    /// The input schema
+    schema: SchemaRef,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+    /// Buffered batches
+    buffer: Vec<RecordBatch>,
+    /// Buffered row count
+    buffered_rows: usize,
+}
+
+impl Stream for CoalesceBatchesStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        loop {
+            let input_batch = self.input.poll_next_unpin(cx);
+            match input_batch {
+                Poll::Ready(x) => match x {
+                    Some(Ok(ref batch)) => {
+                        if batch.num_rows() >= self.target_batch_size
+                            && self.buffer.is_empty()
+                        {
+                            return Poll::Ready(Some(Ok(batch.clone())));

Review comment:
       This has the effect that it can reorder the output rows from this operator. I think that is fine, but it should probably be noted somewhere (so that when we get to optimizations related to sorting we know this operation as written will not preserve the input order)

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -110,6 +111,16 @@ impl DefaultPhysicalPlanner {
             // leaf node, children cannot be replaced
             Ok(plan.clone())
         } else {
+            // wrap filter in coalesce batches
+            let plan = if plan.as_any().downcast_ref::<FilterExec>().is_some() {
+                let target_batch_size = ctx_state.config.batch_size;
+                Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))

Review comment:
       The issue with cost-based optimizers is that they invariably get it wrong sometimes (e.g. the selectivity is miscalculated due to correlations in the data, nulls, or something similar).
   
   I think the state of the art in optimizers is to defer as many such decisions as possible to runtime (when the actual cardinalities are known).
   
   So in this case, rather than figuring out which output operators to wrap, I suggest we do something like wrap *all* operators with coalesce, or maybe update the `Filter` operation itself to do this coalescing internally when it is preparing its output, avoiding the extra copy entirely.
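
One way to picture the second suggestion is a filter that writes surviving rows directly into an output batch of the target size. This is only a sketch under simplifying assumptions: batches are `Vec<i32>`, and `filter_coalesced` and its parameters are hypothetical names, not DataFusion APIs.

```rust
/// Applies `keep` to every row and emits output batches of `target` rows
/// (the last batch may be smaller). Rows are written once, straight into
/// the output batch, so no separate coalescing pass is needed.
fn filter_coalesced(
    input: &[Vec<i32>],
    target: usize,
    keep: impl Fn(&i32) -> bool,
) -> Vec<Vec<i32>> {
    let mut output = Vec::new();
    let mut current = Vec::with_capacity(target);
    for batch in input {
        for &value in batch {
            if keep(&value) {
                current.push(value);
                if current.len() == target {
                    // Output batch is full: emit it and start a new one.
                    output.push(std::mem::replace(
                        &mut current,
                        Vec::with_capacity(target),
                    ));
                }
            }
        }
    }
    // Emit the final, possibly partial, batch.
    if !current.is_empty() {
        output.push(current);
    }
    output
}
```

The decision about how much to coalesce is made while the data flows, so it does not depend on a selectivity estimate made at planning time.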







[GitHub] [arrow] Dandandan commented on a change in pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#discussion_r549861337



##########
File path: rust/datafusion/src/physical_plan/mod.rs
##########
@@ -104,6 +104,26 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
     }
 }
 
+/// Execute the [ExecutionPlan] and collect the results in memory
+pub async fn collect_partitioned(
+    plan: Arc<dyn ExecutionPlan>,
+) -> Result<Vec<Vec<RecordBatch>>> {
+    match plan.output_partitioning().partition_count() {
+        0 => Ok(vec![]),
+        1 => {
+            let it = plan.execute(0).await?;
+            Ok(vec![common::collect(it).await?])
+        }
+        _ => {
+            let mut partitions = vec![];
+            for i in 0..plan.output_partitioning().partition_count() {

Review comment:
       Could we bind this partition count to a variable and reuse it in the code?
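
A sketch of what that could look like. To stay self-contained it uses a synchronous stand-in trait (`Plan`) and a toy implementation (`OnePerPartition`) instead of `ExecutionPlan`; both names are hypothetical.

```rust
/// Stand-in for the parts of an execution plan used here (an assumption,
/// not the DataFusion trait).
trait Plan {
    fn partition_count(&self) -> usize;
    fn execute(&self, partition: usize) -> Vec<i32>;
}

/// Toy plan whose partition `i` yields the single row `i`.
struct OnePerPartition(usize);

impl Plan for OnePerPartition {
    fn partition_count(&self) -> usize {
        self.0
    }
    fn execute(&self, partition: usize) -> Vec<i32> {
        vec![partition as i32]
    }
}

fn collect_partitioned(plan: &dyn Plan) -> Vec<Vec<i32>> {
    // Bind the count once and reuse it for both the match and the loop.
    let partition_count = plan.partition_count();
    match partition_count {
        0 => vec![],
        1 => vec![plan.execute(0)],
        _ => (0..partition_count).map(|i| plan.execute(i)).collect(),
    }
}
```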







[GitHub] [arrow] andygrove closed pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #9043:
URL: https://github.com/apache/arrow/pull/9043


   





[GitHub] [arrow] andygrove commented on pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#issuecomment-752205145


   I just realized I missed a case (we need to send any remaining buffered batches when the input has no more batches) and have changed this to a draft. I am also working on tests.
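
The missed case can be illustrated with a simplified, synchronous model of the operator. This is a sketch, not the PR's stream implementation: batches are `Vec<i32>` and `coalesce` is a hypothetical helper.

```rust
/// Combines small batches until at least `target` rows are buffered,
/// then emits them as one batch.
fn coalesce(input: Vec<Vec<i32>>, target: usize) -> Vec<Vec<i32>> {
    let mut output = Vec::new();
    let mut buffer: Vec<Vec<i32>> = Vec::new();
    let mut buffered_rows = 0;

    for batch in input {
        if batch.is_empty() {
            continue; // discard empty batches
        }
        if buffer.is_empty() && batch.len() >= target {
            output.push(batch); // already large enough, pass through
            continue;
        }
        buffered_rows += batch.len();
        buffer.push(batch);
        if buffered_rows >= target {
            output.push(buffer.concat());
            buffer.clear();
            buffered_rows = 0;
        }
    }

    // The case described above: the input is exhausted but rows are still
    // buffered, so they must be emitted as a final (smaller) batch.
    if !buffer.is_empty() {
        output.push(buffer.concat());
    }
    output
}
```

Without the final flush, the trailing `[4]` in an input such as `[[1], [2, 3], [4]]` with a target of 3 would be silently dropped.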





[GitHub] [arrow] andygrove commented on pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#issuecomment-752226157


   This is ready for review now





[GitHub] [arrow] Dandandan commented on a change in pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#discussion_r549859868



##########
File path: rust/datafusion/src/physical_plan/coalesce_batches.rs
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+//! vectorized processing by upstream operators.
+
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+};
+
+use arrow::compute::kernels::concat::concat;
+use arrow::datatypes::SchemaRef;
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use futures::stream::{Stream, StreamExt};
+use log::debug;
+
+/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+/// vectorized processing by upstream operators.
+#[derive(Debug)]
+pub struct CoalesceBatchesExec {
+    /// The input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+}
+
+impl CoalesceBatchesExec {
+    /// Create a new CoalesceBatchesExec
+    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
+        Self {
+            input,
+            target_batch_size,
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for CoalesceBatchesExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        // The coalesce batches operator does not make any changes to the schema of its input
+        self.input.schema()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        // The coalesce batches operator does not make any changes to the partitioning of its input
+        self.input.output_partitioning()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(CoalesceBatchesExec::new(
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "CoalesceBatchesExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(CoalesceBatchesStream {
+            input: self.input.execute(partition).await?,
+            schema: self.input.schema().clone(),
+            target_batch_size: self.target_batch_size.clone(),
+            buffer: Vec::new(),
+            buffered_rows: 0,
+        }))
+    }
+}
+
+struct CoalesceBatchesStream {
+    /// The input plan
+    input: SendableRecordBatchStream,
+    /// The input schema
+    schema: SchemaRef,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+    /// Buffered batches
+    buffer: Vec<RecordBatch>,
+    /// Buffered row count
+    buffered_rows: usize,
+}
+
+impl Stream for CoalesceBatchesStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        loop {
+            let input_batch = self.input.poll_next_unpin(cx);
+            match input_batch {
+                Poll::Ready(x) => match x {
+                    Some(Ok(ref batch)) => {
+                        if batch.num_rows() >= self.target_batch_size
+                            && self.buffer.is_empty()
+                        {
+                            return Poll::Ready(Some(Ok(batch.clone())));
+                        } else if batch.num_rows() == 0 {
+                            // discard empty batches
+                        } else {
+                            // add to the buffered batches
+                            self.buffer.push(batch.clone());
+                            self.buffered_rows += batch.num_rows();
+                            // check to see if we have enough batches yet
+                            if self.buffered_rows >= self.target_batch_size {

Review comment:
       Would it also make sense to make batches *smaller*, or split them if they are bigger than the target batch size (e.g. for increased parallelism), or do we only want to grow them for now?
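
For illustration, splitting could look roughly like the following. This is a sketch on plain slices with a hypothetical `split_batch` helper; it copies each chunk with `to_vec`, which a real implementation on Arrow arrays might be able to avoid.

```rust
/// Splits one batch into chunks of at most `target` rows each.
fn split_batch(batch: &[i32], target: usize) -> Vec<Vec<i32>> {
    assert!(target > 0, "target batch size must be non-zero");
    batch.chunks(target).map(|chunk| chunk.to_vec()).collect()
}
```

Only the last chunk can be smaller than `target`, so the output would still be close to the target batch size.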



