Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/23 15:07:11 UTC

[arrow] branch master updated: ARROW-9464: [Rust] [DataFusion] Remove Partition trait

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d1d85db  ARROW-9464: [Rust] [DataFusion] Remove Partition trait
d1d85db is described below

commit d1d85db24d81fe8c74a0af39242642c9091567af
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sun Aug 23 09:06:35 2020 -0600

    ARROW-9464: [Rust] [DataFusion] Remove Partition trait
    
    This follows on from https://github.com/apache/arrow/pull/8028.
    
    - Removes the `Partition` trait, which was redundant.
    - `ExecutionPlan.execute()` now takes a partition index.
    - Introduces a `Partitioning` enum so that execution plans can describe their partitioning scheme and number of partitions. Currently it just has `UnknownPartitioning`, but we can add others later, such as `HashPartitioning`.
    - Removes `DatasourceExec`, since it is no longer needed now that `TableProvider.scan` returns an `ExecutionPlan` directly.
    
    This is a step towards extracting the threading model out of operators like `MergeExec`, so that operators can work with different threading models. A sketch of the new calling pattern follows.
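    
    As a minimal sketch of the new calling pattern (mirroring the `MemTable::load` change below; `exec` here stands for any `Arc<dyn ExecutionPlan>`):
    
        // Sketch only: `exec` is assumed to be an Arc<dyn ExecutionPlan>.
        // Each partition index can be executed independently, e.g. on its own thread.
        for partition in 0..exec.output_partitioning().partition_count() {
            let it = exec.execute(partition)?;
            let mut it = it.lock().unwrap();
            while let Ok(Some(batch)) = it.next_batch() {
                // consume the RecordBatch
            }
        }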
    
    Closes #8029 from andygrove/execute-partition
    
    Authored-by: Andy Grove <an...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 rust/benchmarks/src/bin/nyctaxi.rs                 |   2 +-
 rust/benchmarks/src/bin/tpch.rs                    |   2 +-
 rust/datafusion/examples/flight_server.rs          |   2 +-
 rust/datafusion/src/datasource/csv.rs              |  22 ++--
 rust/datafusion/src/datasource/memory.rs           |  14 ++-
 rust/datafusion/src/datasource/parquet.rs          |   6 +-
 rust/datafusion/src/execution/context.rs           |  84 +++++++--------
 rust/datafusion/src/execution/dataframe_impl.rs    |   2 +-
 rust/datafusion/src/execution/physical_plan/csv.rs | 115 ++++++---------------
 .../src/execution/physical_plan/datasource.rs      |  59 -----------
 .../src/execution/physical_plan/explain.rs         |  26 ++---
 .../src/execution/physical_plan/filter.rs          |  49 ++-------
 .../src/execution/physical_plan/hash_aggregate.rs  |  72 +++----------
 .../src/execution/physical_plan/limit.rs           |  89 +++++++---------
 .../src/execution/physical_plan/memory.rs          |  67 +++---------
 .../src/execution/physical_plan/merge.rs           |  95 ++++++++---------
 rust/datafusion/src/execution/physical_plan/mod.rs |  31 ++++--
 .../src/execution/physical_plan/parquet.rs         | 110 ++++++--------------
 .../src/execution/physical_plan/planner.rs         |  31 +-----
 .../src/execution/physical_plan/projection.rs      |  45 ++------
 .../datafusion/src/execution/physical_plan/sort.rs |  54 ++++------
 rust/datafusion/src/test/mod.rs                    |   2 +-
 rust/datafusion/tests/sql.rs                       |   6 +-
 23 files changed, 327 insertions(+), 658 deletions(-)

diff --git a/rust/benchmarks/src/bin/nyctaxi.rs b/rust/benchmarks/src/bin/nyctaxi.rs
index 7468400..d45de71 100644
--- a/rust/benchmarks/src/bin/nyctaxi.rs
+++ b/rust/benchmarks/src/bin/nyctaxi.rs
@@ -115,7 +115,7 @@ fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()>
         println!("Optimized logical plan:\n{:?}", plan);
     }
     let physical_plan = ctx.create_physical_plan(&plan)?;
-    let result = ctx.collect(physical_plan.as_ref())?;
+    let result = ctx.collect(physical_plan)?;
     if debug {
         pretty::print_batches(&result)?;
     }
diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs
index 49bcf32..4e27daf 100644
--- a/rust/benchmarks/src/bin/tpch.rs
+++ b/rust/benchmarks/src/bin/tpch.rs
@@ -148,7 +148,7 @@ fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()>
         println!("Optimized logical plan:\n{:?}", plan);
     }
     let physical_plan = ctx.create_physical_plan(&plan)?;
-    let result = ctx.collect(physical_plan.as_ref())?;
+    let result = ctx.collect(physical_plan)?;
     if debug {
         pretty::print_batches(&result)?;
     }
diff --git a/rust/datafusion/examples/flight_server.rs b/rust/datafusion/examples/flight_server.rs
index 0cb2858..f5d6410 100644
--- a/rust/datafusion/examples/flight_server.rs
+++ b/rust/datafusion/examples/flight_server.rs
@@ -103,7 +103,7 @@ impl FlightService for FlightServiceImpl {
                     .map_err(|e| to_tonic_err(&e))?;
 
                 // execute the query
-                let results = ctx.collect(plan.as_ref()).map_err(|e| to_tonic_err(&e))?;
+                let results = ctx.collect(plan.clone()).map_err(|e| to_tonic_err(&e))?;
                 if results.is_empty() {
                     return Err(Status::internal("There were no results from ticket"));
                 }
diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs
index 1ce7658..6570680 100644
--- a/rust/datafusion/src/datasource/csv.rs
+++ b/rust/datafusion/src/datasource/csv.rs
@@ -43,14 +43,15 @@ use std::string::String;
 use std::sync::Arc;
 
 use crate::datasource::TableProvider;
-use crate::error::Result;
+use crate::error::{ExecutionError, Result};
 use crate::execution::physical_plan::csv::CsvExec;
 pub use crate::execution::physical_plan::csv::CsvReadOptions;
-use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{common, ExecutionPlan};
 
 /// Represents a CSV file with a provided schema
 pub struct CsvFile {
-    filename: String,
+    /// Path to a single CSV file or a directory containing one or more CSV files
+    path: String,
     schema: SchemaRef,
     has_header: bool,
     delimiter: u8,
@@ -59,14 +60,21 @@ pub struct CsvFile {
 
 impl CsvFile {
     /// Attempt to initialize a new `CsvFile` from a file path
-    pub fn try_new(filename: &str, options: CsvReadOptions) -> Result<Self> {
+    pub fn try_new(path: &str, options: CsvReadOptions) -> Result<Self> {
         let schema = Arc::new(match options.schema {
             Some(s) => s.clone(),
-            None => CsvExec::try_infer_schema(filename, &options)?,
+            None => {
+                let mut filenames: Vec<String> = vec![];
+                common::build_file_list(path, &mut filenames, options.file_extension)?;
+                if filenames.is_empty() {
+                    return Err(ExecutionError::General("No files found".to_string()));
+                }
+                CsvExec::try_infer_schema(&filenames, &options)?
+            }
         });
 
         Ok(Self {
-            filename: String::from(filename),
+            path: String::from(path),
             schema,
             has_header: options.has_header,
             delimiter: options.delimiter,
@@ -86,7 +94,7 @@ impl TableProvider for CsvFile {
         batch_size: usize,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(CsvExec::try_new(
-            &self.filename,
+            &self.path,
             CsvReadOptions::new()
                 .schema(&self.schema)
                 .has_header(self.has_header)
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index 96eebca..ff56861 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -58,11 +58,11 @@ impl MemTable {
     pub fn load(t: &dyn TableProvider) -> Result<Self> {
         let schema = t.schema();
         let exec = t.scan(&None, 1024 * 1024)?;
-        let partitions = exec.partitions()?;
 
-        let mut data: Vec<Vec<RecordBatch>> = Vec::with_capacity(partitions.len());
-        for partition in &partitions {
-            let it = partition.execute()?;
+        let mut data: Vec<Vec<RecordBatch>> =
+            Vec::with_capacity(exec.output_partitioning().partition_count());
+        for partition in 0..exec.output_partitioning().partition_count() {
+            let it = exec.execute(partition)?;
             let mut it = it.lock().unwrap();
             let mut partition_batches = vec![];
             while let Ok(Some(batch)) = it.next_batch() {
@@ -147,8 +147,7 @@ mod tests {
 
         // scan with projection
         let exec = provider.scan(&Some(vec![2, 1]), 1024)?;
-        let partitions = exec.partitions()?;
-        let it = partitions[0].execute()?;
+        let it = exec.execute(0)?;
         let batch2 = it.lock().expect("mutex lock").next_batch()?.unwrap();
         assert_eq!(2, batch2.schema().fields().len());
         assert_eq!("c", batch2.schema().field(0).name());
@@ -178,8 +177,7 @@ mod tests {
         let provider = MemTable::new(schema, vec![vec![batch]])?;
 
         let exec = provider.scan(&None, 1024)?;
-        let partitions = exec.partitions()?;
-        let it = partitions[0].execute()?;
+        let it = exec.execute(0)?;
         let batch1 = it.lock().expect("mutex lock").next_batch()?.unwrap();
         assert_eq!(3, batch1.schema().fields().len());
         assert_eq!(3, batch1.num_columns());
diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index 776a515..e2cd847 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -81,8 +81,7 @@ mod tests {
         let table = load_table("alltypes_plain.parquet")?;
         let projection = None;
         let exec = table.scan(&projection, 2)?;
-        let partitions = exec.partitions()?;
-        let it = partitions[0].execute()?;
+        let it = exec.execute(0)?;
         let mut it = it.lock().unwrap();
 
         let mut count = 0;
@@ -302,8 +301,7 @@ mod tests {
         projection: &Option<Vec<usize>>,
     ) -> Result<RecordBatch> {
         let exec = table.scan(projection, 1024)?;
-        let partitions = exec.partitions()?;
-        let it = partitions[0].execute()?;
+        let it = exec.execute(0)?;
         let mut it = it.lock().expect("failed to lock mutex");
         Ok(it
             .next_batch()?
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index ca87c2e..c6e8c05 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -350,72 +350,64 @@ impl ExecutionContext {
     }
 
     /// Execute a physical plan and collect the results in memory
-    pub fn collect(&self, plan: &dyn ExecutionPlan) -> Result<Vec<RecordBatch>> {
-        let partitions = plan.partitions()?;
-
-        match partitions.len() {
+    pub fn collect(&self, plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
+        match plan.output_partitioning().partition_count() {
             0 => Ok(vec![]),
             1 => {
-                let it = partitions[0].execute()?;
+                let it = plan.execute(0)?;
                 common::collect(it)
             }
             _ => {
                 // merge into a single partition
                 let plan = MergeExec::new(
                     plan.schema().clone(),
-                    partitions,
+                    plan.clone(),
                     self.state
                         .lock()
                         .expect("failed to lock mutex")
                         .config
                         .concurrency,
                 );
-                let partitions = plan.partitions()?;
-                if partitions.len() == 1 {
-                    common::collect(partitions[0].execute()?)
-                } else {
-                    Err(ExecutionError::InternalError(format!(
-                        "MergeExec returned {} partitions",
-                        partitions.len()
-                    )))
-                }
+                // MergeExec must produce a single partition
+                assert_eq!(1, plan.output_partitioning().partition_count());
+                common::collect(plan.execute(0)?)
             }
         }
     }
 
     /// Execute a query and write the results to a partitioned CSV file
-    pub fn write_csv(&self, plan: &dyn ExecutionPlan, path: &str) -> Result<()> {
+    pub fn write_csv(&self, plan: Arc<dyn ExecutionPlan>, path: &str) -> Result<()> {
         // create directory to contain the CSV files (one per partition)
         let path = path.to_string();
         fs::create_dir(&path)?;
 
-        let threads: Vec<JoinHandle<Result<()>>> = plan
-            .partitions()?
-            .iter()
-            .enumerate()
-            .map(|(i, p)| {
-                let p = p.clone();
-                let path = path.clone();
-                thread::spawn(move || {
-                    let filename = format!("part-{}.csv", i);
-                    let path = Path::new(&path).join(&filename);
-                    let file = fs::File::create(path)?;
-                    let mut writer = csv::Writer::new(file);
-                    let reader = p.execute()?;
-                    let mut reader = reader.lock().unwrap();
-                    loop {
-                        match reader.next_batch() {
-                            Ok(Some(batch)) => {
-                                writer.write(&batch)?;
+        let threads: Vec<JoinHandle<Result<()>>> =
+            (0..plan.output_partitioning().partition_count())
+                .enumerate()
+                .map(|(i, p)| {
+                    let p = p.clone();
+                    let path = path.clone();
+                    let plan = plan.clone();
+                    thread::spawn(move || {
+                        let filename = format!("part-{}.csv", i);
+                        let path = Path::new(&path).join(&filename);
+                        let file = fs::File::create(path)?;
+                        let mut writer = csv::Writer::new(file);
+                        let reader = plan.execute(p)?;
+                        let mut reader = reader.lock().unwrap();
+                        loop {
+                            match reader.next_batch() {
+                                Ok(Some(batch)) => {
+                                    writer.write(&batch)?;
+                                }
+                                Ok(None) => break,
+                                Err(e) => return Err(ExecutionError::from(e)),
                             }
-                            Ok(None) => break,
-                            Err(e) => return Err(ExecutionError::from(e)),
                         }
-                    }
-                    Ok(())
+                        Ok(())
+                    })
                 })
-            })
-            .collect();
+                .collect();
 
         // combine the results from each thread
         for thread in threads {
@@ -547,7 +539,7 @@ mod tests {
 
         let physical_plan = ctx.create_physical_plan(&logical_plan)?;
 
-        let results = ctx.collect(physical_plan.as_ref())?;
+        let results = ctx.collect(physical_plan)?;
 
         // there should be one batch per partition
         assert_eq!(results.len(), partition_count);
@@ -594,7 +586,7 @@ mod tests {
         assert_eq!(1, physical_plan.schema().fields().len());
         assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
 
-        let batches = ctx.collect(physical_plan.as_ref())?;
+        let batches = ctx.collect(physical_plan)?;
         assert_eq!(4, batches.len());
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(10, batches[0].num_rows());
@@ -680,7 +672,7 @@ mod tests {
         assert_eq!(1, physical_plan.schema().fields().len());
         assert_eq!("b", physical_plan.schema().field(0).name().as_str());
 
-        let batches = ctx.collect(physical_plan.as_ref())?;
+        let batches = ctx.collect(physical_plan)?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
         assert_eq!(4, batches[0].num_rows());
@@ -1048,7 +1040,7 @@ mod tests {
 
         let plan = ctx.optimize(&plan)?;
         let plan = ctx.create_physical_plan(&plan)?;
-        let result = ctx.collect(plan.as_ref())?;
+        let result = ctx.collect(plan)?;
 
         let batch = &result[0];
         assert_eq!(3, batch.num_columns());
@@ -1110,7 +1102,7 @@ mod tests {
         let logical_plan = ctx.create_logical_plan(sql)?;
         let logical_plan = ctx.optimize(&logical_plan)?;
         let physical_plan = ctx.create_physical_plan(&logical_plan)?;
-        ctx.collect(physical_plan.as_ref())
+        ctx.collect(physical_plan)
     }
 
     fn field_names(result: &RecordBatch) -> Vec<String> {
@@ -1134,7 +1126,7 @@ mod tests {
         let logical_plan = ctx.create_logical_plan(sql)?;
         let logical_plan = ctx.optimize(&logical_plan)?;
         let physical_plan = ctx.create_physical_plan(&logical_plan)?;
-        ctx.write_csv(physical_plan.as_ref(), out_dir)
+        ctx.write_csv(physical_plan, out_dir)
     }
 
     /// Generate CSV partitions within the supplied directory
diff --git a/rust/datafusion/src/execution/dataframe_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs
index d53fb38..edaf7ae 100644
--- a/rust/datafusion/src/execution/dataframe_impl.rs
+++ b/rust/datafusion/src/execution/dataframe_impl.rs
@@ -110,7 +110,7 @@ impl DataFrame for DataFrameImpl {
         let ctx = ExecutionContext::from(self.ctx_state.clone());
         let plan = ctx.optimize(&self.plan)?;
         let plan = ctx.create_physical_plan(&plan)?;
-        Ok(ctx.collect(plan.as_ref())?)
+        Ok(ctx.collect(plan)?)
     }
 
     /// Returns the schema from the logical plan
diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs
index c53b21a..ca35254 100644
--- a/rust/datafusion/src/execution/physical_plan/csv.rs
+++ b/rust/datafusion/src/execution/physical_plan/csv.rs
@@ -21,8 +21,8 @@ use std::fs::File;
 use std::sync::{Arc, Mutex};
 
 use crate::error::{ExecutionError, Result};
-use crate::execution::physical_plan::common;
-use crate::execution::physical_plan::{ExecutionPlan, Partition};
+use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{common, Partitioning};
 use arrow::csv;
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
@@ -107,6 +107,8 @@ impl<'a> CsvReadOptions<'a> {
 pub struct CsvExec {
     /// Path to directory containing partitioned CSV files with the same schema
     path: String,
+    /// The individual files under path
+    filenames: Vec<String>,
     /// Schema representing the CSV file
     schema: SchemaRef,
     /// Does the CSV file have a header?
@@ -131,9 +133,17 @@ impl CsvExec {
         projection: Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Self> {
+        let file_extension = String::from(options.file_extension);
+
+        let mut filenames: Vec<String> = vec![];
+        common::build_file_list(path, &mut filenames, file_extension.as_str())?;
+        if filenames.is_empty() {
+            return Err(ExecutionError::General("No files found".to_string()));
+        }
+
         let schema = match options.schema {
             Some(s) => s.clone(),
-            None => CsvExec::try_infer_schema(path, &options)?,
+            None => CsvExec::try_infer_schema(&filenames, &options)?,
         };
 
         let projected_schema = match &projection {
@@ -143,10 +153,11 @@ impl CsvExec {
 
         Ok(Self {
             path: path.to_string(),
+            filenames,
             schema: Arc::new(schema),
             has_header: options.has_header,
             delimiter: Some(options.delimiter),
-            file_extension: String::from(options.file_extension),
+            file_extension,
             projection,
             projected_schema: Arc::new(projected_schema),
             batch_size,
@@ -154,16 +165,12 @@ impl CsvExec {
     }
 
     /// Infer schema for given CSV dataset
-    pub fn try_infer_schema(path: &str, options: &CsvReadOptions) -> Result<Schema> {
-        let mut filenames: Vec<String> = vec![];
-        common::build_file_list(path, &mut filenames, options.file_extension)?;
-
-        if filenames.is_empty() {
-            return Err(ExecutionError::General("No files found".to_string()));
-        }
-
+    pub fn try_infer_schema(
+        filenames: &[String],
+        options: &CsvReadOptions,
+    ) -> Result<Schema> {
         Ok(csv::infer_schema_from_files(
-            &filenames,
+            filenames,
             options.delimiter,
             Some(options.schema_infer_max_records),
             options.has_header,
@@ -177,73 +184,17 @@ impl ExecutionPlan for CsvExec {
         self.projected_schema.clone()
     }
 
-    /// Get the partitions for this execution plan. Each partition can be executed in parallel.
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        let mut filenames: Vec<String> = vec![];
-        common::build_file_list(
-            &self.path,
-            &mut filenames,
-            self.file_extension.as_str(),
-        )?;
-        let partitions = filenames
-            .iter()
-            .map(|filename| {
-                Arc::new(CsvPartition::new(
-                    &filename,
-                    self.schema.clone(),
-                    self.has_header,
-                    self.delimiter,
-                    self.projection.clone(),
-                    self.batch_size,
-                )) as Arc<dyn Partition>
-            })
-            .collect();
-        Ok(partitions)
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(self.filenames.len())
     }
-}
-
-/// CSV Partition
-#[derive(Debug)]
-struct CsvPartition {
-    /// Path to the CSV File
-    path: String,
-    /// Schema representing the CSV file
-    schema: SchemaRef,
-    /// Does the CSV file have a header?
-    has_header: bool,
-    /// An optional column delimiter. Defaults to `b','`
-    delimiter: Option<u8>,
-    /// Optional projection for which columns to load
-    projection: Option<Vec<usize>>,
-    /// Batch size
-    batch_size: usize,
-}
-
-impl CsvPartition {
-    fn new(
-        path: &str,
-        schema: SchemaRef,
-        has_header: bool,
-        delimiter: Option<u8>,
-        projection: Option<Vec<usize>>,
-        batch_size: usize,
-    ) -> Self {
-        Self {
-            path: path.to_string(),
-            schema,
-            has_header,
-            delimiter,
-            projection,
-            batch_size,
-        }
-    }
-}
 
-impl Partition for CsvPartition {
-    /// Execute this partition and return an iterator over RecordBatch
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+    fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
         Ok(Arc::new(Mutex::new(CsvIterator::try_new(
-            &self.path,
+            &self.filenames[partition],
             self.schema.clone(),
             self.has_header,
             self.delimiter,
@@ -315,9 +266,8 @@ mod tests {
         assert_eq!(13, csv.schema.fields().len());
         assert_eq!(3, csv.projected_schema.fields().len());
         assert_eq!(3, csv.schema().fields().len());
-        let partitions = csv.partitions()?;
-        let results = partitions[0].execute()?;
-        let mut it = results.lock().unwrap();
+        let it = csv.execute(0)?;
+        let mut it = it.lock().unwrap();
         let batch = it.next_batch()?.unwrap();
         assert_eq!(3, batch.num_columns());
         let batch_schema = batch.schema();
@@ -339,9 +289,8 @@ mod tests {
         assert_eq!(13, csv.schema.fields().len());
         assert_eq!(13, csv.projected_schema.fields().len());
         assert_eq!(13, csv.schema().fields().len());
-        let partitions = csv.partitions()?;
-        let results = partitions[0].execute()?;
-        let mut it = results.lock().unwrap();
+        let it = csv.execute(0)?;
+        let mut it = it.lock().unwrap();
         let batch = it.next_batch()?.unwrap();
         assert_eq!(13, batch.num_columns());
         let batch_schema = batch.schema();
diff --git a/rust/datafusion/src/execution/physical_plan/datasource.rs b/rust/datafusion/src/execution/physical_plan/datasource.rs
deleted file mode 100644
index 5a6fe31..0000000
--- a/rust/datafusion/src/execution/physical_plan/datasource.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! ExecutionPlan implementation for DataFusion data sources
-
-use std::{
-    fmt::{self, Debug, Formatter},
-    sync::Arc,
-};
-
-use crate::error::Result;
-use crate::execution::physical_plan::{ExecutionPlan, Partition};
-use arrow::datatypes::SchemaRef;
-
-/// Datasource execution plan
-pub struct DatasourceExec {
-    schema: SchemaRef,
-    partitions: Vec<Arc<dyn Partition>>,
-}
-
-impl Debug for DatasourceExec {
-    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        f.debug_struct("DataSourceExec")
-            .field("schema", &self.schema)
-            .field("partitions.len", &self.partitions.len())
-            .finish()
-    }
-}
-
-impl DatasourceExec {
-    /// Create a new data source execution plan
-    pub fn new(schema: SchemaRef, partitions: Vec<Arc<dyn Partition>>) -> Self {
-        Self { schema, partitions }
-    }
-}
-
-impl ExecutionPlan for DatasourceExec {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        Ok(self.partitions.clone())
-    }
-}
diff --git a/rust/datafusion/src/execution/physical_plan/explain.rs b/rust/datafusion/src/execution/physical_plan/explain.rs
index c9213d5..b14fe5c 100644
--- a/rust/datafusion/src/execution/physical_plan/explain.rs
+++ b/rust/datafusion/src/execution/physical_plan/explain.rs
@@ -19,7 +19,7 @@
 
 use crate::error::Result;
 use crate::{
-    execution::physical_plan::{common::RecordBatchIterator, ExecutionPlan, Partition},
+    execution::physical_plan::{common::RecordBatchIterator, ExecutionPlan},
     logicalplan::StringifiedPlan,
 };
 use arrow::{
@@ -28,6 +28,7 @@ use arrow::{
     record_batch::{RecordBatch, RecordBatchReader},
 };
 
+use crate::execution::physical_plan::Partitioning;
 use std::sync::{Arc, Mutex};
 
 /// Explain execution plan operator. This operator contains the string
@@ -57,24 +58,17 @@ impl ExecutionPlan for ExplainExec {
         self.schema.clone()
     }
 
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        Ok(vec![Arc::new(ExplainPartition {
-            schema: self.schema.clone(),
-            stringified_plans: self.stringified_plans.clone(),
-        })])
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
     }
-}
 
-#[derive(Debug)]
-struct ExplainPartition {
-    /// Input schema
-    schema: SchemaRef,
-    /// The various plans that were created.
-    stringified_plans: Vec<StringifiedPlan>,
-}
+    fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+        assert_eq!(0, partition);
 
-impl Partition for ExplainPartition {
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
         let mut type_builder = StringArray::builder(self.stringified_plans.len());
         let mut plan_builder = StringArray::builder(self.stringified_plans.len());
 
diff --git a/rust/datafusion/src/execution/physical_plan/filter.rs b/rust/datafusion/src/execution/physical_plan/filter.rs
index fee7fe4..c5f29b5 100644
--- a/rust/datafusion/src/execution/physical_plan/filter.rs
+++ b/rust/datafusion/src/execution/physical_plan/filter.rs
@@ -21,7 +21,7 @@
 use std::sync::{Arc, Mutex};
 
 use crate::error::{ExecutionError, Result};
-use crate::execution::physical_plan::{ExecutionPlan, Partition, PhysicalExpr};
+use crate::execution::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr};
 use arrow::array::BooleanArray;
 use arrow::compute::filter;
 use arrow::datatypes::{DataType, SchemaRef};
@@ -64,46 +64,19 @@ impl ExecutionPlan for FilterExec {
         self.input.schema()
     }
 
-    /// Get the partitions for this execution plan
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        let partitions: Vec<Arc<dyn Partition>> = self
-            .input
-            .partitions()?
-            .iter()
-            .map(|p| {
-                let expr = self.predicate.clone();
-                let partition: Arc<dyn Partition> = Arc::new(FilterExecPartition {
-                    schema: self.input.schema(),
-                    predicate: expr,
-                    input: p.clone() as Arc<dyn Partition>,
-                });
-
-                partition
-            })
-            .collect();
-
-        Ok(partitions)
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        self.input.output_partitioning()
     }
-}
-
-/// Represents a single partition of a filter execution plan
-#[derive(Debug)]
-struct FilterExecPartition {
-    /// Output schema, which is the same as the input schema for this operator
-    schema: SchemaRef,
-    /// The expression to filter on. This expression must evaluate to a boolean value.
-    predicate: Arc<dyn PhysicalExpr>,
-    /// The input partition to filter.
-    input: Arc<dyn Partition>,
-}
 
-impl Partition for FilterExecPartition {
-    /// Execute the filter
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+    fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
         Ok(Arc::new(Mutex::new(FilterExecIter {
-            schema: self.schema.clone(),
+            schema: self.input.schema().clone(),
             predicate: self.predicate.clone(),
-            input: self.input.execute()?,
+            input: self.input.execute(partition)?,
         })))
     }
 }
@@ -202,7 +175,7 @@ mod tests {
         let filter: Arc<dyn ExecutionPlan> =
             Arc::new(FilterExec::try_new(predicate, Arc::new(csv))?);
 
-        let results = test::execute(filter.as_ref())?;
+        let results = test::execute(filter)?;
 
         results
             .iter()
diff --git a/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs b/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
index 19038d6..0678121 100644
--- a/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
+++ b/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
@@ -23,7 +23,7 @@ use std::sync::{Arc, Mutex};
 
 use crate::error::{ExecutionError, Result};
 use crate::execution::physical_plan::{
-    Accumulator, AggregateExpr, ExecutionPlan, Partition, PhysicalExpr,
+    Accumulator, AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr,
 };
 
 use arrow::array::{
@@ -111,65 +111,28 @@ impl ExecutionPlan for HashAggregateExec {
         self.schema.clone()
     }
 
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        Ok(self
-            .input
-            .partitions()?
-            .iter()
-            .map(|p| {
-                let aggregate: Arc<dyn Partition> =
-                    Arc::new(HashAggregatePartition::new(
-                        self.group_expr.clone(),
-                        self.aggr_expr.clone(),
-                        p.clone() as Arc<dyn Partition>,
-                        self.schema.clone(),
-                    ));
-
-                aggregate
-            })
-            .collect::<Vec<Arc<dyn Partition>>>())
-    }
-}
-
-#[derive(Debug)]
-struct HashAggregatePartition {
-    group_expr: Vec<Arc<dyn PhysicalExpr>>,
-    aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    input: Arc<dyn Partition>,
-    schema: SchemaRef,
-}
-
-impl HashAggregatePartition {
-    /// Create a new HashAggregatePartition
-    pub fn new(
-        group_expr: Vec<Arc<dyn PhysicalExpr>>,
-        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-        input: Arc<dyn Partition>,
-        schema: SchemaRef,
-    ) -> Self {
-        HashAggregatePartition {
-            group_expr,
-            aggr_expr,
-            input,
-            schema,
-        }
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        self.input.output_partitioning()
     }
-}
 
-impl Partition for HashAggregatePartition {
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+    fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+        let input = self.input.execute(partition)?;
         if self.group_expr.is_empty() {
             Ok(Arc::new(Mutex::new(HashAggregateIterator::new(
                 self.schema.clone(),
                 self.aggr_expr.clone(),
-                self.input.execute()?,
+                input,
             ))))
         } else {
             Ok(Arc::new(Mutex::new(GroupedHashAggregateIterator::new(
                 self.schema.clone(),
                 self.group_expr.clone(),
                 self.aggr_expr.clone(),
-                self.input.execute()?,
+                input,
             ))))
         }
     }
@@ -726,14 +689,13 @@ mod tests {
         let aggregates: Vec<(Arc<dyn AggregateExpr>, String)> =
             vec![(sum(col("c4")), "SUM(c4)".to_string())];
 
-        let partition_aggregate = HashAggregateExec::try_new(
+        let partition_aggregate = Arc::new(HashAggregateExec::try_new(
             groups.clone(),
             aggregates.clone(),
             Arc::new(csv),
-        )?;
+        )?);
 
         let schema = partition_aggregate.schema();
-        let partitions = partition_aggregate.partitions()?;
 
         // construct the expressions for the final aggregation
         let (final_group, final_aggr) = partition_aggregate.make_final_expr(
@@ -741,9 +703,9 @@ mod tests {
             aggregates.iter().map(|x| x.1.clone()).collect(),
         );
 
-        let merge = Arc::new(MergeExec::new(schema.clone(), partitions, 2));
+        let merge = Arc::new(MergeExec::new(schema.clone(), partition_aggregate, 2));
 
-        let merged_aggregate = HashAggregateExec::try_new(
+        let merged_aggregate = Arc::new(HashAggregateExec::try_new(
             final_group
                 .iter()
                 .enumerate()
@@ -755,9 +717,9 @@ mod tests {
                 .map(|(i, expr)| (expr.clone(), aggregates[i].1.clone()))
                 .collect(),
             merge,
-        )?;
+        )?);
 
-        let result = test::execute(&merged_aggregate)?;
+        let result = test::execute(merged_aggregate)?;
         assert_eq!(result.len(), 1);
 
         let batch = &result[0];
diff --git a/rust/datafusion/src/execution/physical_plan/limit.rs b/rust/datafusion/src/execution/physical_plan/limit.rs
index 6d1cffd..862ac24 100644
--- a/rust/datafusion/src/execution/physical_plan/limit.rs
+++ b/rust/datafusion/src/execution/physical_plan/limit.rs
@@ -23,8 +23,7 @@ use crate::error::{ExecutionError, Result};
 use crate::execution::physical_plan::common::{self, RecordBatchIterator};
 use crate::execution::physical_plan::memory::MemoryIterator;
 use crate::execution::physical_plan::merge::MergeExec;
-use crate::execution::physical_plan::ExecutionPlan;
-use crate::execution::physical_plan::Partition;
+use crate::execution::physical_plan::{ExecutionPlan, Partitioning};
 use arrow::array::ArrayRef;
 use arrow::compute::limit;
 use arrow::datatypes::SchemaRef;
@@ -36,7 +35,7 @@ pub struct GlobalLimitExec {
     /// Input schema
     schema: SchemaRef,
     /// Input partitions
-    partitions: Vec<Arc<dyn Partition>>,
+    input: Arc<dyn ExecutionPlan>,
     /// Maximum number of rows to return
     limit: usize,
     /// Number of threads to run parallel LocalLimitExec on
@@ -47,13 +46,13 @@ impl GlobalLimitExec {
     /// Create a new MergeExec
     pub fn new(
         schema: SchemaRef,
-        partitions: Vec<Arc<dyn Partition>>,
+        input: Arc<dyn ExecutionPlan>,
         limit: usize,
         concurrency: usize,
     ) -> Self {
         GlobalLimitExec {
             schema,
-            partitions,
+            input,
             limit,
             concurrency,
         }
@@ -65,49 +64,30 @@ impl ExecutionPlan for GlobalLimitExec {
         self.schema.clone()
     }
 
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        Ok(vec![Arc::new(LimitPartition {
-            schema: self.schema.clone(),
-            partitions: self.partitions.clone(),
-            limit: self.limit,
-            concurrency: self.concurrency,
-        })])
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
     }
-}
 
-#[derive(Debug)]
-struct LimitPartition {
-    /// Input schema
-    schema: SchemaRef,
-    /// Input partitions
-    partitions: Vec<Arc<dyn Partition>>,
-    /// Maximum number of rows to return
-    limit: usize,
-    /// Number of threads to run parallel LocalLimitExec on
-    concurrency: usize,
-}
+    fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+        // GlobalLimitExec has a single partition
+        assert_eq!(0, partition);
 
-impl Partition for LimitPartition {
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
         // apply limit in parallel across all input partitions
-        let local_limit = self
-            .partitions
-            .iter()
-            .map(|p| {
-                Arc::new(LocalLimitExec::new(
-                    p.clone(),
-                    self.schema.clone(),
-                    self.limit,
-                )) as Arc<dyn Partition>
-            })
-            .collect();
+        let local_limit = Arc::new(LocalLimitExec::new(
+            self.input.clone(),
+            self.schema.clone(),
+            self.limit,
+        ));
 
         // limit needs to collapse inputs down to a single partition
         let merge = MergeExec::new(self.schema.clone(), local_limit, self.concurrency);
-        let merge_partitions = merge.partitions()?;
         // MergeExec must always produce a single partition
-        assert_eq!(1, merge_partitions.len());
-        let it = merge_partitions[0].execute()?;
+        assert_eq!(1, merge.output_partitioning().partition_count());
+        let it = merge.execute(0)?;
         let batches = common::collect(it)?;
 
         // apply the limit to the output
@@ -138,14 +118,14 @@ impl Partition for LimitPartition {
 /// LocalLimitExec applies a limit to a single partition
 #[derive(Debug)]
 pub struct LocalLimitExec {
-    input: Arc<dyn Partition>,
+    input: Arc<dyn ExecutionPlan>,
     schema: SchemaRef,
     limit: usize,
 }
 
 impl LocalLimitExec {
     /// Create a new LocalLimitExec partition
-    pub fn new(input: Arc<dyn Partition>, schema: SchemaRef, limit: usize) -> Self {
+    pub fn new(input: Arc<dyn ExecutionPlan>, schema: SchemaRef, limit: usize) -> Self {
         Self {
             input,
             schema,
@@ -154,9 +134,20 @@ impl LocalLimitExec {
     }
 }
 
-impl Partition for LocalLimitExec {
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
-        let it = self.input.execute()?;
+impl ExecutionPlan for LocalLimitExec {
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.input.output_partitioning()
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+        let it = self.input.execute(partition)?;
         Ok(Arc::new(Mutex::new(MemoryIterator::try_new(
             collect_with_limit(it, self.limit)?,
             self.schema.clone(),
@@ -230,14 +221,12 @@ mod tests {
             CsvExec::try_new(&path, CsvReadOptions::new().schema(&schema), None, 1024)?;
 
         // input should have 4 partitions
-        let input = csv.partitions()?;
-        assert_eq!(input.len(), num_partitions);
+        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
 
-        let limit = GlobalLimitExec::new(schema.clone(), input, 7, 2);
-        let partitions = limit.partitions()?;
+        let limit = GlobalLimitExec::new(schema.clone(), Arc::new(csv), 7, 2);
 
         // the result should contain 4 batches (one per input partition)
-        let iter = partitions[0].execute()?;
+        let iter = limit.execute(0)?;
         let batches = common::collect(iter)?;
 
         // there should be a total of 100 rows
diff --git a/rust/datafusion/src/execution/physical_plan/memory.rs b/rust/datafusion/src/execution/physical_plan/memory.rs
index f9b4441..096cecb 100644
--- a/rust/datafusion/src/execution/physical_plan/memory.rs
+++ b/rust/datafusion/src/execution/physical_plan/memory.rs
@@ -20,7 +20,7 @@
 use std::sync::{Arc, Mutex};
 
 use crate::error::Result;
-use crate::execution::physical_plan::{ExecutionPlan, Partition};
+use crate::execution::physical_plan::{ExecutionPlan, Partitioning};
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::{RecordBatch, RecordBatchReader};
@@ -42,20 +42,20 @@ impl ExecutionPlan for MemoryExec {
         self.schema.clone()
     }
 
-    /// Get the partitions for this execution plan. Each partition can be executed in parallel.
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        let partitions = self
-            .partitions
-            .iter()
-            .map(|vec| {
-                Arc::new(MemoryPartition::new(
-                    vec.clone(),
-                    self.schema.clone(),
-                    self.projection.clone(),
-                )) as Arc<dyn Partition>
-            })
-            .collect();
-        Ok(partitions)
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(self.partitions.len())
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+        Ok(Arc::new(Mutex::new(MemoryIterator::try_new(
+            self.partitions[partition].clone(),
+            self.schema.clone(),
+            self.projection.clone(),
+        )?)))
     }
 }
 
@@ -74,43 +74,6 @@ impl MemoryExec {
     }
 }
 
-/// Memory partition
-#[derive(Debug)]
-struct MemoryPartition {
-    /// Vector of record batches
-    data: Vec<RecordBatch>,
-    /// Schema representing the data
-    schema: SchemaRef,
-    /// Optional projection
-    projection: Option<Vec<usize>>,
-}
-
-impl MemoryPartition {
-    /// Create a new in-memory partition
-    fn new(
-        data: Vec<RecordBatch>,
-        schema: SchemaRef,
-        projection: Option<Vec<usize>>,
-    ) -> Self {
-        Self {
-            data,
-            schema,
-            projection,
-        }
-    }
-}
-
-impl Partition for MemoryPartition {
-    /// Execute this partition and return an iterator over RecordBatch
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
-        Ok(Arc::new(Mutex::new(MemoryIterator::try_new(
-            self.data.clone(),
-            self.schema.clone(),
-            self.projection.clone(),
-        )?)))
-    }
-}
-
 /// Iterator over batches
 pub(crate) struct MemoryIterator {
     /// Vector of record batches
diff --git a/rust/datafusion/src/execution/physical_plan/merge.rs b/rust/datafusion/src/execution/physical_plan/merge.rs
index 97614bd..9d36d6a 100644
--- a/rust/datafusion/src/execution/physical_plan/merge.rs
+++ b/rust/datafusion/src/execution/physical_plan/merge.rs
@@ -23,7 +23,7 @@ use std::thread::{self, JoinHandle};
 
 use crate::error::{ExecutionError, Result};
 use crate::execution::physical_plan::common::RecordBatchIterator;
-use crate::execution::physical_plan::Partition;
+use crate::execution::physical_plan::Partitioning;
 use crate::execution::physical_plan::{common, ExecutionPlan};
 
 use arrow::datatypes::SchemaRef;
@@ -36,7 +36,7 @@ pub struct MergeExec {
     /// Input schema
     schema: SchemaRef,
     /// Input partitions
-    partitions: Vec<Arc<dyn Partition>>,
+    input: Arc<dyn ExecutionPlan>,
     /// Maximum number of concurrent threads
     concurrency: usize,
 }
@@ -45,12 +45,12 @@ impl MergeExec {
     /// Create a new MergeExec
     pub fn new(
         schema: SchemaRef,
-        partitions: Vec<Arc<dyn Partition>>,
+        input: Arc<dyn ExecutionPlan>,
         max_concurrency: usize,
     ) -> Self {
         MergeExec {
             schema,
-            partitions,
+            input,
             concurrency: max_concurrency,
         }
     }
@@ -61,64 +61,39 @@ impl ExecutionPlan for MergeExec {
         self.schema.clone()
     }
 
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        Ok(vec![Arc::new(MergePartition {
-            schema: self.schema.clone(),
-            partitions: self.partitions.clone(),
-            concurrency: self.concurrency,
-        })])
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
     }
-}
 
-#[derive(Debug)]
-struct MergePartition {
-    /// Input schema
-    schema: SchemaRef,
-    /// Input partitions
-    partitions: Vec<Arc<dyn Partition>>,
-    /// Maximum number of concurrent threads
-    concurrency: usize,
-}
-
-fn collect_from_thread(
-    thread: JoinHandle<Result<Vec<RecordBatch>>>,
-    combined_results: &mut Vec<Arc<RecordBatch>>,
-) -> Result<()> {
-    match thread.join() {
-        Ok(join) => {
-            join?
-                .iter()
-                .for_each(|batch| combined_results.push(Arc::new(batch.clone())));
-            Ok(())
-        }
-        Err(e) => Err(ExecutionError::General(format!(
-            "Error collecting batches from thread: {:?}",
-            e
-        ))),
-    }
-}
+    fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+        // MergeExec produces a single partition
+        assert_eq!(0, partition);
 
-impl Partition for MergePartition {
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
-        match self.partitions.len() {
+        let input_partitions = self.input.output_partitioning().partition_count();
+        match input_partitions {
             0 => Err(ExecutionError::General(
                 "MergeExec requires at least one input partition".to_owned(),
             )),
             1 => {
                 // bypass any threading if there is a single partition
-                self.partitions[0].execute()
+                self.input.execute(0)
             }
             _ => {
-                let partitions_per_thread =
-                    (self.partitions.len() / self.concurrency).max(1);
-                let chunks = self.partitions.chunks(partitions_per_thread);
+                let partitions_per_thread = (input_partitions / self.concurrency).max(1);
+                let range: Vec<usize> = (0..input_partitions).collect();
+                let chunks = range.chunks(partitions_per_thread);
                 let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = chunks
                     .map(|chunk| {
                         let chunk = chunk.to_vec();
+                        let input = self.input.clone();
                         thread::spawn(move || {
                             let mut batches = vec![];
                             for partition in chunk {
-                                let it = partition.execute()?;
+                                let it = input.execute(partition)?;
                                 common::collect(it).iter().for_each(|b| {
                                     b.iter().for_each(|b| batches.push(b.clone()))
                                 });
@@ -143,6 +118,24 @@ impl Partition for MergePartition {
     }
 }
 
+fn collect_from_thread(
+    thread: JoinHandle<Result<Vec<RecordBatch>>>,
+    combined_results: &mut Vec<Arc<RecordBatch>>,
+) -> Result<()> {
+    match thread.join() {
+        Ok(join) => {
+            join?
+                .iter()
+                .for_each(|batch| combined_results.push(Arc::new(batch.clone())));
+            Ok(())
+        }
+        Err(e) => Err(ExecutionError::General(format!(
+            "Error collecting batches from thread: {:?}",
+            e
+        ))),
+    }
+}
+
 #[cfg(test)]
 mod tests {
 
@@ -163,17 +156,15 @@ mod tests {
             CsvExec::try_new(&path, CsvReadOptions::new().schema(&schema), None, 1024)?;
 
         // input should have 4 partitions
-        let input = csv.partitions()?;
-        assert_eq!(input.len(), num_partitions);
+        assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
 
-        let merge = MergeExec::new(schema.clone(), input, 2);
+        let merge = MergeExec::new(schema.clone(), Arc::new(csv), 2);
 
         // output of MergeExec should have a single partition
-        let merged = merge.partitions()?;
-        assert_eq!(merged.len(), 1);
+        assert_eq!(merge.output_partitioning().partition_count(), 1);
 
         // the result should contain 4 batches (one per input partition)
-        let iter = merged[0].execute()?;
+        let iter = merge.execute(0)?;
         let batches = common::collect(iter)?;
         assert_eq!(batches.len(), num_partitions);
 
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index 817b6ab..80686f7 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -45,17 +45,33 @@ pub trait PhysicalPlanner {
 }
 
 /// Partition-aware execution plan for a relation
-pub trait ExecutionPlan: Debug {
+pub trait ExecutionPlan: Debug + Send + Sync {
     /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef;
-    /// Get the partitions for this execution plan. Each partition can be executed in parallel.
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>>;
+    /// Specifies the output partitioning scheme of this plan
+    fn output_partitioning(&self) -> Partitioning;
+    /// Execute one partition and return an iterator over RecordBatch
+    fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>;
+}
+
+/// Partitioning schemes supported by operators.
+#[derive(Debug, Clone)]
+pub enum Partitioning {
+    /// Unknown partitioning scheme
+    UnknownPartitioning(usize),
 }
 
-/// Represents a partition of an execution plan that can be executed on a thread
-pub trait Partition: Send + Sync + Debug {
-    /// Execute this partition and return an iterator over RecordBatch
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>;
+impl Partitioning {
+    /// Returns the number of partitions in this partitioning scheme
+    pub fn partition_count(&self) -> usize {
+        use Partitioning::*;
+        match self {
+            UnknownPartitioning(n) => *n,
+        }
+    }
 }
 
 /// Expression that can be evaluated against a RecordBatch
@@ -109,7 +125,6 @@ pub fn scalar_functions() -> Vec<ScalarFunction> {
 
 pub mod common;
 pub mod csv;
-pub mod datasource;
 pub mod explain;
 pub mod expressions;
 pub mod filter;
diff --git a/rust/datafusion/src/execution/physical_plan/parquet.rs b/rust/datafusion/src/execution/physical_plan/parquet.rs
index 080cfff..2888560 100644
--- a/rust/datafusion/src/execution/physical_plan/parquet.rs
+++ b/rust/datafusion/src/execution/physical_plan/parquet.rs
@@ -23,15 +23,15 @@ use std::sync::{Arc, Mutex};
 use std::{fmt, thread};
 
 use crate::error::{ExecutionError, Result};
-use crate::execution::physical_plan::common;
-use crate::execution::physical_plan::{ExecutionPlan, Partition};
+use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{common, Partitioning};
 use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::{RecordBatch, RecordBatchReader};
 use parquet::file::reader::SerializedFileReader;
 
 use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
-use fmt::{Debug, Formatter};
+use fmt::Debug;
 use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
 
 /// Execution plan for scanning a Parquet file
@@ -91,53 +91,38 @@ impl ExecutionPlan for ParquetExec {
         self.schema.clone()
     }
 
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        let partitions = self
-            .filenames
-            .iter()
-            .map(|filename| {
-                Arc::new(ParquetPartition::new(
-                    &filename,
-                    self.projection.clone(),
-                    self.schema.clone(),
-                    self.batch_size,
-                )) as Arc<dyn Partition>
-            })
-            .collect();
-        Ok(partitions)
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(self.filenames.len())
     }
-}
 
-struct ParquetPartition {
-    /// Filename for this partition
-    filename: String,
-    /// Projection for which columns to load
-    projection: Vec<usize>,
-    /// Batch size
-    batch_size: usize,
-    schema: SchemaRef,
-}
+    fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+        // because the parquet implementation is not thread-safe, it is necessary to execute
+        // on a thread and communicate with channels
+        let (response_tx, response_rx): (
+            Sender<ArrowResult<Option<RecordBatch>>>,
+            Receiver<ArrowResult<Option<RecordBatch>>>,
+        ) = bounded(2);
 
-impl Debug for ParquetPartition {
-    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        f.debug_struct("ParquetPartition").finish()
-    }
-}
+        let filename = self.filenames[partition].clone();
+        let projection = self.projection.clone();
+        let batch_size = self.batch_size;
 
-impl ParquetPartition {
-    /// Create a new Parquet partition
-    pub fn new(
-        filename: &str,
-        projection: Vec<usize>,
-        schema: SchemaRef,
-        batch_size: usize,
-    ) -> Self {
-        Self {
-            filename: filename.to_owned(),
-            projection,
-            schema,
-            batch_size,
-        }
+        thread::spawn(move || {
+            if let Err(e) = read_file(&filename, projection, batch_size, response_tx) {
+                println!("Parquet reader thread terminated due to error: {:?}", e);
+            }
+        });
+
+        let iterator = Arc::new(Mutex::new(ParquetIterator {
+            schema: self.schema.clone(),
+            response_rx,
+        }));
+
+        Ok(iterator)
     }
 }
 
@@ -186,34 +171,6 @@ fn read_file(
     Ok(())
 }
 
-impl Partition for ParquetPartition {
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
-        // because the parquet implementation is not thread-safe, it is necessary to execute
-        // on a thread and communicate with channels
-        let (response_tx, response_rx): (
-            Sender<ArrowResult<Option<RecordBatch>>>,
-            Receiver<ArrowResult<Option<RecordBatch>>>,
-        ) = bounded(2);
-
-        let filename = self.filename.clone();
-        let projection = self.projection.clone();
-        let batch_size = self.batch_size;
-
-        thread::spawn(move || {
-            if let Err(e) = read_file(&filename, projection, batch_size, response_tx) {
-                println!("Parquet reader thread terminated due to error: {:?}", e);
-            }
-        });
-
-        let iterator = Arc::new(Mutex::new(ParquetIterator {
-            schema: self.schema.clone(),
-            response_rx,
-        }));
-
-        Ok(iterator)
-    }
-}
-
 struct ParquetIterator {
     schema: SchemaRef,
     response_rx: Receiver<ArrowResult<Option<RecordBatch>>>,
@@ -244,10 +201,9 @@ mod tests {
             env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined");
         let filename = format!("{}/alltypes_plain.parquet", testdata);
         let parquet_exec = ParquetExec::try_new(&filename, Some(vec![0, 1, 2]), 1024)?;
-        let partitions = parquet_exec.partitions()?;
-        assert_eq!(partitions.len(), 1);
+        assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
 
-        let results = partitions[0].execute()?;
+        let results = parquet_exec.execute(0)?;
         let mut results = results.lock().unwrap();
         let batch = results.next_batch()?.unwrap();
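
Putting the pieces of this file together, the calling convention is now: ask the plan for its partition count, then call execute(i) once per index. A minimal sketch, assuming the module paths below match this commit and that datafusion's Result converts from ArrowError (an illustration, not part of the patch):

    use arrow::record_batch::RecordBatchReader;
    use datafusion::error::Result;
    use datafusion::execution::physical_plan::parquet::ParquetExec;
    use datafusion::execution::physical_plan::ExecutionPlan;

    /// Count the rows in a Parquet file by scanning every partition.
    fn count_rows(filename: &str) -> Result<usize> {
        let exec = ParquetExec::try_new(filename, None, 1024)?;
        let mut rows = 0;
        // One partition per input file, as reported by UnknownPartitioning
        for partition in 0..exec.output_partitioning().partition_count() {
            let it = exec.execute(partition)?;
            let mut it = it.lock().unwrap();
            while let Some(batch) = it.next_batch()? {
                rows += batch.num_rows();
            }
        }
        Ok(rows)
    }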
 
diff --git a/rust/datafusion/src/execution/physical_plan/planner.rs b/rust/datafusion/src/execution/physical_plan/planner.rs
index 3f2fe04..339e5fe 100644
--- a/rust/datafusion/src/execution/physical_plan/planner.rs
+++ b/rust/datafusion/src/execution/physical_plan/planner.rs
@@ -23,7 +23,6 @@ use super::expressions::binary;
 use crate::error::{ExecutionError, Result};
 use crate::execution::context::ExecutionContextState;
 use crate::execution::physical_plan::csv::{CsvExec, CsvReadOptions};
-use crate::execution::physical_plan::datasource::DatasourceExec;
 use crate::execution::physical_plan::explain::ExplainExec;
 use crate::execution::physical_plan::expressions;
 use crate::execution::physical_plan::expressions::{
@@ -71,27 +70,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
                 projection,
                 ..
             } => match ctx_state.datasources.get(table_name) {
-                Some(provider) => {
-                    let exec = provider.scan(projection, batch_size)?;
-                    let partitions = exec.partitions()?;
-                    if partitions.is_empty() {
-                        Err(ExecutionError::General(
-                            "Table provider returned no partitions".to_string(),
-                        ))
-                    } else {
-                        let schema = match projection {
-                            None => provider.schema().clone(),
-                            Some(p) => Arc::new(Schema::new(
-                                p.iter()
-                                    .map(|i| provider.schema().field(*i).clone())
-                                    .collect(),
-                            )),
-                        };
-
-                        let exec = DatasourceExec::new(schema, partitions.clone());
-                        Ok(Arc::new(exec))
-                    }
-                }
+                Some(provider) => provider.scan(projection, batch_size),
                 _ => Err(ExecutionError::General(format!(
                     "No table named {}",
                     table_name
@@ -180,15 +159,15 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
                 )?;
 
                 let schema = initial_aggr.schema();
-                let partitions = initial_aggr.partitions()?;
 
-                if partitions.len() == 1 {
+                if initial_aggr.output_partitioning().partition_count() == 1 {
                     return Ok(Arc::new(initial_aggr));
                 }
 
+                let initial_aggr = Arc::new(initial_aggr);
                 let merge = Arc::new(MergeExec::new(
                     schema.clone(),
-                    partitions,
+                    initial_aggr.clone(),
                     ctx_state.config.concurrency,
                 ));
 
@@ -261,7 +240,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
 
                 Ok(Arc::new(GlobalLimitExec::new(
                     input_schema.clone(),
-                    input.partitions()?,
+                    input,
                     *n,
                     ctx_state.config.concurrency,
                 )))
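
The planner hunks above lean on just two pieces of the new Partitioning enum: the UnknownPartitioning variant and partition_count(). Its definition lives in physical_plan/mod.rs, which this excerpt does not show; the following is only an inference from the call sites here:

    /// Sketch of Partitioning, inferred from its usage in this diff; the
    /// authoritative definition is in physical_plan/mod.rs.
    #[derive(Debug, Clone)]
    pub enum Partitioning {
        /// The partitioning scheme is unknown, but the partition count is not
        UnknownPartitioning(usize),
    }

    impl Partitioning {
        /// Number of partitions the plan produces
        pub fn partition_count(&self) -> usize {
            match self {
                Partitioning::UnknownPartitioning(n) => *n,
            }
        }
    }

Additional variants can slot in as further enum cases later without changing callers that only need the count.
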
diff --git a/rust/datafusion/src/execution/physical_plan/projection.rs b/rust/datafusion/src/execution/physical_plan/projection.rs
index a5ad0ef..ce51e89 100644
--- a/rust/datafusion/src/execution/physical_plan/projection.rs
+++ b/rust/datafusion/src/execution/physical_plan/projection.rs
@@ -23,7 +23,7 @@
 use std::sync::{Arc, Mutex};
 
 use crate::error::{ExecutionError, Result};
-use crate::execution::physical_plan::{ExecutionPlan, Partition, PhysicalExpr};
+use crate::execution::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr};
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::{RecordBatch, RecordBatchReader};
@@ -74,42 +74,19 @@ impl ExecutionPlan for ProjectionExec {
         self.schema.clone()
     }
 
-    /// Get the partitions for this execution plan
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        let partitions: Vec<Arc<dyn Partition>> = self
-            .input
-            .partitions()?
-            .iter()
-            .map(|p| {
-                let projection: Arc<dyn Partition> = Arc::new(ProjectionPartition {
-                    schema: self.schema.clone(),
-                    expr: self.expr.clone(),
-                    input: p.clone() as Arc<dyn Partition>,
-                });
-
-                projection
-            })
-            .collect();
-
-        Ok(partitions)
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        self.input.output_partitioning()
     }
-}
-
-/// Represents a single partition of a projection execution plan
-#[derive(Debug)]
-struct ProjectionPartition {
-    schema: SchemaRef,
-    expr: Vec<Arc<dyn PhysicalExpr>>,
-    input: Arc<dyn Partition>,
-}
 
-impl Partition for ProjectionPartition {
-    /// Execute the projection
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+    fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
         Ok(Arc::new(Mutex::new(ProjectionIterator {
             schema: self.schema.clone(),
             expr: self.expr.clone(),
-            input: self.input.execute()?,
+            input: self.input.execute(partition)?,
         })))
     }
 }
@@ -168,9 +145,9 @@ mod tests {
 
         let mut partition_count = 0;
         let mut row_count = 0;
-        for partition in projection.partitions()? {
+        for partition in 0..projection.output_partitioning().partition_count() {
             partition_count += 1;
-            let iterator = partition.execute()?;
+            let iterator = projection.execute(partition)?;
             let mut iterator = iterator.lock().unwrap();
             while let Some(batch) = iterator.next_batch()? {
                 assert_eq!(1, batch.num_columns());
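
ProjectionExec now simply forwards its input's partitioning and maps output partition i onto input partition i. Any partition-preserving operator takes the same shape; here is a hypothetical minimal example (PassThroughExec is not part of this commit, and it assumes the trait requires only schema, output_partitioning, and execute):

    use std::sync::{Arc, Mutex};

    use arrow::datatypes::SchemaRef;
    use arrow::record_batch::RecordBatchReader;
    use datafusion::error::Result;
    use datafusion::execution::physical_plan::{ExecutionPlan, Partitioning};

    /// Hypothetical operator that changes nothing, showing the minimum an
    /// ExecutionPlan must provide under the new trait.
    #[derive(Debug)]
    struct PassThroughExec {
        input: Arc<dyn ExecutionPlan>,
    }

    impl ExecutionPlan for PassThroughExec {
        fn schema(&self) -> SchemaRef {
            self.input.schema()
        }

        /// Same partitioning as the input, exactly as ProjectionExec does
        fn output_partitioning(&self) -> Partitioning {
            self.input.output_partitioning()
        }

        /// Output partition i is input partition i, untouched
        fn execute(
            &self,
            partition: usize,
        ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
            self.input.execute(partition)
        }
    }
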
diff --git a/rust/datafusion/src/execution/physical_plan/sort.rs b/rust/datafusion/src/execution/physical_plan/sort.rs
index e86d6cc..c6d2031 100644
--- a/rust/datafusion/src/execution/physical_plan/sort.rs
+++ b/rust/datafusion/src/execution/physical_plan/sort.rs
@@ -29,7 +29,7 @@ use crate::error::Result;
 use crate::execution::physical_plan::common::RecordBatchIterator;
 use crate::execution::physical_plan::expressions::PhysicalSortExpr;
 use crate::execution::physical_plan::merge::MergeExec;
-use crate::execution::physical_plan::{common, ExecutionPlan, Partition};
+use crate::execution::physical_plan::{common, ExecutionPlan, Partitioning};
 
 /// Sort execution plan
 #[derive(Debug)]
@@ -61,44 +61,28 @@ impl ExecutionPlan for SortExec {
         self.input.schema().clone()
     }
 
-    fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
-        Ok(vec![
-            (Arc::new(SortPartition {
-                input: self.input.partitions()?,
-                expr: self.expr.clone(),
-                schema: self.schema(),
-                concurrency: self.concurrency,
-            })),
-        ])
+    /// Get the output partitioning of this plan
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
     }
-}
 
-/// Represents a single partition of a Sort execution plan
-#[derive(Debug)]
-struct SortPartition {
-    schema: SchemaRef,
-    expr: Vec<PhysicalSortExpr>,
-    input: Vec<Arc<dyn Partition>>,
-    /// Number of threads to execute input partitions on before combining into a single partition
-    concurrency: usize,
-}
+    fn execute(
+        &self,
+        partition: usize,
+    ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+        assert_eq!(0, partition);
 
-impl Partition for SortPartition {
-    /// Execute the sort
-    fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
         // sort needs to operate on a single partition currently
-        let merge =
-            MergeExec::new(self.schema.clone(), self.input.clone(), self.concurrency);
-        let merge_partitions = merge.partitions()?;
+        let merge = MergeExec::new(self.schema(), self.input.clone(), self.concurrency);
         // MergeExec must always produce a single partition
-        assert_eq!(1, merge_partitions.len());
-        let it = merge_partitions[0].execute()?;
+        assert_eq!(1, merge.output_partitioning().partition_count());
+        let it = merge.execute(0)?;
         let batches = common::collect(it)?;
 
         // combine all record batches into one for each column
         let combined_batch = RecordBatch::try_new(
-            self.schema.clone(),
-            self.schema
+            self.schema(),
+            self.schema()
                 .fields()
                 .iter()
                 .enumerate()
@@ -124,7 +108,7 @@ impl Partition for SortPartition {
 
         // reorder all rows based on sorted indices
         let sorted_batch = RecordBatch::try_new(
-            self.schema.clone(),
+            self.schema(),
             combined_batch
                 .columns()
                 .iter()
@@ -143,7 +127,7 @@ impl Partition for SortPartition {
         )?;
 
         Ok(Arc::new(Mutex::new(RecordBatchIterator::new(
-            self.schema.clone(),
+            self.schema(),
             vec![Arc::new(sorted_batch)],
         ))))
     }
@@ -166,7 +150,7 @@ mod tests {
         let csv =
             CsvExec::try_new(&path, CsvReadOptions::new().schema(&schema), None, 1024)?;
 
-        let sort_exec = SortExec::try_new(
+        let sort_exec = Arc::new(SortExec::try_new(
             vec![
                 // c1 string column
                 PhysicalSortExpr {
@@ -186,9 +170,9 @@ mod tests {
             ],
             Arc::new(csv),
             2,
-        )?;
+        )?);
 
-        let result: Vec<RecordBatch> = test::execute(&sort_exec)?;
+        let result: Vec<RecordBatch> = test::execute(sort_exec)?;
         assert_eq!(result.len(), 1);
 
         let columns = result[0].columns();
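
SortExec illustrates the other pattern: an operator that coalesces its input through MergeExec and reports UnknownPartitioning(1), so partition index 0 is the only valid argument to execute() (the assert above enforces this). A hedged helper for consuming such single-partition plans, assuming this commit's API:

    use arrow::record_batch::{RecordBatch, RecordBatchReader};
    use datafusion::error::Result;
    use datafusion::execution::physical_plan::ExecutionPlan;

    /// Collect the single output partition of a plan such as SortExec.
    fn collect_single_partition(
        plan: &dyn ExecutionPlan,
    ) -> Result<Vec<RecordBatch>> {
        // Coalescing plans report exactly one partition; any index
        // other than 0 would trip the assert in SortExec::execute
        assert_eq!(1, plan.output_partitioning().partition_count());
        let it = plan.execute(0)?;
        let mut it = it.lock().unwrap();
        let mut batches = Vec::new();
        while let Some(batch) = it.next_batch()? {
            batches.push(batch);
        }
        Ok(batches)
    }
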
diff --git a/rust/datafusion/src/test/mod.rs b/rust/datafusion/src/test/mod.rs
index 26d67fe..10b034d 100644
--- a/rust/datafusion/src/test/mod.rs
+++ b/rust/datafusion/src/test/mod.rs
@@ -37,7 +37,7 @@ pub fn arrow_testdata_path() -> String {
 }
 
 /// Execute a physical plan and collect the results
-pub fn execute(plan: &dyn ExecutionPlan) -> Result<Vec<RecordBatch>> {
+pub fn execute(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
     let ctx = ExecutionContext::new();
     ctx.collect(plan)
 }
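
With test::execute and ExecutionContext::collect now taking Arc<dyn ExecutionPlan> by value, call sites hand over the Arc instead of borrowing through as_ref(), as the sql.rs hunks below show. A sketch of a caller under the new signatures (module paths assumed to match this commit):

    use std::sync::Arc;

    use arrow::record_batch::RecordBatch;
    use datafusion::error::Result;
    use datafusion::execution::context::ExecutionContext;
    use datafusion::execution::physical_plan::ExecutionPlan;

    /// Plan and run a SQL query end to end (illustrative only).
    fn run(ctx: &mut ExecutionContext, sql: &str) -> Result<Vec<RecordBatch>> {
        let plan = ctx.create_logical_plan(sql)?;
        let plan = ctx.optimize(&plan)?;
        let plan: Arc<dyn ExecutionPlan> = ctx.create_physical_plan(&plan)?;
        // Previously: ctx.collect(plan.as_ref())
        ctx.collect(plan)
    }
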
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index e9e09b4..43ac27a 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -110,7 +110,7 @@ fn parquet_single_nan_schema() {
     let plan = ctx.create_logical_plan(&sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
     let plan = ctx.create_physical_plan(&plan).unwrap();
-    let results = ctx.collect(plan.as_ref()).unwrap();
+    let results = ctx.collect(plan).unwrap();
     for batch in results {
         assert_eq!(1, batch.num_rows());
         assert_eq!(1, batch.num_columns());
@@ -289,7 +289,7 @@ fn csv_query_avg_multi_batch() -> Result<()> {
     let plan = ctx.create_logical_plan(&sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
     let plan = ctx.create_physical_plan(&plan).unwrap();
-    let results = ctx.collect(plan.as_ref()).unwrap();
+    let results = ctx.collect(plan).unwrap();
     let batch = &results[0];
     let column = batch.column(0);
     let array = column.as_any().downcast_ref::<Float64Array>().unwrap();
@@ -550,7 +550,7 @@ fn execute(ctx: &mut ExecutionContext, sql: &str) -> Vec<String> {
     let optimized_logical_schema = plan.schema().clone();
     let plan = ctx.create_physical_plan(&plan).unwrap();
     let physical_schema = plan.schema().clone();
-    let results = ctx.collect(plan.as_ref()).unwrap();
+    let results = ctx.collect(plan).unwrap();
 
     assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref());
     assert_eq!(logical_schema.as_ref(), physical_schema.as_ref());