Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/29 15:47:18 UTC

[arrow-datafusion] branch master updated: [minor] use arrow kernel concat_batches instead of combine_batches (#4423)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e4d790d49 [minor] use arrow kernel concat_batches instead of combine_batches (#4423)
e4d790d49 is described below

commit e4d790d495d65a23e0a7dc2994786c59bfe5d66c
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Tue Nov 29 23:47:12 2022 +0800

    [minor] use arrow kernel concat_batches instead of combine_batches (#4423)
    
    * [minor] use arrow kernel concat_batches instead of combine_batches
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    * fix test
    
    Signed-off-by: yangjiang <ya...@ebay.com>
    
    Signed-off-by: yangjiang <ya...@ebay.com>
---
 datafusion/core/src/physical_plan/common.rs        | 67 ----------------------
 .../src/physical_plan/joins/sort_merge_join.rs     |  6 +-
 .../src/physical_plan/windows/window_agg_exec.rs   | 30 +++++-----
 3 files changed, 18 insertions(+), 85 deletions(-)
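
For context, arrow's concat_batches kernel takes a schema reference and a slice of
record batches and concatenates them column by column, which is what the removed
helper did by hand. A minimal sketch of its usage (the schema and values below are
illustrative only, not taken from this commit):

    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::compute::concat_batches;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    fn main() -> arrow::error::Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        // Two small batches sharing the same schema.
        let b1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2]))])?;
        let b2 = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![3]))])?;

        // The kernel concatenates column by column into a single batch.
        let combined = concat_batches(&schema, &[b1, b2])?;
        assert_eq!(combined.num_rows(), 3);
        Ok(())
    }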

diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index c00c7421e..b4db3a32b 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -22,7 +22,6 @@ use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
 use crate::physical_plan::metrics::MemTrackingMetrics;
 use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
-use arrow::compute::concat;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::ArrowError;
 use arrow::error::Result as ArrowResult;
@@ -96,32 +95,6 @@ pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatc
         .map_err(DataFusionError::from)
 }
 
-/// Combines a slice of record batches into one, or returns None if the slice
-/// is empty; all record batches in the slice must share the same schema.
-pub(crate) fn combine_batches(
-    batches: &[RecordBatch],
-    schema: SchemaRef,
-) -> ArrowResult<Option<RecordBatch>> {
-    if batches.is_empty() {
-        Ok(None)
-    } else {
-        let columns = schema
-            .fields()
-            .iter()
-            .enumerate()
-            .map(|(i, _)| {
-                concat(
-                    &batches
-                        .iter()
-                        .map(|batch| batch.column(i).as_ref())
-                        .collect::<Vec<_>>(),
-                )
-            })
-            .collect::<ArrowResult<Vec<_>>>()?;
-        Ok(Some(RecordBatch::try_new(schema.clone(), columns)?))
-    }
-}
-
 /// Recursively builds a list of files in a directory with a given extension
 pub fn build_checked_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
     let mut filenames: Vec<String> = Vec::new();
@@ -303,46 +276,6 @@ mod tests {
         record_batch::RecordBatch,
     };
 
-    #[test]
-    fn test_combine_batches_empty() -> Result<()> {
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("f32", DataType::Float32, false),
-            Field::new("f64", DataType::Float64, false),
-        ]));
-        let result = combine_batches(&[], schema)?;
-        assert!(result.is_none());
-        Ok(())
-    }
-
-    #[test]
-    fn test_combine_batches() -> Result<()> {
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("f32", DataType::Float32, false),
-            Field::new("f64", DataType::Float64, false),
-        ]));
-
-        let batch_count = 1000;
-        let batch_size = 10;
-        let batches = (0..batch_count)
-            .map(|i| {
-                RecordBatch::try_new(
-                    Arc::clone(&schema),
-                    vec![
-                        Arc::new(Float32Array::from_slice(vec![i as f32; batch_size])),
-                        Arc::new(Float64Array::from_slice(vec![i as f64; batch_size])),
-                    ],
-                )
-                .unwrap()
-            })
-            .collect::<Vec<_>>();
-
-        let result = combine_batches(&batches, schema)?;
-        assert!(result.is_some());
-        let result = result.unwrap();
-        assert_eq!(batch_count * batch_size, result.num_rows());
-        Ok(())
-    }
-
     #[test]
     fn test_compute_record_batch_statistics_empty() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![
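
The two tests deleted above translate directly to the kernel. A hedged sketch of
the port, written against arrow's public API (arrow's Float32Array::from in place
of DataFusion's from_slice helper, and not part of this commit); the one behavioral
difference is the empty case, where combine_batches returned Ok(None) but
concat_batches returns an empty batch with the given schema:

    use std::sync::Arc;

    use arrow::array::{Float32Array, Float64Array};
    use arrow::compute::concat_batches;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::error::Result;
    use arrow::record_batch::RecordBatch;

    fn main() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("f32", DataType::Float32, false),
            Field::new("f64", DataType::Float64, false),
        ]));

        // Empty input: an empty batch rather than None.
        let empty = concat_batches(&schema, &[])?;
        assert_eq!(0, empty.num_rows());

        // Many batches: the same row-count check as the deleted test.
        let batch_count = 1000;
        let batch_size = 10;
        let batches = (0..batch_count)
            .map(|i| {
                RecordBatch::try_new(
                    schema.clone(),
                    vec![
                        Arc::new(Float32Array::from(vec![i as f32; batch_size])),
                        Arc::new(Float64Array::from(vec![i as f64; batch_size])),
                    ],
                )
                .unwrap()
            })
            .collect::<Vec<_>>();

        let result = concat_batches(&schema, &batches)?;
        assert_eq!(batch_count * batch_size, result.num_rows());
        Ok(())
    }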
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 7ae4192e4..797f61db6 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -30,7 +30,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use arrow::array::*;
-use arrow::compute::{take, SortOptions};
+use arrow::compute::{concat_batches, take, SortOptions};
 use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
@@ -40,7 +40,6 @@ use crate::error::DataFusionError;
 use crate::error::Result;
 use crate::execution::context::TaskContext;
 use crate::logical_expr::JoinType;
-use crate::physical_plan::common::combine_batches;
 use crate::physical_plan::expressions::Column;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::joins::utils::{
@@ -1085,8 +1084,7 @@ impl SMJStream {
     }
 
     fn output_record_batch_and_reset(&mut self) -> ArrowResult<RecordBatch> {
-        let record_batch =
-            combine_batches(&self.output_record_batches, self.schema.clone())?.unwrap();
+        let record_batch = concat_batches(&self.schema, &self.output_record_batches)?;
         self.join_metrics.output_batches.add(1);
         self.join_metrics.output_rows.add(record_batch.num_rows());
         self.output_size -= record_batch.num_rows();
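
Worth noting at this call site: the old code unwrapped the Option returned by
combine_batches, which panics if output_record_batches is empty, whereas
concat_batches handles an empty slice by returning an empty batch (see the sketch
above), so the unwrap disappears along with that edge case.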
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 8f0b5364f..2e1b6a70b 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -24,10 +24,11 @@ use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
 use crate::physical_plan::{
-    common, ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
+    ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
     ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream, Statistics, WindowExpr,
 };
+use arrow::compute::concat_batches;
 use arrow::{
     array::ArrayRef,
     datatypes::{Schema, SchemaRef},
@@ -274,20 +275,21 @@ impl WindowAggStream {
         // record compute time on drop
         let _timer = self.baseline_metrics.elapsed_compute().timer();
 
-        let batch = common::combine_batches(&self.batches, self.input.schema())?;
-        if let Some(batch) = batch {
-            // calculate window cols
-            let mut columns = compute_window_aggregates(&self.window_expr, &batch)
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
-
-            // combine with the original cols
-            // note the setup of window aggregates is that the newly calculated window
-            // expressions are always prepended to the columns
-            columns.extend_from_slice(batch.columns());
-            RecordBatch::try_new(self.schema.clone(), columns)
-        } else {
-            Ok(RecordBatch::new_empty(self.schema.clone()))
+        if self.batches.is_empty() {
+            return Ok(RecordBatch::new_empty(self.schema.clone()));
         }
+
+        let batch = concat_batches(&self.input.schema(), &self.batches)?;
+
+        // calculate window cols
+        let mut columns = compute_window_aggregates(&self.window_expr, &batch)
+            .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+
+        // combine with the original cols
+        // note the setup of window aggregates is that the newly calculated window
+        // expressions are always prepended to the columns
+        columns.extend_from_slice(batch.columns());
+        RecordBatch::try_new(self.schema.clone(), columns)
     }
 }
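
One detail of the new window_agg_exec code: the explicit is_empty guard is kept even
though concat_batches accepts an empty slice, because the early return produces the
empty batch with the output schema (self.schema) rather than the input schema,
preserving the previous behavior without running the window computation.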