You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/29 15:47:18 UTC
[arrow-datafusion] branch master updated: [minor] use arrow kernel concat_batches instead of combine_batches (#4423)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e4d790d49 [minor] use arrow kernel concat_batches instead combine_batches (#4423)
e4d790d49 is described below
commit e4d790d495d65a23e0a7dc2994786c59bfe5d66c
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Tue Nov 29 23:47:12 2022 +0800
[minor] use arrow kernel concat_batches instead of combine_batches (#4423)
* [minor] use arrow kernel concat_batches instead of combine_batches
Signed-off-by: yangjiang <ya...@ebay.com>
* fix test
Signed-off-by: yangjiang <ya...@ebay.com>
Signed-off-by: yangjiang <ya...@ebay.com>
---
datafusion/core/src/physical_plan/common.rs | 67 ----------------------
.../src/physical_plan/joins/sort_merge_join.rs | 6 +-
.../src/physical_plan/windows/window_agg_exec.rs | 30 +++++-----
3 files changed, 18 insertions(+), 85 deletions(-)
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index c00c7421e..b4db3a32b 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -22,7 +22,6 @@ use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::physical_plan::metrics::MemTrackingMetrics;
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
-use arrow::compute::concat;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::error::Result as ArrowResult;
@@ -96,32 +95,6 @@ pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatc
.map_err(DataFusionError::from)
}
-/// Combine a slice of record batches into one, or returns None if the slice itself
-/// is empty; all the record batches inside the slice must be of the same schema.
-pub(crate) fn combine_batches(
- batches: &[RecordBatch],
- schema: SchemaRef,
-) -> ArrowResult<Option<RecordBatch>> {
- if batches.is_empty() {
- Ok(None)
- } else {
- let columns = schema
- .fields()
- .iter()
- .enumerate()
- .map(|(i, _)| {
- concat(
- &batches
- .iter()
- .map(|batch| batch.column(i).as_ref())
- .collect::<Vec<_>>(),
- )
- })
- .collect::<ArrowResult<Vec<_>>>()?;
- Ok(Some(RecordBatch::try_new(schema.clone(), columns)?))
- }
-}
-
/// Recursively builds a list of files in a directory with a given extension
pub fn build_checked_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
let mut filenames: Vec<String> = Vec::new();
@@ -303,46 +276,6 @@ mod tests {
record_batch::RecordBatch,
};
- #[test]
- fn test_combine_batches_empty() -> Result<()> {
- let schema = Arc::new(Schema::new(vec![
- Field::new("f32", DataType::Float32, false),
- Field::new("f64", DataType::Float64, false),
- ]));
- let result = combine_batches(&[], schema)?;
- assert!(result.is_none());
- Ok(())
- }
-
- #[test]
- fn test_combine_batches() -> Result<()> {
- let schema = Arc::new(Schema::new(vec![
- Field::new("f32", DataType::Float32, false),
- Field::new("f64", DataType::Float64, false),
- ]));
-
- let batch_count = 1000;
- let batch_size = 10;
- let batches = (0..batch_count)
- .map(|i| {
- RecordBatch::try_new(
- Arc::clone(&schema),
- vec![
- Arc::new(Float32Array::from_slice(vec![i as f32; batch_size])),
- Arc::new(Float64Array::from_slice(vec![i as f64; batch_size])),
- ],
- )
- .unwrap()
- })
- .collect::<Vec<_>>();
-
- let result = combine_batches(&batches, schema)?;
- assert!(result.is_some());
- let result = result.unwrap();
- assert_eq!(batch_count * batch_size, result.num_rows());
- Ok(())
- }
-
#[test]
fn test_compute_record_batch_statistics_empty() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index 7ae4192e4..797f61db6 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -30,7 +30,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use arrow::array::*;
-use arrow::compute::{take, SortOptions};
+use arrow::compute::{concat_batches, take, SortOptions};
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
@@ -40,7 +40,6 @@ use crate::error::DataFusionError;
use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::logical_expr::JoinType;
-use crate::physical_plan::common::combine_batches;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::joins::utils::{
@@ -1085,8 +1084,7 @@ impl SMJStream {
}
fn output_record_batch_and_reset(&mut self) -> ArrowResult<RecordBatch> {
- let record_batch =
- combine_batches(&self.output_record_batches, self.schema.clone())?.unwrap();
+ let record_batch = concat_batches(&self.schema, &self.output_record_batches)?;
self.join_metrics.output_batches.add(1);
self.join_metrics.output_rows.add(record_batch.num_rows());
self.output_size -= record_batch.num_rows();
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 8f0b5364f..2e1b6a70b 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -24,10 +24,11 @@ use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
use crate::physical_plan::{
- common, ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
+ ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics, WindowExpr,
};
+use arrow::compute::concat_batches;
use arrow::{
array::ArrayRef,
datatypes::{Schema, SchemaRef},
@@ -274,20 +275,21 @@ impl WindowAggStream {
// record compute time on drop
let _timer = self.baseline_metrics.elapsed_compute().timer();
- let batch = common::combine_batches(&self.batches, self.input.schema())?;
- if let Some(batch) = batch {
- // calculate window cols
- let mut columns = compute_window_aggregates(&self.window_expr, &batch)
- .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
-
- // combine with the original cols
- // note the setup of window aggregates is that they newly calculated window
- // expressions are always prepended to the columns
- columns.extend_from_slice(batch.columns());
- RecordBatch::try_new(self.schema.clone(), columns)
- } else {
- Ok(RecordBatch::new_empty(self.schema.clone()))
+ if self.batches.is_empty() {
+ return Ok(RecordBatch::new_empty(self.schema.clone()));
}
+
+ let batch = concat_batches(&self.input.schema(), &self.batches)?;
+
+ // calculate window cols
+ let mut columns = compute_window_aggregates(&self.window_expr, &batch)
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+
+ // combine with the original cols
+ // note the setup of window aggregates is that they newly calculated window
+ // expressions are always prepended to the columns
+ columns.extend_from_slice(batch.columns());
+ RecordBatch::try_new(self.schema.clone(), columns)
}
}