Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/30 11:36:34 UTC

[GitHub] [arrow] alamb commented on a change in pull request #9043: ARROW-11058: [Rust] [DataFusion] Implement coalesce batches operator

alamb commented on a change in pull request #9043:
URL: https://github.com/apache/arrow/pull/9043#discussion_r550163355



##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -110,6 +111,16 @@ impl DefaultPhysicalPlanner {
             // leaf node, children cannot be replaced
             Ok(plan.clone())
         } else {
+            // wrap filter in coalesce batches
+            let plan = if plan.as_any().downcast_ref::<FilterExec>().is_some() {
+                let target_batch_size = ctx_state.config.batch_size;

Review comment:
       I wonder if a heuristic like `config.batch_size / 2` might be faster. With the target set to `batch_size`, we end up copying data whenever even a single row is filtered out of a batch.
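
A rough sketch of the trade-off, not DataFusion code. The helper `passes_through` is hypothetical and only mirrors the pass-through condition in `CoalesceBatchesStream::poll_next`; the value 4096 for `batch_size` is an assumption chosen for illustration.

```rust
/// Hypothetical helper mirroring the pass-through check in the stream:
/// a batch is forwarded without copying only when it already meets the
/// target size and nothing is buffered.
fn passes_through(num_rows: usize, target_batch_size: usize, buffer_is_empty: bool) -> bool {
    num_rows >= target_batch_size && buffer_is_empty
}

fn main() {
    // Assumed value for `config.batch_size`; used for illustration only.
    let batch_size: usize = 4096;
    // A full input batch from which the filter removed a single row.
    let rows_after_filter = batch_size - 1;

    // Target == batch_size: the batch falls just short, so it is buffered and copied.
    println!(
        "target = batch_size:     pass-through = {}",
        passes_through(rows_after_filter, batch_size, true)
    );
    // Target == batch_size / 2: the same batch is forwarded as-is.
    println!(
        "target = batch_size / 2: pass-through = {}",
        passes_through(rows_after_filter, batch_size / 2, true)
    );
}
```

With the lower target, only batches that lose more than half their rows would pay for the concatenation. Whether that is actually faster would need to be measured.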

##########
File path: rust/datafusion/src/physical_plan/coalesce_batches.rs
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+//! vectorized processing by upstream operators.
+
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
+};
+
+use arrow::compute::kernels::concat::concat;
+use arrow::datatypes::SchemaRef;
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+use async_trait::async_trait;
+use futures::stream::{Stream, StreamExt};
+use log::debug;
+
+/// CoalesceBatchesExec combines small batches into larger batches for more efficient use of
+/// vectorized processing by upstream operators.
+#[derive(Debug)]
+pub struct CoalesceBatchesExec {
+    /// The input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+}
+
+impl CoalesceBatchesExec {
+    /// Create a new CoalesceBatchesExec
+    pub fn new(input: Arc<dyn ExecutionPlan>, target_batch_size: usize) -> Self {
+        Self {
+            input,
+            target_batch_size,
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for CoalesceBatchesExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        // The coalesce batches operator does not make any changes to the schema of its input
+        self.input.schema()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        // The coalesce batches operator does not make any changes to the partitioning of its input
+        self.input.output_partitioning()
+    }
+
+    fn with_new_children(
+        &self,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        match children.len() {
+            1 => Ok(Arc::new(CoalesceBatchesExec::new(
+                children[0].clone(),
+                self.target_batch_size,
+            ))),
+            _ => Err(DataFusionError::Internal(
+                "CoalesceBatchesExec wrong number of children".to_string(),
+            )),
+        }
+    }
+
+    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(CoalesceBatchesStream {
+            input: self.input.execute(partition).await?,
+            schema: self.input.schema().clone(),
+            target_batch_size: self.target_batch_size.clone(),
+            buffer: Vec::new(),
+            buffered_rows: 0,
+        }))
+    }
+}
+
+struct CoalesceBatchesStream {
+    /// The input plan
+    input: SendableRecordBatchStream,
+    /// The input schema
+    schema: SchemaRef,
+    /// Minimum number of rows for coalesces batches
+    target_batch_size: usize,
+    /// Buffered batches
+    buffer: Vec<RecordBatch>,
+    /// Buffered row count
+    buffered_rows: usize,
+}
+
+impl Stream for CoalesceBatchesStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        loop {
+            let input_batch = self.input.poll_next_unpin(cx);
+            match input_batch {
+                Poll::Ready(x) => match x {
+                    Some(Ok(ref batch)) => {
+                        if batch.num_rows() >= self.target_batch_size
+                            && self.buffer.is_empty()
+                        {
+                            return Poll::Ready(Some(Ok(batch.clone())));

Review comment:
       This can reorder the output rows from this operator. I think that is fine, but it should probably be noted somewhere, so that when we get to optimizations related to sorting we know that this operation, as written, will not preserve the input order.
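
A minimal sketch of the ordering concern, using `Vec<i32>` as a stand-in for `RecordBatch`. The function `coalesce` and its `emit_large_immediately` flag are hypothetical and are not part of the PR. They only show that rows keep their input order when large batches are forwarded solely while the buffer is empty, and can be reordered when a large batch is allowed to overtake buffered rows.

```rust
/// Coalesce `input` batches so that each output batch has at least `target` rows
/// (except possibly the last). `Vec<i32>` stands in for a RecordBatch.
/// `emit_large_immediately` is a hypothetical switch: when true, a batch that
/// already meets the target is emitted even if smaller batches are buffered.
fn coalesce(input: Vec<Vec<i32>>, target: usize, emit_large_immediately: bool) -> Vec<Vec<i32>> {
    let mut out = Vec::new();
    let mut buffer: Vec<i32> = Vec::new();
    for batch in input {
        if batch.len() >= target && (buffer.is_empty() || emit_large_immediately) {
            // Forward the batch untouched; this may overtake buffered rows.
            out.push(batch);
            continue;
        }
        buffer.extend(batch);
        if buffer.len() >= target {
            out.push(std::mem::take(&mut buffer));
        }
    }
    // Flush whatever is left once the input is exhausted.
    if !buffer.is_empty() {
        out.push(buffer);
    }
    out
}

/// Flatten batches back into a single row sequence to compare ordering.
fn flatten(batches: &[Vec<i32>]) -> Vec<i32> {
    batches.iter().flatten().copied().collect()
}

fn main() {
    let input = vec![vec![1], vec![2, 3, 4, 5], vec![6]];
    // Large batches wait behind buffered rows: prints [1, 2, 3, 4, 5, 6]
    println!("{:?}", flatten(&coalesce(input.clone(), 4, false)));
    // Large batch overtakes the buffered row: prints [2, 3, 4, 5, 1, 6]
    println!("{:?}", flatten(&coalesce(input, 4, true)));
}
```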

##########
File path: rust/datafusion/src/physical_plan/planner.rs
##########
@@ -110,6 +111,16 @@ impl DefaultPhysicalPlanner {
             // leaf node, children cannot be replaced
             Ok(plan.clone())
         } else {
+            // wrap filter in coalesce batches
+            let plan = if plan.as_any().downcast_ref::<FilterExec>().is_some() {
+                let target_batch_size = ctx_state.config.batch_size;
+                Arc::new(CoalesceBatchesExec::new(plan.clone(), target_batch_size))

Review comment:
       The issue with cost-based optimizers is that they invariably get it wrong sometimes (e.g. the selectivity is miscalculated due to correlations in the data, nulls, or something similar).
   
   I think the state of the art in optimizers is to delay as many such decisions as possible to runtime, when the actual cardinalities are known.
   
   So in this case, rather than figuring out which operators to wrap, I suggest we either wrap *all* operators with coalesce, or update the `Filter` operation itself to do this coalescing internally while it prepares its output, and avoid the copy entirely.
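
A sketch of the second option under simplifying assumptions: `Vec<i32>` stands in for a `RecordBatch`, and `filter_and_coalesce` is a hypothetical function, not an existing DataFusion API. The idea is that rows passing the predicate are written once, directly into the output buffer, so no separate coalescing copy follows the filter.

```rust
/// Hypothetical filter that coalesces its own output: surviving rows are
/// appended straight to an output buffer, which is emitted once it holds
/// at least `target` rows. `Vec<i32>` stands in for a RecordBatch.
fn filter_and_coalesce<F>(input: Vec<Vec<i32>>, target: usize, predicate: F) -> Vec<Vec<i32>>
where
    F: Fn(&i32) -> bool,
{
    let mut out = Vec::new();
    let mut buffer: Vec<i32> = Vec::with_capacity(target);
    for batch in input {
        // Each surviving row is copied exactly once, into the output buffer.
        buffer.extend(batch.into_iter().filter(|v| predicate(v)));
        if buffer.len() >= target {
            out.push(std::mem::replace(&mut buffer, Vec::with_capacity(target)));
        }
    }
    // Flush the remainder once the input is exhausted.
    if !buffer.is_empty() {
        out.push(buffer);
    }
    out
}

fn main() {
    let input = vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8], vec![9, 10]];
    // Keep even values and emit once at least 3 rows have accumulated.
    let output = filter_and_coalesce(input, 3, |v| v % 2 == 0);
    println!("{:?}", output);
}
```

The decision to emit is made at runtime from the actual number of surviving rows, so no selectivity estimate is needed at planning time.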



