Posted to commits@arrow.apache.org by dh...@apache.org on 2021/07/04 08:53:36 UTC

[arrow-datafusion] branch master updated: Ballista: Implement scalable distributed joins (#634)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 9314dbb  Ballista: Implement scalable distributed joins (#634)
9314dbb is described below

commit 9314dbb31a785e8f08bb5e65ac55f51592920b01
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sun Jul 4 02:53:27 2021 -0600

    Ballista: Implement scalable distributed joins (#634)
    
    * Refactor Ballista planner to support RepartitionExec
    
    * Improve tests and replace MergeExec with CoalescePartitionsExec in query plan output
---
 .../core/src/execution_plans/shuffle_writer.rs     |  20 ++-
 .../core/src/execution_plans/unresolved_shuffle.rs |  15 +-
 ballista/rust/core/src/utils.rs                    |  11 +-
 ballista/rust/scheduler/src/planner.rs             | 189 +++++++++++----------
 ballista/rust/scheduler/src/test_utils.rs          |  15 +-
 datafusion/src/lib.rs                              |   2 +-
 .../src/physical_plan/coalesce_partitions.rs       |  14 +-
 datafusion/src/physical_plan/hash_aggregate.rs     |   2 +-
 datafusion/src/physical_plan/limit.rs              |   2 +-
 datafusion/src/physical_plan/mod.rs                |   2 +-
 datafusion/src/physical_plan/planner.rs            |   2 +-
 datafusion/tests/sql.rs                            |   2 +-
 12 files changed, 143 insertions(+), 133 deletions(-)

diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 2d8d783..7fffaba 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -42,7 +42,9 @@ use datafusion::arrow::ipc::writer::FileWriter;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::physical_plan::hash_join::create_hashes;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning, RecordBatchStream};
+use datafusion::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+};
 use futures::StreamExt;
 use log::info;
 use std::fs::File;
@@ -307,6 +309,22 @@ impl ExecutionPlan for ShuffleWriterExec {
             )),
         }
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "ShuffleWriterExec: {:?}",
+                    self.shuffle_output_partitioning
+                )
+            }
+        }
+    }
 }
 
 fn result_schema() -> SchemaRef {
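
With fmt_as implemented, ShuffleWriterExec (and UnresolvedShuffleExec below) now render themselves in DataFusion's indented plan output instead of falling back to a generic representation. A minimal sketch of how that output is produced, assuming the displayable helper exported from datafusion::physical_plan (the same helper behind the indent-format assertions in datafusion/tests/sql.rs at the end of this diff):

    use std::sync::Arc;

    use datafusion::physical_plan::{displayable, ExecutionPlan};

    /// Render a physical plan tree in indented form; every node is printed by
    /// calling its fmt_as implementation with DisplayFormatType::Default.
    fn print_plan(plan: &Arc<dyn ExecutionPlan>) {
        println!("{}", displayable(plan.as_ref()).indent());
    }
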
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 9c53bc7..49b4f7a 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -23,12 +23,13 @@ use crate::serde::scheduler::PartitionLocation;
 
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
 use datafusion::{
     error::{DataFusionError, Result},
     physical_plan::RecordBatchStream,
 };
 use log::info;
+use std::fmt::Formatter;
 
 /// UnresolvedShuffleExec represents a dependency on the results of several ShuffleWriterExec nodes which haven't been computed yet.
 ///
@@ -97,4 +98,16 @@ impl ExecutionPlan for UnresolvedShuffleExec {
             "Ballista UnresolvedShuffleExec does not support execution".to_owned(),
         ))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "UnresolvedShuffleExec")
+            }
+        }
+    }
 }
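
As the doc comment above notes, UnresolvedShuffleExec is a placeholder for the output of ShuffleWriterExec stages that have not been computed yet. A minimal sketch of how such a placeholder is derived from the stage it depends on, using the same constructor arguments the planner passes in the planner.rs changes below (the helper name is illustrative):

    use std::sync::Arc;

    use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
    use datafusion::physical_plan::ExecutionPlan;

    /// Record which stage must run first, the schema it will produce, and how
    /// many partitions to expect from it.
    fn placeholder_for(stage: &Arc<ShuffleWriterExec>) -> Arc<UnresolvedShuffleExec> {
        Arc::new(UnresolvedShuffleExec::new(
            vec![stage.stage_id()],
            stage.schema(),
            stage.output_partitioning().partition_count(),
        ))
    }
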
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index d043763..8a510f4 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -227,16 +227,7 @@ fn build_exec_plan_diagram(
 
 /// Create a DataFusion context that is compatible with Ballista
 pub fn create_datafusion_context() -> ExecutionContext {
-    // remove Repartition rule because that isn't supported yet
-    let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
-        Arc::new(CoalesceBatches::new()),
-        Arc::new(AddCoalescePartitionsExec::new()),
-    ];
-    let config = ExecutionConfig::new()
-        .with_concurrency(1)
-        .with_repartition_joins(false)
-        .with_repartition_aggregations(false)
-        .with_physical_optimizer_rules(rules);
+    let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is a hack to enable partitioned joins
     ExecutionContext::with_config(config)
 }
 
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 70d90a4..3195261 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -28,15 +28,11 @@ use ballista_core::{
     execution_plans::{ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec},
     serde::scheduler::PartitionLocation,
 };
-use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
-use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
-use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use datafusion::execution::context::ExecutionContext;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
-use datafusion::physical_plan::hash_join::HashJoinExec;
+use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::windows::WindowAggExec;
-use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
 use log::info;
 
 type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
@@ -71,13 +67,18 @@ impl DistributedPlanner {
         info!("planning query stages");
         let (new_plan, mut stages) =
             self.plan_query_stages_internal(job_id, execution_plan)?;
-        stages.push(create_query_stage(job_id, self.next_stage_id(), new_plan)?);
+        stages.push(create_shuffle_writer(
+            job_id,
+            self.next_stage_id(),
+            new_plan,
+            None,
+        )?);
         Ok(stages)
     }
 
     /// Returns a potentially modified version of the input execution_plan along with the resulting query stages.
     /// This function is needed because the input execution_plan might need to be modified, but it might not hold a
-    /// compelte query stage (its parent might also belong to the same stage)
+    /// complete query stage (its parent might also belong to the same stage)
     fn plan_query_stages_internal(
         &mut self,
         job_id: &str,
@@ -98,22 +99,17 @@ impl DistributedPlanner {
         }
 
         if let Some(adapter) = execution_plan.as_any().downcast_ref::<DfTableAdapter>() {
-            // remove Repartition rule because that isn't supported yet
-            let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
-                Arc::new(CoalesceBatches::new()),
-                Arc::new(AddCoalescePartitionsExec::new()),
-            ];
-            let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
-            let ctx = ExecutionContext::with_config(config);
+            let ctx = ExecutionContext::new();
             Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages))
-        } else if let Some(merge) = execution_plan
+        } else if let Some(coalesce) = execution_plan
             .as_any()
             .downcast_ref::<CoalescePartitionsExec>()
         {
-            let query_stage = create_query_stage(
+            let query_stage = create_shuffle_writer(
                 job_id,
                 self.next_stage_id(),
-                merge.children()[0].clone(),
+                coalesce.children()[0].clone(),
+                None,
             )?;
             let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
                 vec![query_stage.stage_id()],
@@ -121,35 +117,26 @@ impl DistributedPlanner {
                 query_stage.output_partitioning().partition_count(),
             ));
             stages.push(query_stage);
-            Ok((merge.with_new_children(vec![unresolved_shuffle])?, stages))
-        } else if let Some(agg) =
-            execution_plan.as_any().downcast_ref::<HashAggregateExec>()
+            Ok((
+                coalesce.with_new_children(vec![unresolved_shuffle])?,
+                stages,
+            ))
+        } else if let Some(repart) =
+            execution_plan.as_any().downcast_ref::<RepartitionExec>()
         {
-            //TODO should insert query stages in more generic way based on partitioning metadata
-            // and not specifically for this operator
-            match agg.mode() {
-                AggregateMode::Final | AggregateMode::FinalPartitioned => {
-                    let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
-                    for child in &children {
-                        let new_stage = create_query_stage(
-                            job_id,
-                            self.next_stage_id(),
-                            child.clone(),
-                        )?;
-                        new_children.push(Arc::new(UnresolvedShuffleExec::new(
-                            vec![new_stage.stage_id()],
-                            new_stage.schema().clone(),
-                            new_stage.output_partitioning().partition_count(),
-                        )));
-                        stages.push(new_stage);
-                    }
-                    Ok((agg.with_new_children(new_children)?, stages))
-                }
-                AggregateMode::Partial => Ok((agg.with_new_children(children)?, stages)),
-            }
-        } else if let Some(join) = execution_plan.as_any().downcast_ref::<HashJoinExec>()
-        {
-            Ok((join.with_new_children(children)?, stages))
+            let query_stage = create_shuffle_writer(
+                job_id,
+                self.next_stage_id(),
+                repart.children()[0].clone(),
+                Some(repart.partitioning().to_owned()),
+            )?;
+            let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
+                vec![query_stage.stage_id()],
+                query_stage.schema(),
+                query_stage.output_partitioning().partition_count(),
+            ));
+            stages.push(query_stage);
+            Ok((unresolved_shuffle, stages))
         } else if let Some(window) =
             execution_plan.as_any().downcast_ref::<WindowAggExec>()
         {
@@ -158,25 +145,7 @@ impl DistributedPlanner {
                 window
             )))
         } else {
-            // TODO check for compatible partitioning schema, not just count
-            if execution_plan.output_partitioning().partition_count()
-                != children[0].output_partitioning().partition_count()
-            {
-                let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
-                for child in &children {
-                    let new_stage =
-                        create_query_stage(job_id, self.next_stage_id(), child.clone())?;
-                    new_children.push(Arc::new(UnresolvedShuffleExec::new(
-                        vec![new_stage.stage_id()],
-                        new_stage.schema().clone(),
-                        new_stage.output_partitioning().partition_count(),
-                    )));
-                    stages.push(new_stage);
-                }
-                Ok((execution_plan.with_new_children(new_children)?, stages))
-            } else {
-                Ok((execution_plan.with_new_children(children)?, stages))
-            }
+            Ok((execution_plan.with_new_children(children)?, stages))
         }
     }
 
@@ -224,17 +193,18 @@ pub fn remove_unresolved_shuffles(
     Ok(stage.with_new_children(new_children)?)
 }
 
-fn create_query_stage(
+fn create_shuffle_writer(
     job_id: &str,
     stage_id: usize,
     plan: Arc<dyn ExecutionPlan>,
+    partitioning: Option<Partitioning>,
 ) -> Result<Arc<ShuffleWriterExec>> {
     Ok(Arc::new(ShuffleWriterExec::try_new(
         job_id.to_owned(),
         stage_id,
         plan,
         "".to_owned(), // executor will decide on the work_dir path
-        None,
+        partitioning,
     )?))
 }
 
@@ -245,7 +215,7 @@ mod test {
     use ballista_core::error::BallistaError;
     use ballista_core::execution_plans::UnresolvedShuffleExec;
     use ballista_core::serde::protobuf;
-    use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
+    use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
     use datafusion::physical_plan::sort::SortExec;
     use datafusion::physical_plan::{
         coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec,
@@ -262,7 +232,7 @@ mod test {
     }
 
     #[test]
-    fn test() -> Result<(), BallistaError> {
+    fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
         let mut ctx = datafusion_test_context("testdata")?;
 
         // simplified form of TPC-H query 1
@@ -285,41 +255,72 @@ mod test {
         }
 
         /* Expected result:
-        ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=1
-         HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
-          CsvExec: testdata/lineitem; partitions=2
-        ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
-         CoalescePartitionsExec
-          UnresolvedShuffleExec: stages=[1]
-        ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
-         SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
-          ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_extendedprice Multip
-           HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
-            UnresolvedShuffleExec: stages=[2]
+
+        ShuffleWriterExec: Some(Hash([Column { name: "l_returnflag", index: 0 }], 2))
+          HashAggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
+            CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false
+
+        ShuffleWriterExec: None
+          ProjectionExec: expr=[l_returnflag@0 as l_returnflag, SUM(lineitem.l_extendedprice Multiply Int64(1))@1 as sum_disc_price]
+            HashAggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
+              CoalesceBatchesExec: target_batch_size=4096
+                RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }], 2)
+                  HashAggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
+                    CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false
+
+        ShuffleWriterExec: None
+          SortExec: [l_returnflag@0 ASC]
+            CoalescePartitionsExec
+              UnresolvedShuffleExec
         */
 
-        let sort = stages[2].children()[0].clone();
-        let sort = downcast_exec!(sort, SortExec);
+        assert_eq!(3, stages.len());
 
-        let projection = sort.children()[0].clone();
-        println!("{:?}", projection);
-        let projection = downcast_exec!(projection, ProjectionExec);
+        // verify stage 0
+        let stage0 = stages[0].children()[0].clone();
+        let partial_hash = downcast_exec!(stage0, HashAggregateExec);
+        assert!(*partial_hash.mode() == AggregateMode::Partial);
 
+        // verify stage 1
+        let stage1 = stages[1].children()[0].clone();
+        let projection = downcast_exec!(stage1, ProjectionExec);
         let final_hash = projection.children()[0].clone();
         let final_hash = downcast_exec!(final_hash, HashAggregateExec);
-
-        let unresolved_shuffle = final_hash.children()[0].clone();
+        assert!(*final_hash.mode() == AggregateMode::FinalPartitioned);
+
+        // verify stage 2
+        let stage2 = stages[2].children()[0].clone();
+        let sort = downcast_exec!(stage2, SortExec);
+        let coalesce_partitions = sort.children()[0].clone();
+        let coalesce_partitions =
+            downcast_exec!(coalesce_partitions, CoalescePartitionsExec);
+        let unresolved_shuffle = coalesce_partitions.children()[0].clone();
         let unresolved_shuffle =
             downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
         assert_eq!(unresolved_shuffle.query_stage_ids, vec![2]);
 
-        let merge_exec = stages[1].children()[0].clone();
-        let merge_exec = downcast_exec!(merge_exec, CoalescePartitionsExec);
+        Ok(())
+    }
 
-        let unresolved_shuffle = merge_exec.children()[0].clone();
-        let unresolved_shuffle =
-            downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
-        assert_eq!(unresolved_shuffle.query_stage_ids, vec![1]);
+    #[test]
+    fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> {
+        let mut ctx = datafusion_test_context("testdata")?;
+
+        // simplified form of TPC-H query 1
+        let df = ctx.sql(
+            "select l_returnflag, sum(l_extendedprice * 1) as sum_disc_price
+            from lineitem
+            group by l_returnflag
+            order by l_returnflag",
+        )?;
+
+        let plan = df.to_logical_plan();
+        let plan = ctx.optimize(&plan)?;
+        let plan = ctx.create_physical_plan(&plan)?;
+
+        let mut planner = DistributedPlanner::new();
+        let job_uuid = Uuid::new_v4();
+        let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
 
         let partial_hash = stages[0].children()[0].clone();
         let partial_hash_serde = roundtrip_operator(partial_hash.clone())?;
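
Taken together, the reworked tests show the intended workflow: build a physical plan with a stock DataFusion context, hand it to DistributedPlanner, and get back one ShuffleWriterExec per query stage. A condensed usage sketch of that flow, using the same calls as the tests above; printing relies on the displayable helper mentioned earlier:

    use ballista_core::error::BallistaError;
    use datafusion::physical_plan::displayable;
    use uuid::Uuid;

    // DistributedPlanner (this module) and datafusion_test_context (test_utils.rs)
    // are assumed to be in scope, as they are in the test module above.
    fn plan_tpch_q1_stages() -> Result<(), BallistaError> {
        let mut ctx = datafusion_test_context("testdata")?;

        // Simplified form of TPC-H query 1, as in the tests above.
        let df = ctx.sql(
            "select l_returnflag, sum(l_extendedprice * 1) as sum_disc_price
            from lineitem
            group by l_returnflag
            order by l_returnflag",
        )?;
        let plan = df.to_logical_plan();
        let plan = ctx.optimize(&plan)?;
        let plan = ctx.create_physical_plan(&plan)?;

        // Split the plan into stages at RepartitionExec / CoalescePartitionsExec
        // boundaries; each stage is wrapped in a ShuffleWriterExec.
        let mut planner = DistributedPlanner::new();
        let stages = planner.plan_query_stages(&Uuid::new_v4().to_string(), plan)?;
        assert_eq!(3, stages.len());

        // The fmt_as implementations added in this commit make the printed
        // stages match the "Expected result" comment in the test above.
        for stage in &stages {
            println!("{}", displayable(stage.as_ref()).indent());
        }
        Ok(())
    }
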
diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs
index becb95f..aa1e2b2 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -15,15 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
-
 use ballista_core::error::Result;
 
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
-use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
-use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::csv::CsvReadOptions;
 
 pub const TPCH_TABLES: &[&str] = &[
@@ -31,16 +26,8 @@ pub const TPCH_TABLES: &[&str] = &[
 ];
 
 pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
-    // remove Repartition rule because that isn't supported yet
-    let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
-        Arc::new(AddCoalescePartitionsExec::new()),
-        Arc::new(CoalesceBatches::new()),
-    ];
-    let config = ExecutionConfig::new()
-        .with_physical_optimizer_rules(rules)
-        .with_repartition_aggregations(false);
+    let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is a hack to enable partitioned joins
     let mut ctx = ExecutionContext::with_config(config);
-
     for table in TPCH_TABLES {
         let schema = get_tpch_schema(table);
         let options = CsvReadOptions::new()
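
Both context helpers changed in this commit (create_datafusion_context in utils.rs and datafusion_test_context here) drop the custom optimizer-rule list and simply raise concurrency to 2, so DataFusion's default physical optimizer rules are free to insert RepartitionExec nodes. A more explicit sketch of the equivalent configuration; the two repartition settings shown are the defaults that the removed with_repartition_joins(false) and with_repartition_aggregations(false) calls used to override:

    use datafusion::execution::context::{ExecutionConfig, ExecutionContext};

    /// Spelled-out equivalent of the new Ballista-compatible context config.
    fn partitioned_context() -> ExecutionContext {
        let config = ExecutionConfig::new()
            // more than one target partition, so the repartition rules can fire
            .with_concurrency(2)
            .with_repartition_joins(true)
            .with_repartition_aggregations(true);
        ExecutionContext::with_config(config)
    }
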
diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs
index 64cc0a1..5f07c17 100644
--- a/datafusion/src/lib.rs
+++ b/datafusion/src/lib.rs
@@ -167,7 +167,7 @@
 //! * Filter: [`FilterExec`](physical_plan::filter::FilterExec)
 //! * Hash and Grouped aggregations: [`HashAggregateExec`](physical_plan::hash_aggregate::HashAggregateExec)
 //! * Sort: [`SortExec`](physical_plan::sort::SortExec)
-//! * Merge (partitions): [`MergeExec`](physical_plan::merge::MergeExec)
+//! * Coalesce partitions: [`CoalescePartitionsExec`](physical_plan::coalesce_partitions::CoalescePartitionsExec)
 //! * Limit: [`LocalLimitExec`](physical_plan::limit::LocalLimitExec) and [`GlobalLimitExec`](physical_plan::limit::GlobalLimitExec)
 //! * Scan a CSV: [`CsvExec`](physical_plan::csv::CsvExec)
 //! * Scan a Parquet: [`ParquetExec`](physical_plan::parquet::ParquetExec)
diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs b/datafusion/src/physical_plan/coalesce_partitions.rs
index 94ff230..4c04065 100644
--- a/datafusion/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/src/physical_plan/coalesce_partitions.rs
@@ -46,7 +46,7 @@ pub struct CoalescePartitionsExec {
 }
 
 impl CoalescePartitionsExec {
-    /// Create a new MergeExec
+    /// Create a new CoalescePartitionsExec
     pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
         CoalescePartitionsExec { input }
     }
@@ -84,16 +84,16 @@ impl ExecutionPlan for CoalescePartitionsExec {
         match children.len() {
             1 => Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone()))),
             _ => Err(DataFusionError::Internal(
-                "MergeExec wrong number of children".to_string(),
+                "CoalescePartitionsExec wrong number of children".to_string(),
             )),
         }
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        // MergeExec produces a single partition
+        // CoalescePartitionsExec produces a single partition
         if 0 != partition {
             return Err(DataFusionError::Internal(format!(
-                "MergeExec invalid partition {}",
+                "CoalescePartitionsExec invalid partition {}",
                 partition
             )));
         }
@@ -101,7 +101,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
         let input_partitions = self.input.output_partitioning().partition_count();
         match input_partitions {
             0 => Err(DataFusionError::Internal(
-                "MergeExec requires at least one input partition".to_owned(),
+                "CoalescePartitionsExec requires at least one input partition".to_owned(),
             )),
             1 => {
                 // bypass any threading if there is a single partition
@@ -135,7 +135,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default => {
-                write!(f, "MergeExec")
+                write!(f, "CoalescePartitionsExec")
             }
         }
     }
@@ -196,7 +196,7 @@ mod tests {
 
         let merge = CoalescePartitionsExec::new(Arc::new(csv));
 
-        // output of MergeExec should have a single partition
+        // output of CoalescePartitionsExec should have a single partition
         assert_eq!(merge.output_partitioning().partition_count(), 1);
 
         // the result should contain 4 batches (one per input partition)
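
Beyond the comment and error-message renames, this file is where the operator's contract lives: whatever the input partitioning, CoalescePartitionsExec exposes exactly one output partition. A minimal sketch of the merge-then-collect pattern, the same shape as collect() in the physical_plan/mod.rs hunk below:

    use std::sync::Arc;

    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::error::Result;
    use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
    use datafusion::physical_plan::{common, ExecutionPlan};

    /// Merge all partitions of `input` into a single stream and drain it.
    async fn merge_and_collect(
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Vec<RecordBatch>> {
        let merged = CoalescePartitionsExec::new(input);
        // CoalescePartitionsExec always reports a single output partition.
        assert_eq!(1, merged.output_partitioning().partition_count());
        common::collect(merged.execute(0).await?).await
    }
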
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index e157243..b4b7c22 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -74,7 +74,7 @@ use super::{
 };
 
 /// Hash aggregate modes
-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 pub enum AggregateMode {
     /// Partial aggregate that can be applied in parallel across input partitions
     Partial,
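
Deriving PartialEq and Eq on AggregateMode is what lets the reworked planner test compare modes directly. A trivial sketch of the comparison that now compiles:

    use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};

    /// True if the aggregate runs in Partial mode (the stage-0 check in the
    /// planner test above).
    fn is_partial(agg: &HashAggregateExec) -> bool {
        *agg.mode() == AggregateMode::Partial
    }
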
diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs
index 361e26e..9f47442 100644
--- a/datafusion/src/physical_plan/limit.rs
+++ b/datafusion/src/physical_plan/limit.rs
@@ -49,7 +49,7 @@ pub struct GlobalLimitExec {
 }
 
 impl GlobalLimitExec {
-    /// Create a new MergeExec
+    /// Create a new GlobalLimitExec
     pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
         GlobalLimitExec { input, limit }
     }
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 307fff6..a940cbe 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -308,7 +308,7 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
         _ => {
             // merge into a single partition
             let plan = CoalescePartitionsExec::new(plan.clone());
-            // MergeExec must produce a single partition
+            // CoalescePartitionsExec must produce a single partition
             assert_eq!(1, plan.output_partitioning().partition_count());
             common::collect(plan.execute(0).await?).await
         }
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 5b43ec1..effdefc 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -736,7 +736,7 @@ impl DefaultPhysicalPlanner {
                     input
                 } else {
                     // Apply a LocalLimitExec to each partition. The optimizer will also insert
-                    // a MergeExec between the GlobalLimitExec and LocalLimitExec
+                    // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
                     Arc::new(LocalLimitExec::new(input, limit))
                 };
 
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 82c12ce..bd73cb1 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -3812,7 +3812,7 @@ async fn test_physical_plan_display_indent() {
     let expected = vec![
         "GlobalLimitExec: limit=10",
         "  SortExec: [the_min@2 DESC]",
-        "    MergeExec",
+        "    CoalescePartitionsExec",
         "      ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(c12), MIN(aggregate_test_100.c12)@2 as the_min]",
         "        HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(c12), MIN(c12)]",
         "          CoalesceBatchesExec: target_batch_size=4096",