Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/13 22:42:42 UTC
[arrow] branch master updated: ARROW-9725: [Rust] [DataFusion] SortExec and LimitExec re-use MergeExec
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new b2788c5 ARROW-9725: [Rust] [DataFusion] SortExec and LimitExec re-use MergeExec
b2788c5 is described below
commit b2788c55ac2173a22dd25b932cf0cedcc9e18391
Author: Andy Grove <an...@gmail.com>
AuthorDate: Thu Aug 13 16:42:13 2020 -0600
ARROW-9725: [Rust] [DataFusion] SortExec and LimitExec re-use MergeExec
Closes #7958 from andygrove/ARROW-9725
Authored-by: Andy Grove <an...@gmail.com>
Signed-off-by: Andy Grove <an...@gmail.com>
---
rust/datafusion/src/execution/context.rs | 4 +-
.../src/execution/physical_plan/limit.rs | 102 ++++++++++++++-------
.../src/execution/physical_plan/memory.rs | 2 +-
.../datafusion/src/execution/physical_plan/sort.rs | 33 ++-----
4 files changed, 80 insertions(+), 61 deletions(-)
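The core of the change, in both limit.rs and sort.rs, is to drop the hand-rolled thread::spawn/join logic and delegate partition fan-in to MergeExec: each input partition applies the limit locally (the new LocalLimitExec), MergeExec collapses the locally-limited partitions into a single partition, and GlobalLimitExec applies the final cut to the merged stream. The sketch below is a minimal model of that two-stage limit, using plain Vec<i32> in place of RecordBatch partitions; the names local_limit and global_limit are illustrative only and do not appear in the patch.

    /// Stage 1: apply the limit within a single partition (LocalLimitExec's job).
    fn local_limit(partition: &[i32], limit: usize) -> Vec<i32> {
        partition.iter().take(limit).copied().collect()
    }

    /// Stage 2: merge the locally-limited partitions and apply the limit once more
    /// (the MergeExec + GlobalLimitExec combination).
    fn global_limit(partitions: &[Vec<i32>], limit: usize) -> Vec<i32> {
        partitions
            .iter()
            .flat_map(|p| local_limit(p, limit)) // each partition yields at most `limit` rows
            .take(limit)                         // final, global cut on the merged stream
            .collect()
    }

    fn main() {
        let partitions = vec![vec![1, 2, 3, 4], vec![5, 6], vec![7, 8, 9]];
        let result = global_limit(&partitions, 5);
        assert_eq!(result.len(), 5); // exactly `limit` rows, regardless of partitioning
        println!("{:?}", result);    // [1, 2, 3, 4, 5]
    }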
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index dcbfe7b..8259104 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -39,7 +39,7 @@ use crate::execution::physical_plan::expressions::{
Avg, BinaryExpr, CastExpr, Column, Count, Literal, Max, Min, PhysicalSortExpr, Sum,
};
use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
-use crate::execution::physical_plan::limit::LimitExec;
+use crate::execution::physical_plan::limit::GlobalLimitExec;
use crate::execution::physical_plan::math_expressions::register_math_functions;
use crate::execution::physical_plan::memory::MemoryExec;
use crate::execution::physical_plan::merge::MergeExec;
@@ -419,7 +419,7 @@ impl ExecutionContext {
let input = self.create_physical_plan(input, batch_size)?;
let input_schema = input.as_ref().schema().clone();
- Ok(Arc::new(LimitExec::new(
+ Ok(Arc::new(GlobalLimitExec::new(
input_schema.clone(),
input.partitions()?,
*n,
diff --git a/rust/datafusion/src/execution/physical_plan/limit.rs b/rust/datafusion/src/execution/physical_plan/limit.rs
index 49d612b..45dba73 100644
--- a/rust/datafusion/src/execution/physical_plan/limit.rs
+++ b/rust/datafusion/src/execution/physical_plan/limit.rs
@@ -17,21 +17,22 @@
//! Defines the LIMIT plan
+use std::sync::{Arc, Mutex};
+
use crate::error::{ExecutionError, Result};
-use crate::execution::physical_plan::common::RecordBatchIterator;
+use crate::execution::physical_plan::common::{self, RecordBatchIterator};
+use crate::execution::physical_plan::memory::MemoryIterator;
+use crate::execution::physical_plan::merge::MergeExec;
use crate::execution::physical_plan::ExecutionPlan;
use crate::execution::physical_plan::Partition;
use arrow::array::ArrayRef;
use arrow::compute::limit;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
-use std::sync::{Arc, Mutex};
-use std::thread;
-use std::thread::JoinHandle;
/// Limit execution plan
#[derive(Debug)]
-pub struct LimitExec {
+pub struct GlobalLimitExec {
/// Input schema
schema: SchemaRef,
/// Input partitions
@@ -40,14 +41,14 @@ pub struct LimitExec {
limit: usize,
}
-impl LimitExec {
+impl GlobalLimitExec {
/// Create a new GlobalLimitExec
pub fn new(
schema: SchemaRef,
partitions: Vec<Arc<dyn Partition>>,
limit: usize,
) -> Self {
- LimitExec {
+ GlobalLimitExec {
schema,
partitions,
limit,
@@ -55,7 +56,7 @@ impl LimitExec {
}
}
-impl ExecutionPlan for LimitExec {
+impl ExecutionPlan for GlobalLimitExec {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
@@ -81,39 +82,42 @@ struct LimitPartition {
impl Partition for LimitPartition {
fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
- // collect up to "limit" rows on each partition
- let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = self
+ // apply limit in parallel across all input partitions
+ let local_limit = self
.partitions
.iter()
.map(|p| {
- let p = p.clone();
- let limit = self.limit;
- thread::spawn(move || {
- let it = p.execute()?;
- collect_with_limit(it, limit)
- })
+ Arc::new(LocalLimitExec::new(
+ p.clone(),
+ self.schema.clone(),
+ self.limit,
+ )) as Arc<dyn Partition>
})
.collect();
- // combine the results from each thread, up to the limit
+ // limit needs to collapse inputs down to a single partition
+ let merge = MergeExec::new(self.schema.clone(), local_limit);
+ let merge_partitions = merge.partitions()?;
+ // MergeExec must always produce a single partition
+ assert_eq!(1, merge_partitions.len());
+ let it = merge_partitions[0].execute()?;
+ let batches = common::collect(it)?;
+
+ // apply the limit to the output
let mut combined_results: Vec<Arc<RecordBatch>> = vec![];
let mut count = 0;
- for thread in threads {
- let join = thread.join().expect("Failed to join thread");
- let result = join?;
- for batch in result {
- let capacity = self.limit - count;
- if batch.num_rows() <= capacity {
- count += batch.num_rows();
- combined_results.push(Arc::new(batch.clone()))
- } else {
- let batch = truncate_batch(&batch, capacity)?;
- count += batch.num_rows();
- combined_results.push(Arc::new(batch.clone()))
- }
- if count == self.limit {
- break;
- }
+ for batch in batches {
+ let capacity = self.limit - count;
+ if batch.num_rows() <= capacity {
+ count += batch.num_rows();
+ combined_results.push(Arc::new(batch.clone()))
+ } else {
+ let batch = truncate_batch(&batch, capacity)?;
+ count += batch.num_rows();
+ combined_results.push(Arc::new(batch.clone()))
+ }
+ if count == self.limit {
+ break;
}
}
@@ -124,6 +128,36 @@ impl Partition for LimitPartition {
}
}
+/// LocalLimitExec applies a limit to a single partition
+#[derive(Debug)]
+pub struct LocalLimitExec {
+ input: Arc<dyn Partition>,
+ schema: SchemaRef,
+ limit: usize,
+}
+
+impl LocalLimitExec {
+ /// Create a new LocalLimitExec partition
+ pub fn new(input: Arc<dyn Partition>, schema: SchemaRef, limit: usize) -> Self {
+ Self {
+ input,
+ schema,
+ limit,
+ }
+ }
+}
+
+impl Partition for LocalLimitExec {
+ fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+ let it = self.input.execute()?;
+ Ok(Arc::new(Mutex::new(MemoryIterator::try_new(
+ collect_with_limit(it, self.limit)?,
+ self.schema.clone(),
+ None,
+ )?)))
+ }
+}
+
/// Truncate a RecordBatch to maximum of n rows
pub fn truncate_batch(batch: &RecordBatch, n: usize) -> Result<RecordBatch> {
let limited_columns: Result<Vec<ArrayRef>> = (0..batch.num_columns())
@@ -192,7 +226,7 @@ mod tests {
let input = csv.partitions()?;
assert_eq!(input.len(), num_partitions);
- let limit = LimitExec::new(schema.clone(), input, 7);
+ let limit = GlobalLimitExec::new(schema.clone(), input, 7);
let partitions = limit.partitions()?;
// the result should contain 4 batches (one per input partition)
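Inside GlobalLimitExec's execute path, the loop over the merged batches keeps whole batches while they fit under the remaining capacity, truncates the one batch that crosses the limit (truncate_batch, built on the Arrow limit kernel), and stops as soon as the limit is reached. A minimal sketch of that accumulation, with Vec<i32> standing in for RecordBatch; apply_limit is a made-up name for illustration, not part of the patch.

    fn apply_limit(batches: Vec<Vec<i32>>, limit: usize) -> Vec<Vec<i32>> {
        let mut combined: Vec<Vec<i32>> = Vec::new();
        let mut count = 0;
        for batch in batches {
            let capacity = limit - count;
            if batch.len() <= capacity {
                // the whole batch fits under the remaining capacity
                count += batch.len();
                combined.push(batch);
            } else {
                // this batch crosses the limit: keep only the first `capacity` rows,
                // as truncate_batch does for a RecordBatch
                let truncated: Vec<i32> = batch.into_iter().take(capacity).collect();
                count += truncated.len();
                combined.push(truncated);
            }
            if count == limit {
                break;
            }
        }
        combined
    }

    fn main() {
        let out = apply_limit(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7]], 5);
        assert_eq!(out, vec![vec![1, 2, 3], vec![4, 5]]); // 5 rows total
    }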
diff --git a/rust/datafusion/src/execution/physical_plan/memory.rs b/rust/datafusion/src/execution/physical_plan/memory.rs
index 40c19eb..f9b4441 100644
--- a/rust/datafusion/src/execution/physical_plan/memory.rs
+++ b/rust/datafusion/src/execution/physical_plan/memory.rs
@@ -112,7 +112,7 @@ impl Partition for MemoryPartition {
}
/// Iterator over batches
-struct MemoryIterator {
+pub(crate) struct MemoryIterator {
/// Vector of record batches
data: Vec<RecordBatch>,
/// Schema representing the data
diff --git a/rust/datafusion/src/execution/physical_plan/sort.rs b/rust/datafusion/src/execution/physical_plan/sort.rs
index e0fcd0b..5f31d48 100644
--- a/rust/datafusion/src/execution/physical_plan/sort.rs
+++ b/rust/datafusion/src/execution/physical_plan/sort.rs
@@ -18,8 +18,6 @@
//! Defines the SORT plan
use std::sync::{Arc, Mutex};
-use std::thread;
-use std::thread::JoinHandle;
use arrow::array::ArrayRef;
pub use arrow::compute::SortOptions;
@@ -30,6 +28,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchReader};
use crate::error::Result;
use crate::execution::physical_plan::common::RecordBatchIterator;
use crate::execution::physical_plan::expressions::PhysicalSortExpr;
+use crate::execution::physical_plan::merge::MergeExec;
use crate::execution::physical_plan::{common, ExecutionPlan, Partition};
/// Sort execution plan
@@ -77,27 +76,13 @@ struct SortPartition {
impl Partition for SortPartition {
/// Execute the sort
fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
- let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = self
- .input
- .iter()
- .map(|p| {
- let p = p.clone();
- thread::spawn(move || {
- let it = p.execute()?;
- common::collect(it)
- })
- })
- .collect();
-
- // generate record batches from input in parallel
- let mut all_batches: Vec<Arc<RecordBatch>> = vec![];
- for thread in threads {
- let join = thread.join().expect("Failed to join thread");
- let result = join?;
- result
- .iter()
- .for_each(|batch| all_batches.push(Arc::new(batch.clone())));
- }
+ // sort needs to operate on a single partition currently
+ let merge = MergeExec::new(self.schema.clone(), self.input.clone());
+ let merge_partitions = merge.partitions()?;
+ // MergeExec must always produce a single partition
+ assert_eq!(1, merge_partitions.len());
+ let it = merge_partitions[0].execute()?;
+ let batches = common::collect(it)?;
// combine all record batches into one for each column
let combined_batch = RecordBatch::try_new(
@@ -108,7 +93,7 @@ impl Partition for SortPartition {
.enumerate()
.map(|(i, _)| -> Result<ArrayRef> {
Ok(concat(
- &all_batches
+ &batches
.iter()
.map(|batch| batch.columns()[i].clone())
.collect::<Vec<ArrayRef>>(),
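SortPartition now follows the same pattern as the limit: MergeExec collapses the input partitions to one, the resulting batches are collected, and the columns of all batches are concatenated position-by-position into a single RecordBatch before sorting. Below is a self-contained model of that column-wise combination, with Vec<Vec<i32>> standing in for a batch's columns; combine is an illustrative name, not taken from the patch.

    fn combine(batches: &[Vec<Vec<i32>>]) -> Vec<Vec<i32>> {
        if batches.is_empty() {
            return vec![];
        }
        let num_columns = batches[0].len();
        (0..num_columns)
            .map(|i| {
                // concatenate column i of every batch, mirroring what the Arrow
                // concat kernel does for ArrayRefs in sort.rs
                batches
                    .iter()
                    .flat_map(|batch| batch[i].iter().copied())
                    .collect()
            })
            .collect()
    }

    fn main() {
        // two batches, each with two columns
        let b1 = vec![vec![3, 1], vec![30, 10]];
        let b2 = vec![vec![2], vec![20]];
        assert_eq!(combine(&[b1, b2]), vec![vec![3, 1, 2], vec![30, 10, 20]]);
    }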