Posted to commits@arrow.apache.org by dh...@apache.org on 2021/07/04 08:53:36 UTC
[arrow-datafusion] branch master updated: Ballista: Implement scalable distributed joins (#634)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 9314dbb Ballista: Implement scalable distributed joins (#634)
9314dbb is described below
commit 9314dbb31a785e8f08bb5e65ac55f51592920b01
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sun Jul 4 02:53:27 2021 -0600
Ballista: Implement scalable distributed joins (#634)
* Refactor Ballista planner to support RepartitionExec
* Improve tests and replace MergeExec with CoalescePartitionsExec in query plan output
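
For readers skimming the diff below, the heart of the planner refactor is the new handling of RepartitionExec. The following is a minimal Rust sketch of that pattern, not the committed code: it assumes the ballista_core and datafusion APIs that appear in the diff (ShuffleWriterExec::try_new, UnresolvedShuffleExec::new, RepartitionExec::partitioning), and the helper name plan_repartition is hypothetical; in the commit the logic lives in plan_query_stages_internal and create_shuffle_writer.

    use std::sync::Arc;

    use ballista_core::error::Result;
    use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
    use datafusion::physical_plan::repartition::RepartitionExec;
    use datafusion::physical_plan::ExecutionPlan;

    // Hypothetical helper: cut a query stage boundary at a RepartitionExec.
    fn plan_repartition(
        job_id: &str,
        stage_id: usize,
        repart: &RepartitionExec,
    ) -> Result<(Arc<dyn ExecutionPlan>, Arc<ShuffleWriterExec>)> {
        // The repartition's child becomes its own query stage: a ShuffleWriterExec
        // that partitions its output according to the RepartitionExec's scheme.
        let shuffle_writer = Arc::new(ShuffleWriterExec::try_new(
            job_id.to_owned(),
            stage_id,
            repart.children()[0].clone(),
            "".to_owned(), // the executor decides the work_dir path
            Some(repart.partitioning().to_owned()),
        )?);

        // The RepartitionExec itself is replaced by an UnresolvedShuffleExec
        // placeholder; the scheduler later rewrites it into a ShuffleReaderExec
        // once the writer stage has produced its partitions.
        let unresolved: Arc<dyn ExecutionPlan> = Arc::new(UnresolvedShuffleExec::new(
            vec![shuffle_writer.stage_id()],
            shuffle_writer.schema(),
            shuffle_writer.output_partitioning().partition_count(),
        ));

        Ok((unresolved, shuffle_writer))
    }

The new unit test distributed_hash_aggregate_plan (in the planner.rs hunk below) shows the effect of this pattern: a simple aggregate query is split into three stages, with the partial aggregate feeding a hash-partitioned shuffle.
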
---
.../core/src/execution_plans/shuffle_writer.rs | 20 ++-
.../core/src/execution_plans/unresolved_shuffle.rs | 15 +-
ballista/rust/core/src/utils.rs | 11 +-
ballista/rust/scheduler/src/planner.rs | 189 +++++++++++----------
ballista/rust/scheduler/src/test_utils.rs | 15 +-
datafusion/src/lib.rs | 2 +-
.../src/physical_plan/coalesce_partitions.rs | 14 +-
datafusion/src/physical_plan/hash_aggregate.rs | 2 +-
datafusion/src/physical_plan/limit.rs | 2 +-
datafusion/src/physical_plan/mod.rs | 2 +-
datafusion/src/physical_plan/planner.rs | 2 +-
datafusion/tests/sql.rs | 2 +-
12 files changed, 143 insertions(+), 133 deletions(-)
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 2d8d783..7fffaba 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -42,7 +42,9 @@ use datafusion::arrow::ipc::writer::FileWriter;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::hash_join::create_hashes;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning, RecordBatchStream};
+use datafusion::physical_plan::{
+ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+};
use futures::StreamExt;
use log::info;
use std::fs::File;
@@ -307,6 +309,22 @@ impl ExecutionPlan for ShuffleWriterExec {
)),
}
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(
+ f,
+ "ShuffleWriterExec: {:?}",
+ self.shuffle_output_partitioning
+ )
+ }
+ }
+ }
}
fn result_schema() -> SchemaRef {
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 9c53bc7..49b4f7a 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -23,12 +23,13 @@ use crate::serde::scheduler::PartitionLocation;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
use datafusion::{
error::{DataFusionError, Result},
physical_plan::RecordBatchStream,
};
use log::info;
+use std::fmt::Formatter;
/// UnresolvedShuffleExec represents a dependency on the results of several ShuffleWriterExec nodes which haven't been computed yet.
///
@@ -97,4 +98,16 @@ impl ExecutionPlan for UnresolvedShuffleExec {
"Ballista UnresolvedShuffleExec does not support execution".to_owned(),
))
}
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "UnresolvedShuffleExec")
+ }
+ }
+ }
}
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index d043763..8a510f4 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -227,16 +227,7 @@ fn build_exec_plan_diagram(
/// Create a DataFusion context that is compatible with Ballista
pub fn create_datafusion_context() -> ExecutionContext {
- // remove Repartition rule because that isn't supported yet
- let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
- Arc::new(CoalesceBatches::new()),
- Arc::new(AddCoalescePartitionsExec::new()),
- ];
- let config = ExecutionConfig::new()
- .with_concurrency(1)
- .with_repartition_joins(false)
- .with_repartition_aggregations(false)
- .with_physical_optimizer_rules(rules);
+ let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is a hack to enable partitioned joins
ExecutionContext::with_config(config)
}
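
As a usage note, here is a hedged sketch of how the simplified context above might be used (assuming ballista_core::utils exposes create_datafusion_context and the DataFusion register_csv API of this era; the table name and path are illustrative only):

    use ballista_core::utils::create_datafusion_context;
    use datafusion::error::Result;
    use datafusion::physical_plan::csv::CsvReadOptions;

    fn register_example_table() -> Result<()> {
        // The context now keeps DataFusion's default physical optimizer rules
        // (including repartitioning) and simply raises concurrency to 2.
        let mut ctx = create_datafusion_context();
        // Illustrative registration; any CSV-backed table would do.
        ctx.register_csv("lineitem", "testdata/lineitem", CsvReadOptions::new())?;
        Ok(())
    }
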
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 70d90a4..3195261 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -28,15 +28,11 @@ use ballista_core::{
execution_plans::{ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec},
serde::scheduler::PartitionLocation,
};
-use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
-use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
-use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use datafusion::execution::context::ExecutionContext;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
-use datafusion::physical_plan::hash_join::HashJoinExec;
+use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::windows::WindowAggExec;
-use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use log::info;
type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
@@ -71,13 +67,18 @@ impl DistributedPlanner {
info!("planning query stages");
let (new_plan, mut stages) =
self.plan_query_stages_internal(job_id, execution_plan)?;
- stages.push(create_query_stage(job_id, self.next_stage_id(), new_plan)?);
+ stages.push(create_shuffle_writer(
+ job_id,
+ self.next_stage_id(),
+ new_plan,
+ None,
+ )?);
Ok(stages)
}
/// Returns a potentially modified version of the input execution_plan along with the resulting query stages.
/// This function is needed because the input execution_plan might need to be modified, but it might not hold a
- /// compelte query stage (its parent might also belong to the same stage)
+ /// complete query stage (its parent might also belong to the same stage)
fn plan_query_stages_internal(
&mut self,
job_id: &str,
@@ -98,22 +99,17 @@ impl DistributedPlanner {
}
if let Some(adapter) = execution_plan.as_any().downcast_ref::<DfTableAdapter>() {
- // remove Repartition rule because that isn't supported yet
- let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
- Arc::new(CoalesceBatches::new()),
- Arc::new(AddCoalescePartitionsExec::new()),
- ];
- let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
- let ctx = ExecutionContext::with_config(config);
+ let ctx = ExecutionContext::new();
Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages))
- } else if let Some(merge) = execution_plan
+ } else if let Some(coalesce) = execution_plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
{
- let query_stage = create_query_stage(
+ let query_stage = create_shuffle_writer(
job_id,
self.next_stage_id(),
- merge.children()[0].clone(),
+ coalesce.children()[0].clone(),
+ None,
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
vec![query_stage.stage_id()],
@@ -121,35 +117,26 @@ impl DistributedPlanner {
query_stage.output_partitioning().partition_count(),
));
stages.push(query_stage);
- Ok((merge.with_new_children(vec![unresolved_shuffle])?, stages))
- } else if let Some(agg) =
- execution_plan.as_any().downcast_ref::<HashAggregateExec>()
+ Ok((
+ coalesce.with_new_children(vec![unresolved_shuffle])?,
+ stages,
+ ))
+ } else if let Some(repart) =
+ execution_plan.as_any().downcast_ref::<RepartitionExec>()
{
- //TODO should insert query stages in more generic way based on partitioning metadata
- // and not specifically for this operator
- match agg.mode() {
- AggregateMode::Final | AggregateMode::FinalPartitioned => {
- let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
- for child in &children {
- let new_stage = create_query_stage(
- job_id,
- self.next_stage_id(),
- child.clone(),
- )?;
- new_children.push(Arc::new(UnresolvedShuffleExec::new(
- vec![new_stage.stage_id()],
- new_stage.schema().clone(),
- new_stage.output_partitioning().partition_count(),
- )));
- stages.push(new_stage);
- }
- Ok((agg.with_new_children(new_children)?, stages))
- }
- AggregateMode::Partial => Ok((agg.with_new_children(children)?, stages)),
- }
- } else if let Some(join) = execution_plan.as_any().downcast_ref::<HashJoinExec>()
- {
- Ok((join.with_new_children(children)?, stages))
+ let query_stage = create_shuffle_writer(
+ job_id,
+ self.next_stage_id(),
+ repart.children()[0].clone(),
+ Some(repart.partitioning().to_owned()),
+ )?;
+ let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
+ vec![query_stage.stage_id()],
+ query_stage.schema(),
+ query_stage.output_partitioning().partition_count(),
+ ));
+ stages.push(query_stage);
+ Ok((unresolved_shuffle, stages))
} else if let Some(window) =
execution_plan.as_any().downcast_ref::<WindowAggExec>()
{
@@ -158,25 +145,7 @@ impl DistributedPlanner {
window
)))
} else {
- // TODO check for compatible partitioning schema, not just count
- if execution_plan.output_partitioning().partition_count()
- != children[0].output_partitioning().partition_count()
- {
- let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
- for child in &children {
- let new_stage =
- create_query_stage(job_id, self.next_stage_id(), child.clone())?;
- new_children.push(Arc::new(UnresolvedShuffleExec::new(
- vec![new_stage.stage_id()],
- new_stage.schema().clone(),
- new_stage.output_partitioning().partition_count(),
- )));
- stages.push(new_stage);
- }
- Ok((execution_plan.with_new_children(new_children)?, stages))
- } else {
- Ok((execution_plan.with_new_children(children)?, stages))
- }
+ Ok((execution_plan.with_new_children(children)?, stages))
}
}
@@ -224,17 +193,18 @@ pub fn remove_unresolved_shuffles(
Ok(stage.with_new_children(new_children)?)
}
-fn create_query_stage(
+fn create_shuffle_writer(
job_id: &str,
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
+ partitioning: Option<Partitioning>,
) -> Result<Arc<ShuffleWriterExec>> {
Ok(Arc::new(ShuffleWriterExec::try_new(
job_id.to_owned(),
stage_id,
plan,
"".to_owned(), // executor will decide on the work_dir path
- None,
+ partitioning,
)?))
}
@@ -245,7 +215,7 @@ mod test {
use ballista_core::error::BallistaError;
use ballista_core::execution_plans::UnresolvedShuffleExec;
use ballista_core::serde::protobuf;
- use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
+ use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::{
coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec,
@@ -262,7 +232,7 @@ mod test {
}
#[test]
- fn test() -> Result<(), BallistaError> {
+ fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
let mut ctx = datafusion_test_context("testdata")?;
// simplified form of TPC-H query 1
@@ -285,41 +255,72 @@ mod test {
}
/* Expected result:
- ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=1
- HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
- CsvExec: testdata/lineitem; partitions=2
- ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
- CoalescePartitionsExec
- UnresolvedShuffleExec: stages=[1]
- ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
- SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
- ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_extendedprice Multip
- HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
- UnresolvedShuffleExec: stages=[2]
+
+ ShuffleWriterExec: Some(Hash([Column { name: "l_returnflag", index: 0 }], 2))
+ HashAggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
+ CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false
+
+ ShuffleWriterExec: None
+ ProjectionExec: expr=[l_returnflag@0 as l_returnflag, SUM(lineitem.l_extendedprice Multiply Int64(1))@1 as sum_disc_price]
+ HashAggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
+ CoalesceBatchesExec: target_batch_size=4096
+ RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }], 2)
+ HashAggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
+ CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false
+
+ ShuffleWriterExec: None
+ SortExec: [l_returnflag@0 ASC]
+ CoalescePartitionsExec
+ UnresolvedShuffleExec
*/
- let sort = stages[2].children()[0].clone();
- let sort = downcast_exec!(sort, SortExec);
+ assert_eq!(3, stages.len());
- let projection = sort.children()[0].clone();
- println!("{:?}", projection);
- let projection = downcast_exec!(projection, ProjectionExec);
+ // verify stage 0
+ let stage0 = stages[0].children()[0].clone();
+ let partial_hash = downcast_exec!(stage0, HashAggregateExec);
+ assert!(*partial_hash.mode() == AggregateMode::Partial);
+ // verify stage 1
+ let stage1 = stages[1].children()[0].clone();
+ let projection = downcast_exec!(stage1, ProjectionExec);
let final_hash = projection.children()[0].clone();
let final_hash = downcast_exec!(final_hash, HashAggregateExec);
-
- let unresolved_shuffle = final_hash.children()[0].clone();
+ assert!(*final_hash.mode() == AggregateMode::FinalPartitioned);
+
+ // verify stage 2
+ let stage2 = stages[2].children()[0].clone();
+ let sort = downcast_exec!(stage2, SortExec);
+ let coalesce_partitions = sort.children()[0].clone();
+ let coalesce_partitions =
+ downcast_exec!(coalesce_partitions, CoalescePartitionsExec);
+ let unresolved_shuffle = coalesce_partitions.children()[0].clone();
let unresolved_shuffle =
downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
assert_eq!(unresolved_shuffle.query_stage_ids, vec![2]);
- let merge_exec = stages[1].children()[0].clone();
- let merge_exec = downcast_exec!(merge_exec, CoalescePartitionsExec);
+ Ok(())
+ }
- let unresolved_shuffle = merge_exec.children()[0].clone();
- let unresolved_shuffle =
- downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
- assert_eq!(unresolved_shuffle.query_stage_ids, vec![1]);
+ #[test]
+ fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> {
+ let mut ctx = datafusion_test_context("testdata")?;
+
+ // simplified form of TPC-H query 1
+ let df = ctx.sql(
+ "select l_returnflag, sum(l_extendedprice * 1) as sum_disc_price
+ from lineitem
+ group by l_returnflag
+ order by l_returnflag",
+ )?;
+
+ let plan = df.to_logical_plan();
+ let plan = ctx.optimize(&plan)?;
+ let plan = ctx.create_physical_plan(&plan)?;
+
+ let mut planner = DistributedPlanner::new();
+ let job_uuid = Uuid::new_v4();
+ let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
let partial_hash = stages[0].children()[0].clone();
let partial_hash_serde = roundtrip_operator(partial_hash.clone())?;
diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs
index becb95f..aa1e2b2 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -15,15 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::Arc;
-
use ballista_core::error::Result;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
-use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
-use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::csv::CsvReadOptions;
pub const TPCH_TABLES: &[&str] = &[
@@ -31,16 +26,8 @@ pub const TPCH_TABLES: &[&str] = &[
];
pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
- // remove Repartition rule because that isn't supported yet
- let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
- Arc::new(AddCoalescePartitionsExec::new()),
- Arc::new(CoalesceBatches::new()),
- ];
- let config = ExecutionConfig::new()
- .with_physical_optimizer_rules(rules)
- .with_repartition_aggregations(false);
+ let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is a hack to enable partitioned joins
let mut ctx = ExecutionContext::with_config(config);
-
for table in TPCH_TABLES {
let schema = get_tpch_schema(table);
let options = CsvReadOptions::new()
diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs
index 64cc0a1..5f07c17 100644
--- a/datafusion/src/lib.rs
+++ b/datafusion/src/lib.rs
@@ -167,7 +167,7 @@
//! * Filter: [`FilterExec`](physical_plan::filter::FilterExec)
//! * Hash and Grouped aggregations: [`HashAggregateExec`](physical_plan::hash_aggregate::HashAggregateExec)
//! * Sort: [`SortExec`](physical_plan::sort::SortExec)
-//! * Merge (partitions): [`MergeExec`](physical_plan::merge::MergeExec)
+//! * Coalesce partitions: [`CoalescePartitionsExec`](physical_plan::coalesce_partitions::CoalescePartitionsExec)
//! * Limit: [`LocalLimitExec`](physical_plan::limit::LocalLimitExec) and [`GlobalLimitExec`](physical_plan::limit::GlobalLimitExec)
//! * Scan a CSV: [`CsvExec`](physical_plan::csv::CsvExec)
//! * Scan a Parquet: [`ParquetExec`](physical_plan::parquet::ParquetExec)
diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs b/datafusion/src/physical_plan/coalesce_partitions.rs
index 94ff230..4c04065 100644
--- a/datafusion/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/src/physical_plan/coalesce_partitions.rs
@@ -46,7 +46,7 @@ pub struct CoalescePartitionsExec {
}
impl CoalescePartitionsExec {
- /// Create a new MergeExec
+ /// Create a new CoalescePartitionsExec
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
CoalescePartitionsExec { input }
}
@@ -84,16 +84,16 @@ impl ExecutionPlan for CoalescePartitionsExec {
match children.len() {
1 => Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone()))),
_ => Err(DataFusionError::Internal(
- "MergeExec wrong number of children".to_string(),
+ "CoalescePartitionsExec wrong number of children".to_string(),
)),
}
}
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
- // MergeExec produces a single partition
+ // CoalescePartitionsExec produces a single partition
if 0 != partition {
return Err(DataFusionError::Internal(format!(
- "MergeExec invalid partition {}",
+ "CoalescePartitionsExec invalid partition {}",
partition
)));
}
@@ -101,7 +101,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
let input_partitions = self.input.output_partitioning().partition_count();
match input_partitions {
0 => Err(DataFusionError::Internal(
- "MergeExec requires at least one input partition".to_owned(),
+ "CoalescePartitionsExec requires at least one input partition".to_owned(),
)),
1 => {
// bypass any threading if there is a single partition
@@ -135,7 +135,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
- write!(f, "MergeExec")
+ write!(f, "CoalescePartitionsExec")
}
}
}
@@ -196,7 +196,7 @@ mod tests {
let merge = CoalescePartitionsExec::new(Arc::new(csv));
- // output of MergeExec should have a single partition
+ // output of CoalescePartitionsExec should have a single partition
assert_eq!(merge.output_partitioning().partition_count(), 1);
// the result should contain 4 batches (one per input partition)
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index e157243..b4b7c22 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -74,7 +74,7 @@ use super::{
};
/// Hash aggregate modes
-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AggregateMode {
/// Partial aggregate that can be applied in parallel across input partitions
Partial,
diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs
index 361e26e..9f47442 100644
--- a/datafusion/src/physical_plan/limit.rs
+++ b/datafusion/src/physical_plan/limit.rs
@@ -49,7 +49,7 @@ pub struct GlobalLimitExec {
}
impl GlobalLimitExec {
- /// Create a new MergeExec
+ /// Create a new GlobalLimitExec
pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
GlobalLimitExec { input, limit }
}
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 307fff6..a940cbe 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -308,7 +308,7 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
_ => {
// merge into a single partition
let plan = CoalescePartitionsExec::new(plan.clone());
- // MergeExec must produce a single partition
+ // CoalescePartitionsExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
common::collect(plan.execute(0).await?).await
}
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 5b43ec1..effdefc 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -736,7 +736,7 @@ impl DefaultPhysicalPlanner {
input
} else {
// Apply a LocalLimitExec to each partition. The optimizer will also insert
- // a MergeExec between the GlobalLimitExec and LocalLimitExec
+ // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
Arc::new(LocalLimitExec::new(input, limit))
};
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 82c12ce..bd73cb1 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -3812,7 +3812,7 @@ async fn test_physical_plan_display_indent() {
let expected = vec![
"GlobalLimitExec: limit=10",
" SortExec: [the_min@2 DESC]",
- " MergeExec",
+ " CoalescePartitionsExec",
" ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(c12), MIN(aggregate_test_100.c12)@2 as the_min]",
" HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(c12), MIN(c12)]",
" CoalesceBatchesExec: target_batch_size=4096",