You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/08/23 15:07:11 UTC
[arrow] branch master updated: ARROW-9464: [Rust] [DataFusion]
Remove Partition trait
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new d1d85db ARROW-9464: [Rust] [DataFusion] Remove Partition trait
d1d85db is described below
commit d1d85db24d81fe8c74a0af39242642c9091567af
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sun Aug 23 09:06:35 2020 -0600
ARROW-9464: [Rust] [DataFusion] Remove Partition trait
This follows on from https://github.com/apache/arrow/pull/8028.
- Removes `Partition` trait, which was really redundant.
- `ExecutionPlan.execute()` now takes a partition index.
- Introduced `Partitioning` enum so that execution plans can describe their partitioning scheme and number of partitions. Currently, it just has `UnknownPartitioning` but later we can add others, such as `HashPartitioning`.
- Removed `DatasourceExec` since it is no longer needed now that `TableProvider.scan` returns an `ExecutionPlan` directly.
This is a step towards extracting the threading model out of the operators like `MergeExec` and having the operators be able to work with different threading models.
Closes #8029 from andygrove/execute-partition
Authored-by: Andy Grove <an...@gmail.com>
Signed-off-by: Andy Grove <an...@gmail.com>
---
rust/benchmarks/src/bin/nyctaxi.rs | 2 +-
rust/benchmarks/src/bin/tpch.rs | 2 +-
rust/datafusion/examples/flight_server.rs | 2 +-
rust/datafusion/src/datasource/csv.rs | 22 ++--
rust/datafusion/src/datasource/memory.rs | 14 ++-
rust/datafusion/src/datasource/parquet.rs | 6 +-
rust/datafusion/src/execution/context.rs | 84 +++++++--------
rust/datafusion/src/execution/dataframe_impl.rs | 2 +-
rust/datafusion/src/execution/physical_plan/csv.rs | 115 ++++++---------------
.../src/execution/physical_plan/datasource.rs | 59 -----------
.../src/execution/physical_plan/explain.rs | 26 ++---
.../src/execution/physical_plan/filter.rs | 49 ++-------
.../src/execution/physical_plan/hash_aggregate.rs | 72 +++----------
.../src/execution/physical_plan/limit.rs | 89 +++++++---------
.../src/execution/physical_plan/memory.rs | 67 +++---------
.../src/execution/physical_plan/merge.rs | 95 ++++++++---------
rust/datafusion/src/execution/physical_plan/mod.rs | 31 ++++--
.../src/execution/physical_plan/parquet.rs | 110 ++++++--------------
.../src/execution/physical_plan/planner.rs | 31 +-----
.../src/execution/physical_plan/projection.rs | 45 ++------
.../datafusion/src/execution/physical_plan/sort.rs | 54 ++++------
rust/datafusion/src/test/mod.rs | 2 +-
rust/datafusion/tests/sql.rs | 6 +-
23 files changed, 327 insertions(+), 658 deletions(-)
diff --git a/rust/benchmarks/src/bin/nyctaxi.rs b/rust/benchmarks/src/bin/nyctaxi.rs
index 7468400..d45de71 100644
--- a/rust/benchmarks/src/bin/nyctaxi.rs
+++ b/rust/benchmarks/src/bin/nyctaxi.rs
@@ -115,7 +115,7 @@ fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()>
println!("Optimized logical plan:\n{:?}", plan);
}
let physical_plan = ctx.create_physical_plan(&plan)?;
- let result = ctx.collect(physical_plan.as_ref())?;
+ let result = ctx.collect(physical_plan)?;
if debug {
pretty::print_batches(&result)?;
}
diff --git a/rust/benchmarks/src/bin/tpch.rs b/rust/benchmarks/src/bin/tpch.rs
index 49bcf32..4e27daf 100644
--- a/rust/benchmarks/src/bin/tpch.rs
+++ b/rust/benchmarks/src/bin/tpch.rs
@@ -148,7 +148,7 @@ fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Result<()>
println!("Optimized logical plan:\n{:?}", plan);
}
let physical_plan = ctx.create_physical_plan(&plan)?;
- let result = ctx.collect(physical_plan.as_ref())?;
+ let result = ctx.collect(physical_plan)?;
if debug {
pretty::print_batches(&result)?;
}
diff --git a/rust/datafusion/examples/flight_server.rs b/rust/datafusion/examples/flight_server.rs
index 0cb2858..f5d6410 100644
--- a/rust/datafusion/examples/flight_server.rs
+++ b/rust/datafusion/examples/flight_server.rs
@@ -103,7 +103,7 @@ impl FlightService for FlightServiceImpl {
.map_err(|e| to_tonic_err(&e))?;
// execute the query
- let results = ctx.collect(plan.as_ref()).map_err(|e| to_tonic_err(&e))?;
+ let results = ctx.collect(plan.clone()).map_err(|e| to_tonic_err(&e))?;
if results.is_empty() {
return Err(Status::internal("There were no results from ticket"));
}
diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs
index 1ce7658..6570680 100644
--- a/rust/datafusion/src/datasource/csv.rs
+++ b/rust/datafusion/src/datasource/csv.rs
@@ -43,14 +43,15 @@ use std::string::String;
use std::sync::Arc;
use crate::datasource::TableProvider;
-use crate::error::Result;
+use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::csv::CsvExec;
pub use crate::execution::physical_plan::csv::CsvReadOptions;
-use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{common, ExecutionPlan};
/// Represents a CSV file with a provided schema
pub struct CsvFile {
- filename: String,
+ /// Path to a single CSV file or a directory containing one of more CSV files
+ path: String,
schema: SchemaRef,
has_header: bool,
delimiter: u8,
@@ -59,14 +60,21 @@ pub struct CsvFile {
impl CsvFile {
/// Attempt to initialize a new `CsvFile` from a file path
- pub fn try_new(filename: &str, options: CsvReadOptions) -> Result<Self> {
+ pub fn try_new(path: &str, options: CsvReadOptions) -> Result<Self> {
let schema = Arc::new(match options.schema {
Some(s) => s.clone(),
- None => CsvExec::try_infer_schema(filename, &options)?,
+ None => {
+ let mut filenames: Vec<String> = vec![];
+ common::build_file_list(path, &mut filenames, options.file_extension)?;
+ if filenames.is_empty() {
+ return Err(ExecutionError::General("No files found".to_string()));
+ }
+ CsvExec::try_infer_schema(&filenames, &options)?
+ }
});
Ok(Self {
- filename: String::from(filename),
+ path: String::from(path),
schema,
has_header: options.has_header,
delimiter: options.delimiter,
@@ -86,7 +94,7 @@ impl TableProvider for CsvFile {
batch_size: usize,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(CsvExec::try_new(
- &self.filename,
+ &self.path,
CsvReadOptions::new()
.schema(&self.schema)
.has_header(self.has_header)
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index 96eebca..ff56861 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -58,11 +58,11 @@ impl MemTable {
pub fn load(t: &dyn TableProvider) -> Result<Self> {
let schema = t.schema();
let exec = t.scan(&None, 1024 * 1024)?;
- let partitions = exec.partitions()?;
- let mut data: Vec<Vec<RecordBatch>> = Vec::with_capacity(partitions.len());
- for partition in &partitions {
- let it = partition.execute()?;
+ let mut data: Vec<Vec<RecordBatch>> =
+ Vec::with_capacity(exec.output_partitioning().partition_count());
+ for partition in 0..exec.output_partitioning().partition_count() {
+ let it = exec.execute(partition)?;
let mut it = it.lock().unwrap();
let mut partition_batches = vec![];
while let Ok(Some(batch)) = it.next_batch() {
@@ -147,8 +147,7 @@ mod tests {
// scan with projection
let exec = provider.scan(&Some(vec![2, 1]), 1024)?;
- let partitions = exec.partitions()?;
- let it = partitions[0].execute()?;
+ let it = exec.execute(0)?;
let batch2 = it.lock().expect("mutex lock").next_batch()?.unwrap();
assert_eq!(2, batch2.schema().fields().len());
assert_eq!("c", batch2.schema().field(0).name());
@@ -178,8 +177,7 @@ mod tests {
let provider = MemTable::new(schema, vec![vec![batch]])?;
let exec = provider.scan(&None, 1024)?;
- let partitions = exec.partitions()?;
- let it = partitions[0].execute()?;
+ let it = exec.execute(0)?;
let batch1 = it.lock().expect("mutex lock").next_batch()?.unwrap();
assert_eq!(3, batch1.schema().fields().len());
assert_eq!(3, batch1.num_columns());
diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index 776a515..e2cd847 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -81,8 +81,7 @@ mod tests {
let table = load_table("alltypes_plain.parquet")?;
let projection = None;
let exec = table.scan(&projection, 2)?;
- let partitions = exec.partitions()?;
- let it = partitions[0].execute()?;
+ let it = exec.execute(0)?;
let mut it = it.lock().unwrap();
let mut count = 0;
@@ -302,8 +301,7 @@ mod tests {
projection: &Option<Vec<usize>>,
) -> Result<RecordBatch> {
let exec = table.scan(projection, 1024)?;
- let partitions = exec.partitions()?;
- let it = partitions[0].execute()?;
+ let it = exec.execute(0)?;
let mut it = it.lock().expect("failed to lock mutex");
Ok(it
.next_batch()?
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index ca87c2e..c6e8c05 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -350,72 +350,64 @@ impl ExecutionContext {
}
/// Execute a physical plan and collect the results in memory
- pub fn collect(&self, plan: &dyn ExecutionPlan) -> Result<Vec<RecordBatch>> {
- let partitions = plan.partitions()?;
-
- match partitions.len() {
+ pub fn collect(&self, plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
+ match plan.output_partitioning().partition_count() {
0 => Ok(vec![]),
1 => {
- let it = partitions[0].execute()?;
+ let it = plan.execute(0)?;
common::collect(it)
}
_ => {
// merge into a single partition
let plan = MergeExec::new(
plan.schema().clone(),
- partitions,
+ plan.clone(),
self.state
.lock()
.expect("failed to lock mutex")
.config
.concurrency,
);
- let partitions = plan.partitions()?;
- if partitions.len() == 1 {
- common::collect(partitions[0].execute()?)
- } else {
- Err(ExecutionError::InternalError(format!(
- "MergeExec returned {} partitions",
- partitions.len()
- )))
- }
+ // MergeExec must produce a single partition
+ assert_eq!(1, plan.output_partitioning().partition_count());
+ common::collect(plan.execute(0)?)
}
}
}
/// Execute a query and write the results to a partitioned CSV file
- pub fn write_csv(&self, plan: &dyn ExecutionPlan, path: &str) -> Result<()> {
+ pub fn write_csv(&self, plan: Arc<dyn ExecutionPlan>, path: &str) -> Result<()> {
// create directory to contain the CSV files (one per partition)
let path = path.to_string();
fs::create_dir(&path)?;
- let threads: Vec<JoinHandle<Result<()>>> = plan
- .partitions()?
- .iter()
- .enumerate()
- .map(|(i, p)| {
- let p = p.clone();
- let path = path.clone();
- thread::spawn(move || {
- let filename = format!("part-{}.csv", i);
- let path = Path::new(&path).join(&filename);
- let file = fs::File::create(path)?;
- let mut writer = csv::Writer::new(file);
- let reader = p.execute()?;
- let mut reader = reader.lock().unwrap();
- loop {
- match reader.next_batch() {
- Ok(Some(batch)) => {
- writer.write(&batch)?;
+ let threads: Vec<JoinHandle<Result<()>>> =
+ (0..plan.output_partitioning().partition_count())
+ .enumerate()
+ .map(|(i, p)| {
+ let p = p.clone();
+ let path = path.clone();
+ let plan = plan.clone();
+ thread::spawn(move || {
+ let filename = format!("part-{}.csv", i);
+ let path = Path::new(&path).join(&filename);
+ let file = fs::File::create(path)?;
+ let mut writer = csv::Writer::new(file);
+ let reader = plan.execute(p)?;
+ let mut reader = reader.lock().unwrap();
+ loop {
+ match reader.next_batch() {
+ Ok(Some(batch)) => {
+ writer.write(&batch)?;
+ }
+ Ok(None) => break,
+ Err(e) => return Err(ExecutionError::from(e)),
}
- Ok(None) => break,
- Err(e) => return Err(ExecutionError::from(e)),
}
- }
- Ok(())
+ Ok(())
+ })
})
- })
- .collect();
+ .collect();
// combine the results from each thread
for thread in threads {
@@ -547,7 +539,7 @@ mod tests {
let physical_plan = ctx.create_physical_plan(&logical_plan)?;
- let results = ctx.collect(physical_plan.as_ref())?;
+ let results = ctx.collect(physical_plan)?;
// there should be one batch per partition
assert_eq!(results.len(), partition_count);
@@ -594,7 +586,7 @@ mod tests {
assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
- let batches = ctx.collect(physical_plan.as_ref())?;
+ let batches = ctx.collect(physical_plan)?;
assert_eq!(4, batches.len());
assert_eq!(1, batches[0].num_columns());
assert_eq!(10, batches[0].num_rows());
@@ -680,7 +672,7 @@ mod tests {
assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("b", physical_plan.schema().field(0).name().as_str());
- let batches = ctx.collect(physical_plan.as_ref())?;
+ let batches = ctx.collect(physical_plan)?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
assert_eq!(4, batches[0].num_rows());
@@ -1048,7 +1040,7 @@ mod tests {
let plan = ctx.optimize(&plan)?;
let plan = ctx.create_physical_plan(&plan)?;
- let result = ctx.collect(plan.as_ref())?;
+ let result = ctx.collect(plan)?;
let batch = &result[0];
assert_eq!(3, batch.num_columns());
@@ -1110,7 +1102,7 @@ mod tests {
let logical_plan = ctx.create_logical_plan(sql)?;
let logical_plan = ctx.optimize(&logical_plan)?;
let physical_plan = ctx.create_physical_plan(&logical_plan)?;
- ctx.collect(physical_plan.as_ref())
+ ctx.collect(physical_plan)
}
fn field_names(result: &RecordBatch) -> Vec<String> {
@@ -1134,7 +1126,7 @@ mod tests {
let logical_plan = ctx.create_logical_plan(sql)?;
let logical_plan = ctx.optimize(&logical_plan)?;
let physical_plan = ctx.create_physical_plan(&logical_plan)?;
- ctx.write_csv(physical_plan.as_ref(), out_dir)
+ ctx.write_csv(physical_plan, out_dir)
}
/// Generate CSV partitions within the supplied directory
diff --git a/rust/datafusion/src/execution/dataframe_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs
index d53fb38..edaf7ae 100644
--- a/rust/datafusion/src/execution/dataframe_impl.rs
+++ b/rust/datafusion/src/execution/dataframe_impl.rs
@@ -110,7 +110,7 @@ impl DataFrame for DataFrameImpl {
let ctx = ExecutionContext::from(self.ctx_state.clone());
let plan = ctx.optimize(&self.plan)?;
let plan = ctx.create_physical_plan(&plan)?;
- Ok(ctx.collect(plan.as_ref())?)
+ Ok(ctx.collect(plan)?)
}
/// Returns the schema from the logical plan
diff --git a/rust/datafusion/src/execution/physical_plan/csv.rs b/rust/datafusion/src/execution/physical_plan/csv.rs
index c53b21a..ca35254 100644
--- a/rust/datafusion/src/execution/physical_plan/csv.rs
+++ b/rust/datafusion/src/execution/physical_plan/csv.rs
@@ -21,8 +21,8 @@ use std::fs::File;
use std::sync::{Arc, Mutex};
use crate::error::{ExecutionError, Result};
-use crate::execution::physical_plan::common;
-use crate::execution::physical_plan::{ExecutionPlan, Partition};
+use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{common, Partitioning};
use arrow::csv;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
@@ -107,6 +107,8 @@ impl<'a> CsvReadOptions<'a> {
pub struct CsvExec {
/// Path to directory containing partitioned CSV files with the same schema
path: String,
+ /// The individual files under path
+ filenames: Vec<String>,
/// Schema representing the CSV file
schema: SchemaRef,
/// Does the CSV file have a header?
@@ -131,9 +133,17 @@ impl CsvExec {
projection: Option<Vec<usize>>,
batch_size: usize,
) -> Result<Self> {
+ let file_extension = String::from(options.file_extension);
+
+ let mut filenames: Vec<String> = vec![];
+ common::build_file_list(path, &mut filenames, file_extension.as_str())?;
+ if filenames.is_empty() {
+ return Err(ExecutionError::General("No files found".to_string()));
+ }
+
let schema = match options.schema {
Some(s) => s.clone(),
- None => CsvExec::try_infer_schema(path, &options)?,
+ None => CsvExec::try_infer_schema(&filenames, &options)?,
};
let projected_schema = match &projection {
@@ -143,10 +153,11 @@ impl CsvExec {
Ok(Self {
path: path.to_string(),
+ filenames,
schema: Arc::new(schema),
has_header: options.has_header,
delimiter: Some(options.delimiter),
- file_extension: String::from(options.file_extension),
+ file_extension,
projection,
projected_schema: Arc::new(projected_schema),
batch_size,
@@ -154,16 +165,12 @@ impl CsvExec {
}
/// Infer schema for given CSV dataset
- pub fn try_infer_schema(path: &str, options: &CsvReadOptions) -> Result<Schema> {
- let mut filenames: Vec<String> = vec![];
- common::build_file_list(path, &mut filenames, options.file_extension)?;
-
- if filenames.is_empty() {
- return Err(ExecutionError::General("No files found".to_string()));
- }
-
+ pub fn try_infer_schema(
+ filenames: &[String],
+ options: &CsvReadOptions,
+ ) -> Result<Schema> {
Ok(csv::infer_schema_from_files(
- &filenames,
+ filenames,
options.delimiter,
Some(options.schema_infer_max_records),
options.has_header,
@@ -177,73 +184,17 @@ impl ExecutionPlan for CsvExec {
self.projected_schema.clone()
}
- /// Get the partitions for this execution plan. Each partition can be executed in parallel.
- fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
- let mut filenames: Vec<String> = vec![];
- common::build_file_list(
- &self.path,
- &mut filenames,
- self.file_extension.as_str(),
- )?;
- let partitions = filenames
- .iter()
- .map(|filename| {
- Arc::new(CsvPartition::new(
- &filename,
- self.schema.clone(),
- self.has_header,
- self.delimiter,
- self.projection.clone(),
- self.batch_size,
- )) as Arc<dyn Partition>
- })
- .collect();
- Ok(partitions)
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(self.filenames.len())
}
-}
-
-/// CSV Partition
-#[derive(Debug)]
-struct CsvPartition {
- /// Path to the CSV File
- path: String,
- /// Schema representing the CSV file
- schema: SchemaRef,
- /// Does the CSV file have a header?
- has_header: bool,
- /// An optional column delimiter. Defaults to `b','`
- delimiter: Option<u8>,
- /// Optional projection for which columns to load
- projection: Option<Vec<usize>>,
- /// Batch size
- batch_size: usize,
-}
-
-impl CsvPartition {
- fn new(
- path: &str,
- schema: SchemaRef,
- has_header: bool,
- delimiter: Option<u8>,
- projection: Option<Vec<usize>>,
- batch_size: usize,
- ) -> Self {
- Self {
- path: path.to_string(),
- schema,
- has_header,
- delimiter,
- projection,
- batch_size,
- }
- }
-}
-impl Partition for CsvPartition {
- /// Execute this partition and return an iterator over RecordBatch
- fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+ fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
Ok(Arc::new(Mutex::new(CsvIterator::try_new(
- &self.path,
+ &self.filenames[partition],
self.schema.clone(),
self.has_header,
self.delimiter,
@@ -315,9 +266,8 @@ mod tests {
assert_eq!(13, csv.schema.fields().len());
assert_eq!(3, csv.projected_schema.fields().len());
assert_eq!(3, csv.schema().fields().len());
- let partitions = csv.partitions()?;
- let results = partitions[0].execute()?;
- let mut it = results.lock().unwrap();
+ let it = csv.execute(0)?;
+ let mut it = it.lock().unwrap();
let batch = it.next_batch()?.unwrap();
assert_eq!(3, batch.num_columns());
let batch_schema = batch.schema();
@@ -339,9 +289,8 @@ mod tests {
assert_eq!(13, csv.schema.fields().len());
assert_eq!(13, csv.projected_schema.fields().len());
assert_eq!(13, csv.schema().fields().len());
- let partitions = csv.partitions()?;
- let results = partitions[0].execute()?;
- let mut it = results.lock().unwrap();
+ let it = csv.execute(0)?;
+ let mut it = it.lock().unwrap();
let batch = it.next_batch()?.unwrap();
assert_eq!(13, batch.num_columns());
let batch_schema = batch.schema();
diff --git a/rust/datafusion/src/execution/physical_plan/datasource.rs b/rust/datafusion/src/execution/physical_plan/datasource.rs
deleted file mode 100644
index 5a6fe31..0000000
--- a/rust/datafusion/src/execution/physical_plan/datasource.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! ExecutionPlan implementation for DataFusion data sources
-
-use std::{
- fmt::{self, Debug, Formatter},
- sync::Arc,
-};
-
-use crate::error::Result;
-use crate::execution::physical_plan::{ExecutionPlan, Partition};
-use arrow::datatypes::SchemaRef;
-
-/// Datasource execution plan
-pub struct DatasourceExec {
- schema: SchemaRef,
- partitions: Vec<Arc<dyn Partition>>,
-}
-
-impl Debug for DatasourceExec {
- fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
- f.debug_struct("DataSourceExec")
- .field("schema", &self.schema)
- .field("partitions.len", &self.partitions.len())
- .finish()
- }
-}
-
-impl DatasourceExec {
- /// Create a new data source execution plan
- pub fn new(schema: SchemaRef, partitions: Vec<Arc<dyn Partition>>) -> Self {
- Self { schema, partitions }
- }
-}
-
-impl ExecutionPlan for DatasourceExec {
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-
- fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
- Ok(self.partitions.clone())
- }
-}
diff --git a/rust/datafusion/src/execution/physical_plan/explain.rs b/rust/datafusion/src/execution/physical_plan/explain.rs
index c9213d5..b14fe5c 100644
--- a/rust/datafusion/src/execution/physical_plan/explain.rs
+++ b/rust/datafusion/src/execution/physical_plan/explain.rs
@@ -19,7 +19,7 @@
use crate::error::Result;
use crate::{
- execution::physical_plan::{common::RecordBatchIterator, ExecutionPlan, Partition},
+ execution::physical_plan::{common::RecordBatchIterator, ExecutionPlan},
logicalplan::StringifiedPlan,
};
use arrow::{
@@ -28,6 +28,7 @@ use arrow::{
record_batch::{RecordBatch, RecordBatchReader},
};
+use crate::execution::physical_plan::Partitioning;
use std::sync::{Arc, Mutex};
/// Explain execution plan operator. This operator contains the string
@@ -57,24 +58,17 @@ impl ExecutionPlan for ExplainExec {
self.schema.clone()
}
- fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
- Ok(vec![Arc::new(ExplainPartition {
- schema: self.schema.clone(),
- stringified_plans: self.stringified_plans.clone(),
- })])
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(1)
}
-}
-#[derive(Debug)]
-struct ExplainPartition {
- /// Input schema
- schema: SchemaRef,
- /// The various plans that were created.
- stringified_plans: Vec<StringifiedPlan>,
-}
+ fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+ assert_eq!(0, partition);
-impl Partition for ExplainPartition {
- fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
let mut type_builder = StringArray::builder(self.stringified_plans.len());
let mut plan_builder = StringArray::builder(self.stringified_plans.len());
diff --git a/rust/datafusion/src/execution/physical_plan/filter.rs b/rust/datafusion/src/execution/physical_plan/filter.rs
index fee7fe4..c5f29b5 100644
--- a/rust/datafusion/src/execution/physical_plan/filter.rs
+++ b/rust/datafusion/src/execution/physical_plan/filter.rs
@@ -21,7 +21,7 @@
use std::sync::{Arc, Mutex};
use crate::error::{ExecutionError, Result};
-use crate::execution::physical_plan::{ExecutionPlan, Partition, PhysicalExpr};
+use crate::execution::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr};
use arrow::array::BooleanArray;
use arrow::compute::filter;
use arrow::datatypes::{DataType, SchemaRef};
@@ -64,46 +64,19 @@ impl ExecutionPlan for FilterExec {
self.input.schema()
}
- /// Get the partitions for this execution plan
- fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
- let partitions: Vec<Arc<dyn Partition>> = self
- .input
- .partitions()?
- .iter()
- .map(|p| {
- let expr = self.predicate.clone();
- let partition: Arc<dyn Partition> = Arc::new(FilterExecPartition {
- schema: self.input.schema(),
- predicate: expr,
- input: p.clone() as Arc<dyn Partition>,
- });
-
- partition
- })
- .collect();
-
- Ok(partitions)
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ self.input.output_partitioning()
}
-}
-
-/// Represents a single partition of a filter execution plan
-#[derive(Debug)]
-struct FilterExecPartition {
- /// Output schema, which is the same as the input schema for this operator
- schema: SchemaRef,
- /// The expression to filter on. This expression must evaluate to a boolean value.
- predicate: Arc<dyn PhysicalExpr>,
- /// The input partition to filter.
- input: Arc<dyn Partition>,
-}
-impl Partition for FilterExecPartition {
- /// Execute the filter
- fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+ fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
Ok(Arc::new(Mutex::new(FilterExecIter {
- schema: self.schema.clone(),
+ schema: self.input.schema().clone(),
predicate: self.predicate.clone(),
- input: self.input.execute()?,
+ input: self.input.execute(partition)?,
})))
}
}
@@ -202,7 +175,7 @@ mod tests {
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate, Arc::new(csv))?);
- let results = test::execute(filter.as_ref())?;
+ let results = test::execute(filter)?;
results
.iter()
diff --git a/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs b/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
index 19038d6..0678121 100644
--- a/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
+++ b/rust/datafusion/src/execution/physical_plan/hash_aggregate.rs
@@ -23,7 +23,7 @@ use std::sync::{Arc, Mutex};
use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::{
- Accumulator, AggregateExpr, ExecutionPlan, Partition, PhysicalExpr,
+ Accumulator, AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr,
};
use arrow::array::{
@@ -111,65 +111,28 @@ impl ExecutionPlan for HashAggregateExec {
self.schema.clone()
}
- fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
- Ok(self
- .input
- .partitions()?
- .iter()
- .map(|p| {
- let aggregate: Arc<dyn Partition> =
- Arc::new(HashAggregatePartition::new(
- self.group_expr.clone(),
- self.aggr_expr.clone(),
- p.clone() as Arc<dyn Partition>,
- self.schema.clone(),
- ));
-
- aggregate
- })
- .collect::<Vec<Arc<dyn Partition>>>())
- }
-}
-
-#[derive(Debug)]
-struct HashAggregatePartition {
- group_expr: Vec<Arc<dyn PhysicalExpr>>,
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- input: Arc<dyn Partition>,
- schema: SchemaRef,
-}
-
-impl HashAggregatePartition {
- /// Create a new HashAggregatePartition
- pub fn new(
- group_expr: Vec<Arc<dyn PhysicalExpr>>,
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- input: Arc<dyn Partition>,
- schema: SchemaRef,
- ) -> Self {
- HashAggregatePartition {
- group_expr,
- aggr_expr,
- input,
- schema,
- }
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ self.input.output_partitioning()
}
-}
-impl Partition for HashAggregatePartition {
- fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+ fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+ let input = self.input.execute(partition)?;
if self.group_expr.is_empty() {
Ok(Arc::new(Mutex::new(HashAggregateIterator::new(
self.schema.clone(),
self.aggr_expr.clone(),
- self.input.execute()?,
+ input,
))))
} else {
Ok(Arc::new(Mutex::new(GroupedHashAggregateIterator::new(
self.schema.clone(),
self.group_expr.clone(),
self.aggr_expr.clone(),
- self.input.execute()?,
+ input,
))))
}
}
@@ -726,14 +689,13 @@ mod tests {
let aggregates: Vec<(Arc<dyn AggregateExpr>, String)> =
vec![(sum(col("c4")), "SUM(c4)".to_string())];
- let partition_aggregate = HashAggregateExec::try_new(
+ let partition_aggregate = Arc::new(HashAggregateExec::try_new(
groups.clone(),
aggregates.clone(),
Arc::new(csv),
- )?;
+ )?);
let schema = partition_aggregate.schema();
- let partitions = partition_aggregate.partitions()?;
// construct the expressions for the final aggregation
let (final_group, final_aggr) = partition_aggregate.make_final_expr(
@@ -741,9 +703,9 @@ mod tests {
aggregates.iter().map(|x| x.1.clone()).collect(),
);
- let merge = Arc::new(MergeExec::new(schema.clone(), partitions, 2));
+ let merge = Arc::new(MergeExec::new(schema.clone(), partition_aggregate, 2));
- let merged_aggregate = HashAggregateExec::try_new(
+ let merged_aggregate = Arc::new(HashAggregateExec::try_new(
final_group
.iter()
.enumerate()
@@ -755,9 +717,9 @@ mod tests {
.map(|(i, expr)| (expr.clone(), aggregates[i].1.clone()))
.collect(),
merge,
- )?;
+ )?);
- let result = test::execute(&merged_aggregate)?;
+ let result = test::execute(merged_aggregate)?;
assert_eq!(result.len(), 1);
let batch = &result[0];
diff --git a/rust/datafusion/src/execution/physical_plan/limit.rs b/rust/datafusion/src/execution/physical_plan/limit.rs
index 6d1cffd..862ac24 100644
--- a/rust/datafusion/src/execution/physical_plan/limit.rs
+++ b/rust/datafusion/src/execution/physical_plan/limit.rs
@@ -23,8 +23,7 @@ use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::common::{self, RecordBatchIterator};
use crate::execution::physical_plan::memory::MemoryIterator;
use crate::execution::physical_plan::merge::MergeExec;
-use crate::execution::physical_plan::ExecutionPlan;
-use crate::execution::physical_plan::Partition;
+use crate::execution::physical_plan::{ExecutionPlan, Partitioning};
use arrow::array::ArrayRef;
use arrow::compute::limit;
use arrow::datatypes::SchemaRef;
@@ -36,7 +35,7 @@ pub struct GlobalLimitExec {
/// Input schema
schema: SchemaRef,
/// Input partitions
- partitions: Vec<Arc<dyn Partition>>,
+ input: Arc<dyn ExecutionPlan>,
/// Maximum number of rows to return
limit: usize,
/// Number of threads to run parallel LocalLimitExec on
@@ -47,13 +46,13 @@ impl GlobalLimitExec {
/// Create a new MergeExec
pub fn new(
schema: SchemaRef,
- partitions: Vec<Arc<dyn Partition>>,
+ input: Arc<dyn ExecutionPlan>,
limit: usize,
concurrency: usize,
) -> Self {
GlobalLimitExec {
schema,
- partitions,
+ input,
limit,
concurrency,
}
@@ -65,49 +64,30 @@ impl ExecutionPlan for GlobalLimitExec {
self.schema.clone()
}
- fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
- Ok(vec![Arc::new(LimitPartition {
- schema: self.schema.clone(),
- partitions: self.partitions.clone(),
- limit: self.limit,
- concurrency: self.concurrency,
- })])
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(1)
}
-}
-#[derive(Debug)]
-struct LimitPartition {
- /// Input schema
- schema: SchemaRef,
- /// Input partitions
- partitions: Vec<Arc<dyn Partition>>,
- /// Maximum number of rows to return
- limit: usize,
- /// Number of threads to run parallel LocalLimitExec on
- concurrency: usize,
-}
+ fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+ // GlobalLimitExec has a single partition
+ assert_eq!(0, partition);
-impl Partition for LimitPartition {
- fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
// apply limit in parallel across all input partitions
- let local_limit = self
- .partitions
- .iter()
- .map(|p| {
- Arc::new(LocalLimitExec::new(
- p.clone(),
- self.schema.clone(),
- self.limit,
- )) as Arc<dyn Partition>
- })
- .collect();
+ let local_limit = Arc::new(LocalLimitExec::new(
+ self.input.clone(),
+ self.schema.clone(),
+ self.limit,
+ ));
// limit needs to collapse inputs down to a single partition
let merge = MergeExec::new(self.schema.clone(), local_limit, self.concurrency);
- let merge_partitions = merge.partitions()?;
// MergeExec must always produce a single partition
- assert_eq!(1, merge_partitions.len());
- let it = merge_partitions[0].execute()?;
+ assert_eq!(1, merge.output_partitioning().partition_count());
+ let it = merge.execute(0)?;
let batches = common::collect(it)?;
// apply the limit to the output
@@ -138,14 +118,14 @@ impl Partition for LimitPartition {
/// LocalLimitExec applies a limit to a single partition
#[derive(Debug)]
pub struct LocalLimitExec {
- input: Arc<dyn Partition>,
+ input: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
limit: usize,
}
impl LocalLimitExec {
/// Create a new LocalLimitExec partition
- pub fn new(input: Arc<dyn Partition>, schema: SchemaRef, limit: usize) -> Self {
+ pub fn new(input: Arc<dyn ExecutionPlan>, schema: SchemaRef, limit: usize) -> Self {
Self {
input,
schema,
@@ -154,9 +134,20 @@ impl LocalLimitExec {
}
}
-impl Partition for LocalLimitExec {
- fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
- let it = self.input.execute()?;
+impl ExecutionPlan for LocalLimitExec {
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+
+ fn output_partitioning(&self) -> Partitioning {
+ self.input.output_partitioning()
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+ let it = self.input.execute(partition)?;
Ok(Arc::new(Mutex::new(MemoryIterator::try_new(
collect_with_limit(it, self.limit)?,
self.schema.clone(),
@@ -230,14 +221,12 @@ mod tests {
CsvExec::try_new(&path, CsvReadOptions::new().schema(&schema), None, 1024)?;
// input should have 4 partitions
- let input = csv.partitions()?;
- assert_eq!(input.len(), num_partitions);
+ assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
- let limit = GlobalLimitExec::new(schema.clone(), input, 7, 2);
- let partitions = limit.partitions()?;
+ let limit = GlobalLimitExec::new(schema.clone(), Arc::new(csv), 7, 2);
// the result should contain 4 batches (one per input partition)
- let iter = partitions[0].execute()?;
+ let iter = limit.execute(0)?;
let batches = common::collect(iter)?;
// there should be a total of 100 rows
diff --git a/rust/datafusion/src/execution/physical_plan/memory.rs b/rust/datafusion/src/execution/physical_plan/memory.rs
index f9b4441..096cecb 100644
--- a/rust/datafusion/src/execution/physical_plan/memory.rs
+++ b/rust/datafusion/src/execution/physical_plan/memory.rs
@@ -20,7 +20,7 @@
use std::sync::{Arc, Mutex};
use crate::error::Result;
-use crate::execution::physical_plan::{ExecutionPlan, Partition};
+use crate::execution::physical_plan::{ExecutionPlan, Partitioning};
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
@@ -42,20 +42,20 @@ impl ExecutionPlan for MemoryExec {
self.schema.clone()
}
- /// Get the partitions for this execution plan. Each partition can be executed in parallel.
- fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
- let partitions = self
- .partitions
- .iter()
- .map(|vec| {
- Arc::new(MemoryPartition::new(
- vec.clone(),
- self.schema.clone(),
- self.projection.clone(),
- )) as Arc<dyn Partition>
- })
- .collect();
- Ok(partitions)
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(self.partitions.len())
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+ Ok(Arc::new(Mutex::new(MemoryIterator::try_new(
+ self.partitions[partition].clone(),
+ self.schema.clone(),
+ self.projection.clone(),
+ )?)))
}
}
@@ -74,43 +74,6 @@ impl MemoryExec {
}
}
-/// Memory partition
-#[derive(Debug)]
-struct MemoryPartition {
- /// Vector of record batches
- data: Vec<RecordBatch>,
- /// Schema representing the data
- schema: SchemaRef,
- /// Optional projection
- projection: Option<Vec<usize>>,
-}
-
-impl MemoryPartition {
- /// Create a new in-memory partition
- fn new(
- data: Vec<RecordBatch>,
- schema: SchemaRef,
- projection: Option<Vec<usize>>,
- ) -> Self {
- Self {
- data,
- schema,
- projection,
- }
- }
-}
-
-impl Partition for MemoryPartition {
- /// Execute this partition and return an iterator over RecordBatch
- fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
- Ok(Arc::new(Mutex::new(MemoryIterator::try_new(
- self.data.clone(),
- self.schema.clone(),
- self.projection.clone(),
- )?)))
- }
-}
-
/// Iterator over batches
pub(crate) struct MemoryIterator {
/// Vector of record batches
diff --git a/rust/datafusion/src/execution/physical_plan/merge.rs b/rust/datafusion/src/execution/physical_plan/merge.rs
index 97614bd..9d36d6a 100644
--- a/rust/datafusion/src/execution/physical_plan/merge.rs
+++ b/rust/datafusion/src/execution/physical_plan/merge.rs
@@ -23,7 +23,7 @@ use std::thread::{self, JoinHandle};
use crate::error::{ExecutionError, Result};
use crate::execution::physical_plan::common::RecordBatchIterator;
-use crate::execution::physical_plan::Partition;
+use crate::execution::physical_plan::Partitioning;
use crate::execution::physical_plan::{common, ExecutionPlan};
use arrow::datatypes::SchemaRef;
@@ -36,7 +36,7 @@ pub struct MergeExec {
/// Input schema
schema: SchemaRef,
/// Input partitions
- partitions: Vec<Arc<dyn Partition>>,
+ input: Arc<dyn ExecutionPlan>,
/// Maximum number of concurrent threads
concurrency: usize,
}
@@ -45,12 +45,12 @@ impl MergeExec {
/// Create a new MergeExec
pub fn new(
schema: SchemaRef,
- partitions: Vec<Arc<dyn Partition>>,
+ input: Arc<dyn ExecutionPlan>,
max_concurrency: usize,
) -> Self {
MergeExec {
schema,
- partitions,
+ input,
concurrency: max_concurrency,
}
}
@@ -61,64 +61,39 @@ impl ExecutionPlan for MergeExec {
self.schema.clone()
}
- fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
- Ok(vec![Arc::new(MergePartition {
- schema: self.schema.clone(),
- partitions: self.partitions.clone(),
- concurrency: self.concurrency,
- })])
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(1)
}
-}
-#[derive(Debug)]
-struct MergePartition {
- /// Input schema
- schema: SchemaRef,
- /// Input partitions
- partitions: Vec<Arc<dyn Partition>>,
- /// Maximum number of concurrent threads
- concurrency: usize,
-}
-
-fn collect_from_thread(
- thread: JoinHandle<Result<Vec<RecordBatch>>>,
- combined_results: &mut Vec<Arc<RecordBatch>>,
-) -> Result<()> {
- match thread.join() {
- Ok(join) => {
- join?
- .iter()
- .for_each(|batch| combined_results.push(Arc::new(batch.clone())));
- Ok(())
- }
- Err(e) => Err(ExecutionError::General(format!(
- "Error collecting batches from thread: {:?}",
- e
- ))),
- }
-}
+ fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+ // MergeExec produces a single partition
+ assert_eq!(0, partition);
-impl Partition for MergePartition {
- fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
- match self.partitions.len() {
+ let input_partitions = self.input.output_partitioning().partition_count();
+ match input_partitions {
0 => Err(ExecutionError::General(
"MergeExec requires at least one input partition".to_owned(),
)),
1 => {
// bypass any threading if there is a single partition
- self.partitions[0].execute()
+ self.input.execute(0)
}
_ => {
- let partitions_per_thread =
- (self.partitions.len() / self.concurrency).max(1);
- let chunks = self.partitions.chunks(partitions_per_thread);
+ let partitions_per_thread = (input_partitions / self.concurrency).max(1);
+ let range: Vec<usize> = (0..input_partitions).collect();
+ let chunks = range.chunks(partitions_per_thread);
let threads: Vec<JoinHandle<Result<Vec<RecordBatch>>>> = chunks
.map(|chunk| {
let chunk = chunk.to_vec();
+ let input = self.input.clone();
thread::spawn(move || {
let mut batches = vec![];
for partition in chunk {
- let it = partition.execute()?;
+ let it = input.execute(partition)?;
common::collect(it).iter().for_each(|b| {
b.iter().for_each(|b| batches.push(b.clone()))
});
@@ -143,6 +118,24 @@ impl Partition for MergePartition {
}
}
+fn collect_from_thread(
+ thread: JoinHandle<Result<Vec<RecordBatch>>>,
+ combined_results: &mut Vec<Arc<RecordBatch>>,
+) -> Result<()> {
+ match thread.join() {
+ Ok(join) => {
+ join?
+ .iter()
+ .for_each(|batch| combined_results.push(Arc::new(batch.clone())));
+ Ok(())
+ }
+ Err(e) => Err(ExecutionError::General(format!(
+ "Error collecting batches from thread: {:?}",
+ e
+ ))),
+ }
+}
+
#[cfg(test)]
mod tests {
@@ -163,17 +156,15 @@ mod tests {
CsvExec::try_new(&path, CsvReadOptions::new().schema(&schema), None, 1024)?;
// input should have 4 partitions
- let input = csv.partitions()?;
- assert_eq!(input.len(), num_partitions);
+ assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
- let merge = MergeExec::new(schema.clone(), input, 2);
+ let merge = MergeExec::new(schema.clone(), Arc::new(csv), 2);
// output of MergeExec should have a single partition
- let merged = merge.partitions()?;
- assert_eq!(merged.len(), 1);
+ assert_eq!(merge.output_partitioning().partition_count(), 1);
// the result should contain 4 batches (one per input partition)
- let iter = merged[0].execute()?;
+ let iter = merge.execute(0)?;
let batches = common::collect(iter)?;
assert_eq!(batches.len(), num_partitions);
diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs b/rust/datafusion/src/execution/physical_plan/mod.rs
index 817b6ab..80686f7 100644
--- a/rust/datafusion/src/execution/physical_plan/mod.rs
+++ b/rust/datafusion/src/execution/physical_plan/mod.rs
@@ -45,17 +45,33 @@ pub trait PhysicalPlanner {
}
/// Partition-aware execution plan for a relation
-pub trait ExecutionPlan: Debug {
+pub trait ExecutionPlan: Debug + Send + Sync {
/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef;
- /// Get the partitions for this execution plan. Each partition can be executed in parallel.
- fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>>;
+ /// Specifies the output partitioning scheme of this plan
+ fn output_partitioning(&self) -> Partitioning;
+ /// Execute one partition and return an iterator over RecordBatch
+ fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>;
+}
+
+/// Partitioning schemes supported by operators.
+#[derive(Debug, Clone)]
+pub enum Partitioning {
+ /// Unknown partitioning scheme with a known number of partitions
+ UnknownPartitioning(usize),
}
-/// Represents a partition of an execution plan that can be executed on a thread
-pub trait Partition: Send + Sync + Debug {
- /// Execute this partition and return an iterator over RecordBatch
- fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>>;
+impl Partitioning {
+ /// Returns the number of partitions in this partitioning scheme
+ pub fn partition_count(&self) -> usize {
+ use Partitioning::*;
+ match self {
+ UnknownPartitioning(n) => *n,
+ }
+ }
}
/// Expression that can be evaluated against a RecordBatch
@@ -109,7 +125,6 @@ pub fn scalar_functions() -> Vec<ScalarFunction> {
pub mod common;
pub mod csv;
-pub mod datasource;
pub mod explain;
pub mod expressions;
pub mod filter;
diff --git a/rust/datafusion/src/execution/physical_plan/parquet.rs b/rust/datafusion/src/execution/physical_plan/parquet.rs
index 080cfff..2888560 100644
--- a/rust/datafusion/src/execution/physical_plan/parquet.rs
+++ b/rust/datafusion/src/execution/physical_plan/parquet.rs
@@ -23,15 +23,15 @@ use std::sync::{Arc, Mutex};
use std::{fmt, thread};
use crate::error::{ExecutionError, Result};
-use crate::execution::physical_plan::common;
-use crate::execution::physical_plan::{ExecutionPlan, Partition};
+use crate::execution::physical_plan::ExecutionPlan;
+use crate::execution::physical_plan::{common, Partitioning};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use parquet::file::reader::SerializedFileReader;
use crossbeam::channel::{bounded, Receiver, RecvError, Sender};
-use fmt::{Debug, Formatter};
+use fmt::Debug;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
/// Execution plan for scanning a Parquet file
@@ -91,53 +91,38 @@ impl ExecutionPlan for ParquetExec {
self.schema.clone()
}
- fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
- let partitions = self
- .filenames
- .iter()
- .map(|filename| {
- Arc::new(ParquetPartition::new(
- &filename,
- self.projection.clone(),
- self.schema.clone(),
- self.batch_size,
- )) as Arc<dyn Partition>
- })
- .collect();
- Ok(partitions)
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(self.filenames.len())
}
-}
-struct ParquetPartition {
- /// Filename for this partition
- filename: String,
- /// Projection for which columns to load
- projection: Vec<usize>,
- /// Batch size
- batch_size: usize,
- schema: SchemaRef,
-}
+ fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+ // because the parquet implementation is not thread-safe, it is necessary to execute
+ // on a thread and communicate with channels
+ let (response_tx, response_rx): (
+ Sender<ArrowResult<Option<RecordBatch>>>,
+ Receiver<ArrowResult<Option<RecordBatch>>>,
+ ) = bounded(2);
-impl Debug for ParquetPartition {
- fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
- f.debug_struct("ParquetPartition").finish()
- }
-}
+ let filename = self.filenames[partition].clone();
+ let projection = self.projection.clone();
+ let batch_size = self.batch_size;
-impl ParquetPartition {
- /// Create a new Parquet partition
- pub fn new(
- filename: &str,
- projection: Vec<usize>,
- schema: SchemaRef,
- batch_size: usize,
- ) -> Self {
- Self {
- filename: filename.to_owned(),
- projection,
- schema,
- batch_size,
- }
+ thread::spawn(move || {
+ if let Err(e) = read_file(&filename, projection, batch_size, response_tx) {
+ println!("Parquet reader thread terminated due to error: {:?}", e);
+ }
+ });
+
+ let iterator = Arc::new(Mutex::new(ParquetIterator {
+ schema: self.schema.clone(),
+ response_rx,
+ }));
+
+ Ok(iterator)
}
}
@@ -186,34 +171,6 @@ fn read_file(
Ok(())
}
-impl Partition for ParquetPartition {
- fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
- // because the parquet implementation is not thread-safe, it is necessary to execute
- // on a thread and communicate with channels
- let (response_tx, response_rx): (
- Sender<ArrowResult<Option<RecordBatch>>>,
- Receiver<ArrowResult<Option<RecordBatch>>>,
- ) = bounded(2);
-
- let filename = self.filename.clone();
- let projection = self.projection.clone();
- let batch_size = self.batch_size;
-
- thread::spawn(move || {
- if let Err(e) = read_file(&filename, projection, batch_size, response_tx) {
- println!("Parquet reader thread terminated due to error: {:?}", e);
- }
- });
-
- let iterator = Arc::new(Mutex::new(ParquetIterator {
- schema: self.schema.clone(),
- response_rx,
- }));
-
- Ok(iterator)
- }
-}
-
struct ParquetIterator {
schema: SchemaRef,
response_rx: Receiver<ArrowResult<Option<RecordBatch>>>,
@@ -244,10 +201,9 @@ mod tests {
env::var("PARQUET_TEST_DATA").expect("PARQUET_TEST_DATA not defined");
let filename = format!("{}/alltypes_plain.parquet", testdata);
let parquet_exec = ParquetExec::try_new(&filename, Some(vec![0, 1, 2]), 1024)?;
- let partitions = parquet_exec.partitions()?;
- assert_eq!(partitions.len(), 1);
+ assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
- let results = partitions[0].execute()?;
+ let results = parquet_exec.execute(0)?;
let mut results = results.lock().unwrap();
let batch = results.next_batch()?.unwrap();
diff --git a/rust/datafusion/src/execution/physical_plan/planner.rs b/rust/datafusion/src/execution/physical_plan/planner.rs
index 3f2fe04..339e5fe 100644
--- a/rust/datafusion/src/execution/physical_plan/planner.rs
+++ b/rust/datafusion/src/execution/physical_plan/planner.rs
@@ -23,7 +23,6 @@ use super::expressions::binary;
use crate::error::{ExecutionError, Result};
use crate::execution::context::ExecutionContextState;
use crate::execution::physical_plan::csv::{CsvExec, CsvReadOptions};
-use crate::execution::physical_plan::datasource::DatasourceExec;
use crate::execution::physical_plan::explain::ExplainExec;
use crate::execution::physical_plan::expressions;
use crate::execution::physical_plan::expressions::{
@@ -71,27 +70,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
projection,
..
} => match ctx_state.datasources.get(table_name) {
- Some(provider) => {
- let exec = provider.scan(projection, batch_size)?;
- let partitions = exec.partitions()?;
- if partitions.is_empty() {
- Err(ExecutionError::General(
- "Table provider returned no partitions".to_string(),
- ))
- } else {
- let schema = match projection {
- None => provider.schema().clone(),
- Some(p) => Arc::new(Schema::new(
- p.iter()
- .map(|i| provider.schema().field(*i).clone())
- .collect(),
- )),
- };
-
- let exec = DatasourceExec::new(schema, partitions.clone());
- Ok(Arc::new(exec))
- }
- }
+ Some(provider) => provider.scan(projection, batch_size),
_ => Err(ExecutionError::General(format!(
"No table named {}",
table_name
@@ -180,15 +159,15 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
)?;
let schema = initial_aggr.schema();
- let partitions = initial_aggr.partitions()?;
- if partitions.len() == 1 {
+ if initial_aggr.output_partitioning().partition_count() == 1 {
return Ok(Arc::new(initial_aggr));
}
+ let initial_aggr = Arc::new(initial_aggr);
let merge = Arc::new(MergeExec::new(
schema.clone(),
- partitions,
+ initial_aggr.clone(),
ctx_state.config.concurrency,
));
@@ -261,7 +240,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner {
Ok(Arc::new(GlobalLimitExec::new(
input_schema.clone(),
- input.partitions()?,
+ input,
*n,
ctx_state.config.concurrency,
)))
diff --git a/rust/datafusion/src/execution/physical_plan/projection.rs b/rust/datafusion/src/execution/physical_plan/projection.rs
index a5ad0ef..ce51e89 100644
--- a/rust/datafusion/src/execution/physical_plan/projection.rs
+++ b/rust/datafusion/src/execution/physical_plan/projection.rs
@@ -23,7 +23,7 @@
use std::sync::{Arc, Mutex};
use crate::error::{ExecutionError, Result};
-use crate::execution::physical_plan::{ExecutionPlan, Partition, PhysicalExpr};
+use crate::execution::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr};
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
@@ -74,42 +74,19 @@ impl ExecutionPlan for ProjectionExec {
self.schema.clone()
}
- /// Get the partitions for this execution plan
- fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
- let partitions: Vec<Arc<dyn Partition>> = self
- .input
- .partitions()?
- .iter()
- .map(|p| {
- let projection: Arc<dyn Partition> = Arc::new(ProjectionPartition {
- schema: self.schema.clone(),
- expr: self.expr.clone(),
- input: p.clone() as Arc<dyn Partition>,
- });
-
- projection
- })
- .collect();
-
- Ok(partitions)
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ self.input.output_partitioning()
}
-}
-
-/// Represents a single partition of a projection execution plan
-#[derive(Debug)]
-struct ProjectionPartition {
- schema: SchemaRef,
- expr: Vec<Arc<dyn PhysicalExpr>>,
- input: Arc<dyn Partition>,
-}
-impl Partition for ProjectionPartition {
- /// Execute the projection
- fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+ fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
Ok(Arc::new(Mutex::new(ProjectionIterator {
schema: self.schema.clone(),
expr: self.expr.clone(),
- input: self.input.execute()?,
+ input: self.input.execute(partition)?,
})))
}
}
@@ -168,9 +145,9 @@ mod tests {
let mut partition_count = 0;
let mut row_count = 0;
- for partition in projection.partitions()? {
+ for partition in 0..projection.output_partitioning().partition_count() {
partition_count += 1;
- let iterator = partition.execute()?;
+ let iterator = projection.execute(partition)?;
let mut iterator = iterator.lock().unwrap();
while let Some(batch) = iterator.next_batch()? {
assert_eq!(1, batch.num_columns());
diff --git a/rust/datafusion/src/execution/physical_plan/sort.rs b/rust/datafusion/src/execution/physical_plan/sort.rs
index e86d6cc..c6d2031 100644
--- a/rust/datafusion/src/execution/physical_plan/sort.rs
+++ b/rust/datafusion/src/execution/physical_plan/sort.rs
@@ -29,7 +29,7 @@ use crate::error::Result;
use crate::execution::physical_plan::common::RecordBatchIterator;
use crate::execution::physical_plan::expressions::PhysicalSortExpr;
use crate::execution::physical_plan::merge::MergeExec;
-use crate::execution::physical_plan::{common, ExecutionPlan, Partition};
+use crate::execution::physical_plan::{common, ExecutionPlan, Partitioning};
/// Sort execution plan
#[derive(Debug)]
@@ -61,44 +61,28 @@ impl ExecutionPlan for SortExec {
self.input.schema().clone()
}
- fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
- Ok(vec![
- (Arc::new(SortPartition {
- input: self.input.partitions()?,
- expr: self.expr.clone(),
- schema: self.schema(),
- concurrency: self.concurrency,
- })),
- ])
+ /// Get the output partitioning of this plan
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(1)
}
-}
-/// Represents a single partition of a Sort execution plan
-#[derive(Debug)]
-struct SortPartition {
- schema: SchemaRef,
- expr: Vec<PhysicalSortExpr>,
- input: Vec<Arc<dyn Partition>>,
- /// Number of threads to execute input partitions on before combining into a single partition
- concurrency: usize,
-}
+ fn execute(
+ &self,
+ partition: usize,
+ ) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
+ assert_eq!(0, partition);
-impl Partition for SortPartition {
- /// Execute the sort
- fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
// sort needs to operate on a single partition currently
- let merge =
- MergeExec::new(self.schema.clone(), self.input.clone(), self.concurrency);
- let merge_partitions = merge.partitions()?;
+ let merge = MergeExec::new(self.schema(), self.input.clone(), self.concurrency);
// MergeExec must always produce a single partition
- assert_eq!(1, merge_partitions.len());
- let it = merge_partitions[0].execute()?;
+ assert_eq!(1, merge.output_partitioning().partition_count());
+ let it = merge.execute(0)?;
let batches = common::collect(it)?;
// combine all record batches into one for each column
let combined_batch = RecordBatch::try_new(
- self.schema.clone(),
- self.schema
+ self.schema(),
+ self.schema()
.fields()
.iter()
.enumerate()
@@ -124,7 +108,7 @@ impl Partition for SortPartition {
// reorder all rows based on sorted indices
let sorted_batch = RecordBatch::try_new(
- self.schema.clone(),
+ self.schema(),
combined_batch
.columns()
.iter()
@@ -143,7 +127,7 @@ impl Partition for SortPartition {
)?;
Ok(Arc::new(Mutex::new(RecordBatchIterator::new(
- self.schema.clone(),
+ self.schema(),
vec![Arc::new(sorted_batch)],
))))
}
@@ -166,7 +150,7 @@ mod tests {
let csv =
CsvExec::try_new(&path, CsvReadOptions::new().schema(&schema), None, 1024)?;
- let sort_exec = SortExec::try_new(
+ let sort_exec = Arc::new(SortExec::try_new(
vec![
// c1 string column
PhysicalSortExpr {
@@ -186,9 +170,9 @@ mod tests {
],
Arc::new(csv),
2,
- )?;
+ )?);
- let result: Vec<RecordBatch> = test::execute(&sort_exec)?;
+ let result: Vec<RecordBatch> = test::execute(sort_exec)?;
assert_eq!(result.len(), 1);
let columns = result[0].columns();
diff --git a/rust/datafusion/src/test/mod.rs b/rust/datafusion/src/test/mod.rs
index 26d67fe..10b034d 100644
--- a/rust/datafusion/src/test/mod.rs
+++ b/rust/datafusion/src/test/mod.rs
@@ -37,7 +37,7 @@ pub fn arrow_testdata_path() -> String {
}
/// Execute a physical plan and collect the results
-pub fn execute(plan: &dyn ExecutionPlan) -> Result<Vec<RecordBatch>> {
+pub fn execute(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
let ctx = ExecutionContext::new();
ctx.collect(plan)
}
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index e9e09b4..43ac27a 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -110,7 +110,7 @@ fn parquet_single_nan_schema() {
let plan = ctx.create_logical_plan(&sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
let plan = ctx.create_physical_plan(&plan).unwrap();
- let results = ctx.collect(plan.as_ref()).unwrap();
+ let results = ctx.collect(plan).unwrap();
for batch in results {
assert_eq!(1, batch.num_rows());
assert_eq!(1, batch.num_columns());
@@ -289,7 +289,7 @@ fn csv_query_avg_multi_batch() -> Result<()> {
let plan = ctx.create_logical_plan(&sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
let plan = ctx.create_physical_plan(&plan).unwrap();
- let results = ctx.collect(plan.as_ref()).unwrap();
+ let results = ctx.collect(plan).unwrap();
let batch = &results[0];
let column = batch.column(0);
let array = column.as_any().downcast_ref::<Float64Array>().unwrap();
@@ -550,7 +550,7 @@ fn execute(ctx: &mut ExecutionContext, sql: &str) -> Vec<String> {
let optimized_logical_schema = plan.schema().clone();
let plan = ctx.create_physical_plan(&plan).unwrap();
let physical_schema = plan.schema().clone();
- let results = ctx.collect(plan.as_ref()).unwrap();
+ let results = ctx.collect(plan).unwrap();
assert_eq!(logical_schema.as_ref(), optimized_logical_schema.as_ref());
assert_eq!(logical_schema.as_ref(), physical_schema.as_ref());