You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/13 22:42:42 UTC

[arrow] branch master updated: ARROW-9725: [Rust] [DataFusion] SortExec and LimitExec re-use MergeExec

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new b2788c5  ARROW-9725: [Rust] [DataFusion] SortExec and LimitExec re-use MergeExec
b2788c5 is described below

commit b2788c55ac2173a22dd25b932cf0cedcc9e18391
Author: Andy Grove <an...@gmail.com>
AuthorDate: Thu Aug 13 16:42:13 2020 -0600

    ARROW-9725: [Rust] [DataFusion] SortExec and LimitExec re-use MergeExec
    
    Closes #7958 from andygrove/ARROW-9725
    
    Authored-by: Andy Grove <an...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 rust/datafusion/src/execution/context.rs           |   4 +-
 .../src/execution/physical_plan/limit.rs           | 102 ++++++++++++++-------
 .../src/execution/physical_plan/memory.rs          |   2 +-
 .../datafusion/src/execution/physical_plan/sort.rs |  33 ++-----
 4 files changed, 80 insertions(+), 61 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index dcbfe7b..8259104 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -39,7 +39,7 @@ use crate::execution::physical_plan::expressions::{
     Avg, BinaryExpr, CastExpr, Column, Count, Literal, Max, Min, PhysicalSortExpr, Sum,
 };
 use crate::execution::physical_plan::hash_aggregate::HashAggregateExec;
-use crate::execution::physical_plan::limit::LimitExec;
+use crate::execution::physical_plan::limit::GlobalLimitExec;
 use crate::execution::physical_plan::math_expressions::register_math_functions;
 use crate::execution::physical_plan::memory::MemoryExec;
 use crate::execution::physical_plan::merge::MergeExec;
@@ -419,7 +419,7 @@ impl ExecutionContext {
                 let input = self.create_physical_plan(input, batch_size)?;
                 let input_schema = input.as_ref().schema().clone();
 
-                Ok(Arc::new(LimitExec::new(
+                Ok(Arc::new(GlobalLimitExec::new(
                     input_schema.clone(),
                     input.partitions()?,
                     *n,
diff --git a/rust/datafusion/src/execution/physical_plan/limit.rs b/rust/datafusion/src/execution/physical_plan/limit.rs
index 49d612b..45dba73 100644
--- a/rust/datafusion/src/execution/physical_plan/limit.rs
+++ b/rust/datafusion/src/execution/physical_plan/limit.rs
@@ -17,21 +17,22 @@
 
 //! Defines the LIMIT plan
 
+use std::sync::{Arc, Mutex};
+
 use crate::error::{ExecutionError, Result};
-use crate::execution::physical_plan::common::RecordBatchIterator;
+use crate::execution::physical_plan::common::{self, RecordBatchIterator};
+use crate::execution::physical_plan::memory::MemoryIterator;
+use crate::execution::physical_plan::merge::MergeExec;
 use crate::execution::physical_plan::ExecutionPlan;
 use crate::execution::physical_plan::Partition;
 use arrow::array::ArrayRef;
 use arrow::compute::limit;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::{RecordBatch, RecordBatchReader};
-use std::sync::{Arc, Mutex};
-use std::thread;
-use std::thread::JoinHandle;
 
 /// Limit execution plan
 #[derive(Debug)]
-pub struct LimitExec {
+pub struct GlobalLimitExec {
     /// Input schema
     schema: SchemaRef,
     /// Input partitions
@@ -40,14 +41,14 @@ pub struct LimitExec {
     limit: usize,
 }
 
-impl LimitExec {
+impl GlobalLimitExec {
     /// Create a new MergeExec
     pub fn new(
         schema: SchemaRef,
         partitions: Vec<Arc<dyn Partition>>,
         limit: usize,
     ) -> Self {
-        LimitExec {
+        GlobalLimitExec {
             schema,
             partitions,
             limit,
@@ -55,7 +56,7 @@ impl LimitExec {
     }
 }
 
-impl ExecutionPlan for LimitExec {
+impl ExecutionPlan for GlobalLimitExec {
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
@@ -81,39 +82,42 @@ struct LimitPartition {
 
 impl Partition for LimitPartition {
     fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
-        // collect up to "limit" rows on each partition
-        let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = self
+        // apply limit in parallel across all input partitions
+        let local_limit = self
             .partitions
             .iter()
             .map(|p| {
-                let p = p.clone();
-                let limit = self.limit;
-                thread::spawn(move || {
-                    let it = p.execute()?;
-                    collect_with_limit(it, limit)
-                })
+                Arc::new(LocalLimitExec::new(
+                    p.clone(),
+                    self.schema.clone(),
+                    self.limit,
+                )) as Arc<dyn Partition>
             })
             .collect();
 
-        // combine the results from each thread, up to the limit
+        // limit needs to collapse inputs down to a single partition
+        let merge = MergeExec::new(self.schema.clone(), local_limit);
+        let merge_partitions = merge.partitions()?;
+        // MergeExec must always produce a single partition
+        assert_eq!(1, merge_partitions.len());
+        let it = merge_partitions[0].execute()?;
+        let batches = common::collect(it)?;
+
+        // apply the limit to the output
         let mut combined_results: Vec<Arc<RecordBatch>> = vec![];
         let mut count = 0;
-        for thread in threads {
-            let join = thread.join().expect("Failed to join thread");
-            let result = join?;
-            for batch in result {
-                let capacity = self.limit - count;
-                if batch.num_rows() <= capacity {
-                    count += batch.num_rows();
-                    combined_results.push(Arc::new(batch.clone()))
-                } else {
-                    let batch = truncate_batch(&batch, capacity)?;
-                    count += batch.num_rows();
-                    combined_results.push(Arc::new(batch.clone()))
-                }
-                if count == self.limit {
-                    break;
-                }
+        for batch in batches {
+            let capacity = self.limit - count;
+            if batch.num_rows() <= capacity {
+                count += batch.num_rows();
+                combined_results.push(Arc::new(batch.clone()))
+            } else {
+                let batch = truncate_batch(&batch, capacity)?;
+                count += batch.num_rows();
+                combined_results.push(Arc::new(batch.clone()))
+            }
+            if count == self.limit {
+                break;
             }
         }
 
@@ -124,6 +128,36 @@ impl Partition for LimitPartition {
     }
 }
 
+/// LocalLimitExec applies a limit to a single partition
+#[derive(Debug)]
+pub struct LocalLimitExec {
+    input: Arc<dyn Partition>,
+    schema: SchemaRef,
+    limit: usize,
+}
+
+impl LocalLimitExec {
+    /// Create a new LocalLimitExec partition
+    pub fn new(input: Arc<dyn Partition>, schema: SchemaRef, limit: usize) -> Self {
+        Self {
+            input,
+            schema,
+            limit,
+        }
+    }
+}
+
+impl Partition for LocalLimitExec {
+    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+        let it = self.input.execute()?;
+        Ok(Arc::new(Mutex::new(MemoryIterator::try_new(
+            collect_with_limit(it, self.limit)?,
+            self.schema.clone(),
+            None,
+        )?)))
+    }
+}
+
 /// Truncate a RecordBatch to maximum of n rows
 pub fn truncate_batch(batch: &RecordBatch, n: usize) -> Result<RecordBatch> {
     let limited_columns: Result<Vec<ArrayRef>> = (0..batch.num_columns())
@@ -192,7 +226,7 @@ mod tests {
         let input = csv.partitions()?;
         assert_eq!(input.len(), num_partitions);
 
-        let limit = LimitExec::new(schema.clone(), input, 7);
+        let limit = GlobalLimitExec::new(schema.clone(), input, 7);
         let partitions = limit.partitions()?;
 
         // the result should contain 4 batches (one per input partition)
diff --git a/rust/datafusion/src/execution/physical_plan/memory.rs b/rust/datafusion/src/execution/physical_plan/memory.rs
index 40c19eb..f9b4441 100644
--- a/rust/datafusion/src/execution/physical_plan/memory.rs
+++ b/rust/datafusion/src/execution/physical_plan/memory.rs
@@ -112,7 +112,7 @@ impl Partition for MemoryPartition {
 }
 
 /// Iterator over batches
-struct MemoryIterator {
+pub(crate) struct MemoryIterator {
     /// Vector of record batches
     data: Vec<RecordBatch>,
     /// Schema representing the data
diff --git a/rust/datafusion/src/execution/physical_plan/sort.rs b/rust/datafusion/src/execution/physical_plan/sort.rs
index e0fcd0b..5f31d48 100644
--- a/rust/datafusion/src/execution/physical_plan/sort.rs
+++ b/rust/datafusion/src/execution/physical_plan/sort.rs
@@ -18,8 +18,6 @@
 //! Defines the SORT plan
 
 use std::sync::{Arc, Mutex};
-use std::thread;
-use std::thread::JoinHandle;
 
 use arrow::array::ArrayRef;
 pub use arrow::compute::SortOptions;
@@ -30,6 +28,7 @@ use arrow::record_batch::{RecordBatch, RecordBatchReader};
 use crate::error::Result;
 use crate::execution::physical_plan::common::RecordBatchIterator;
 use crate::execution::physical_plan::expressions::PhysicalSortExpr;
+use crate::execution::physical_plan::merge::MergeExec;
 use crate::execution::physical_plan::{common, ExecutionPlan, Partition};
 
 /// Sort execution plan
@@ -77,27 +76,13 @@ struct SortPartition {
 impl Partition for SortPartition {
     /// Execute the sort
     fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
-        let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = self
-            .input
-            .iter()
-            .map(|p| {
-                let p = p.clone();
-                thread::spawn(move || {
-                    let it = p.execute()?;
-                    common::collect(it)
-                })
-            })
-            .collect();
-
-        // generate record batches from input in parallel
-        let mut all_batches: Vec<Arc<RecordBatch>> = vec![];
-        for thread in threads {
-            let join = thread.join().expect("Failed to join thread");
-            let result = join?;
-            result
-                .iter()
-                .for_each(|batch| all_batches.push(Arc::new(batch.clone())));
-        }
+        // sort needs to operate on a single partition currently
+        let merge = MergeExec::new(self.schema.clone(), self.input.clone());
+        let merge_partitions = merge.partitions()?;
+        // MergeExec must always produce a single partition
+        assert_eq!(1, merge_partitions.len());
+        let it = merge_partitions[0].execute()?;
+        let batches = common::collect(it)?;
 
         // combine all record batches into one for each column
         let combined_batch = RecordBatch::try_new(
@@ -108,7 +93,7 @@ impl Partition for SortPartition {
                 .enumerate()
                 .map(|(i, _)| -> Result<ArrayRef> {
                     Ok(concat(
-                        &all_batches
+                        &batches
                             .iter()
                             .map(|batch| batch.columns()[i].clone())
                             .collect::<Vec<ArrayRef>>(),