You are viewing a plain text version of this content. The canonical version is on the original mailing-list archive page; its hyperlink is not preserved in this plain-text rendering.
Posted to commits@arrow.apache.org by jo...@apache.org on 2021/06/28 08:05:11 UTC

[arrow-datafusion] branch master updated: Rename MergeExec to CoalescePartitionsExec (#635)

This is an automated email from the ASF dual-hosted git repository.

jorgecarleitao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 4068f8b  Rename MergeExec to CoalescePartitionsExec (#635)
4068f8b is described below

commit 4068f8b3a212aff8d7cdf2183fd1834be0dc5e69
Author: Andy Grove <an...@gmail.com>
AuthorDate: Mon Jun 28 02:05:00 2021 -0600

    Rename MergeExec to CoalescePartitionsExec (#635)
---
 ballista/rust/core/proto/ballista.proto               |  4 ++--
 .../rust/core/src/serde/physical_plan/from_proto.rs   |  4 ++--
 .../rust/core/src/serde/physical_plan/to_proto.rs     |  6 +++---
 ballista/rust/core/src/utils.rs                       | 14 +++++++++-----
 ballista/rust/scheduler/src/planner.rs                | 19 ++++++++++++-------
 ballista/rust/scheduler/src/test_utils.rs             |  4 ++--
 datafusion/src/execution/context.rs                   |  4 ++--
 datafusion/src/physical_optimizer/merge_exec.rs       | 18 +++++++++---------
 .../{merge.rs => coalesce_partitions.rs}              | 12 ++++++------
 datafusion/src/physical_plan/cross_join.rs            |  6 ++++--
 datafusion/src/physical_plan/hash_aggregate.rs        |  4 ++--
 datafusion/src/physical_plan/hash_join.rs             |  4 ++--
 datafusion/src/physical_plan/limit.rs                 |  5 +++--
 datafusion/src/physical_plan/mod.rs                   |  8 +++++---
 datafusion/src/physical_plan/sort.rs                  |  4 ++--
 datafusion/src/physical_plan/sort_preserving_merge.rs |  4 ++--
 16 files changed, 67 insertions(+), 53 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 365d8e9..2aa6102 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -414,7 +414,7 @@ message PhysicalPlanNode {
     SortExecNode sort = 11;
     CoalesceBatchesExecNode coalesce_batches = 12;
     FilterExecNode filter = 13;
-    MergeExecNode merge = 14;
+    CoalescePartitionsExecNode merge = 14;
     UnresolvedShuffleExecNode unresolved = 15;
     RepartitionExecNode repartition = 16;
     WindowAggExecNode window = 17;
@@ -648,7 +648,7 @@ message CoalesceBatchesExecNode {
   uint32 target_batch_size = 2;
 }
 
-message MergeExecNode {
+message CoalescePartitionsExecNode {
   PhysicalPlanNode input = 1;
 }
 
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 4b87be4..83cbdb4 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -37,9 +37,9 @@ use datafusion::execution::context::{
 };
 use datafusion::logical_plan::{window_frames::WindowFrame, DFSchema, Expr};
 use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
 use datafusion::physical_plan::hash_join::PartitionMode;
-use datafusion::physical_plan::merge::MergeExec;
 use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
 use datafusion::physical_plan::window_functions::{
     BuiltInWindowFunction, WindowFunction,
@@ -147,7 +147,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
             }
             PhysicalPlanType::Merge(merge) => {
                 let input: Arc<dyn ExecutionPlan> = convert_box_required!(merge.input)?;
-                Ok(Arc::new(MergeExec::new(input)))
+                Ok(Arc::new(CoalescePartitionsExec::new(input)))
             }
             PhysicalPlanType::Repartition(repart) => {
                 let input: Arc<dyn ExecutionPlan> = convert_box_required!(repart.input)?;
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index cf5401b..306abc1 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -59,8 +59,8 @@ use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
 use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
 use crate::serde::scheduler::PartitionLocation;
 use crate::serde::{protobuf, BallistaError};
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr};
-use datafusion::physical_plan::merge::MergeExec;
 use datafusion::physical_plan::repartition::RepartitionExec;
 
 impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
@@ -292,11 +292,11 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                     },
                 )),
             })
-        } else if let Some(exec) = plan.downcast_ref::<MergeExec>() {
+        } else if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
             let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?;
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
-                    protobuf::MergeExecNode {
+                    protobuf::CoalescePartitionsExecNode {
                         input: Some(Box::new(input)),
                     },
                 ))),
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index b58be28..26bdb00 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -40,15 +40,15 @@ use datafusion::arrow::{
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
 use datafusion::logical_plan::Operator;
 use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddMergeExec;
+use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::csv::CsvExec;
 use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
 use datafusion::physical_plan::hash_join::HashJoinExec;
-use datafusion::physical_plan::merge::MergeExec;
 use datafusion::physical_plan::parquet::ParquetExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sort::SortExec;
@@ -177,8 +177,12 @@ fn build_exec_plan_diagram(
         .is_some()
     {
         "CoalesceBatchesExec"
-    } else if plan.as_any().downcast_ref::<MergeExec>().is_some() {
-        "MergeExec"
+    } else if plan
+        .as_any()
+        .downcast_ref::<CoalescePartitionsExec>()
+        .is_some()
+    {
+        "CoalescePartitionsExec"
     } else {
         println!("Unknown: {:?}", plan);
         "Unknown"
@@ -226,7 +230,7 @@ pub fn create_datafusion_context() -> ExecutionContext {
     // remove Repartition rule because that isn't supported yet
     let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
         Arc::new(CoalesceBatches::new()),
-        Arc::new(AddMergeExec::new()),
+        Arc::new(AddCoalescePartitionsExec::new()),
     ];
     let config = ExecutionConfig::new()
         .with_concurrency(1)
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 2ac9f61..32fc9a9 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -30,11 +30,11 @@ use ballista_core::{
 };
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
 use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddMergeExec;
+use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
 use datafusion::physical_plan::hash_join::HashJoinExec;
-use datafusion::physical_plan::merge::MergeExec;
 use datafusion::physical_plan::windows::WindowAggExec;
 use datafusion::physical_plan::ExecutionPlan;
 use log::info;
@@ -101,12 +101,15 @@ impl DistributedPlanner {
             // remove Repartition rule because that isn't supported yet
             let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
                 Arc::new(CoalesceBatches::new()),
-                Arc::new(AddMergeExec::new()),
+                Arc::new(AddCoalescePartitionsExec::new()),
             ];
             let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
             let ctx = ExecutionContext::with_config(config);
             Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages))
-        } else if let Some(merge) = execution_plan.as_any().downcast_ref::<MergeExec>() {
+        } else if let Some(merge) = execution_plan
+            .as_any()
+            .downcast_ref::<CoalescePartitionsExec>()
+        {
             let query_stage = create_query_stage(
                 job_id,
                 self.next_stage_id(),
@@ -244,8 +247,10 @@ mod test {
     use ballista_core::serde::protobuf;
     use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
     use datafusion::physical_plan::sort::SortExec;
+    use datafusion::physical_plan::{
+        coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec,
+    };
     use datafusion::physical_plan::{displayable, ExecutionPlan};
-    use datafusion::physical_plan::{merge::MergeExec, projection::ProjectionExec};
     use std::convert::TryInto;
     use std::sync::Arc;
     use uuid::Uuid;
@@ -284,7 +289,7 @@ mod test {
          HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
           CsvExec: testdata/lineitem; partitions=2
         QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
-         MergeExec
+         CoalescePartitionsExec
           UnresolvedShuffleExec: stages=[1]
         QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
          SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
@@ -309,7 +314,7 @@ mod test {
         assert_eq!(unresolved_shuffle.query_stage_ids, vec![2]);
 
         let merge_exec = stages[1].children()[0].clone();
-        let merge_exec = downcast_exec!(merge_exec, MergeExec);
+        let merge_exec = downcast_exec!(merge_exec, CoalescePartitionsExec);
 
         let unresolved_shuffle = merge_exec.children()[0].clone();
         let unresolved_shuffle =
diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs
index 311f9a7..becb95f 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -22,7 +22,7 @@ use ballista_core::error::Result;
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
 use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddMergeExec;
+use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::csv::CsvReadOptions;
 
@@ -33,7 +33,7 @@ pub const TPCH_TABLES: &[&str] = &[
 pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
     // remove Repartition rule because that isn't supported yet
     let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
-        Arc::new(AddMergeExec::new()),
+        Arc::new(AddCoalescePartitionsExec::new()),
         Arc::new(CoalesceBatches::new()),
     ];
     let config = ExecutionConfig::new()
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 8ce408d..17625c9 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -61,7 +61,7 @@ use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::projection_push_down::ProjectionPushDown;
 use crate::optimizer::simplify_expressions::SimplifyExpressions;
 use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
-use crate::physical_optimizer::merge_exec::AddMergeExec;
+use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use crate::physical_optimizer::repartition::Repartition;
 
 use crate::physical_plan::csv::CsvReadOptions;
@@ -643,7 +643,7 @@ impl Default for ExecutionConfig {
             physical_optimizers: vec![
                 Arc::new(CoalesceBatches::new()),
                 Arc::new(Repartition::new()),
-                Arc::new(AddMergeExec::new()),
+                Arc::new(AddCoalescePartitionsExec::new()),
             ],
             query_planner: Arc::new(DefaultQueryPlanner {}),
             default_catalog: "datafusion".to_owned(),
diff --git a/datafusion/src/physical_optimizer/merge_exec.rs b/datafusion/src/physical_optimizer/merge_exec.rs
index 877c0be..0127313 100644
--- a/datafusion/src/physical_optimizer/merge_exec.rs
+++ b/datafusion/src/physical_optimizer/merge_exec.rs
@@ -15,27 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! AddMergeExec adds MergeExec to merge plans
-//! with more partitions into one partition when the node
-//! needs a single partition
+//! AddCoalescePartitionsExec adds CoalescePartitionsExec to plans
+//! with more than one partition, to coalesce them into one partition
+//! when the node needs a single partition
 use super::optimizer::PhysicalOptimizerRule;
 use crate::{
     error::Result,
-    physical_plan::{merge::MergeExec, Distribution},
+    physical_plan::{coalesce_partitions::CoalescePartitionsExec, Distribution},
 };
 use std::sync::Arc;
 
-/// Introduces MergeExec
-pub struct AddMergeExec {}
+/// Introduces CoalescePartitionsExec
+pub struct AddCoalescePartitionsExec {}
 
-impl AddMergeExec {
+impl AddCoalescePartitionsExec {
     #[allow(missing_docs)]
     pub fn new() -> Self {
         Self {}
     }
 }
 
-impl PhysicalOptimizerRule for AddMergeExec {
+impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
     fn optimize(
         &self,
         plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
@@ -60,7 +60,7 @@ impl PhysicalOptimizerRule for AddMergeExec {
                             if child.output_partitioning().partition_count() == 1 {
                                 child.clone()
                             } else {
-                                Arc::new(MergeExec::new(child.clone()))
+                                Arc::new(CoalescePartitionsExec::new(child.clone()))
                             }
                         })
                         .collect(),
diff --git a/datafusion/src/physical_plan/merge.rs b/datafusion/src/physical_plan/coalesce_partitions.rs
similarity index 95%
rename from datafusion/src/physical_plan/merge.rs
rename to datafusion/src/physical_plan/coalesce_partitions.rs
index a25f5c7..94ff230 100644
--- a/datafusion/src/physical_plan/merge.rs
+++ b/datafusion/src/physical_plan/coalesce_partitions.rs
@@ -40,15 +40,15 @@ use pin_project_lite::pin_project;
 /// Merge execution plan executes partitions in parallel and combines them into a single
 /// partition. No guarantees are made about the order of the resulting partition.
 #[derive(Debug)]
-pub struct MergeExec {
+pub struct CoalescePartitionsExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
 }
 
-impl MergeExec {
+impl CoalescePartitionsExec {
     /// Create a new MergeExec
     pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
-        MergeExec { input }
+        CoalescePartitionsExec { input }
     }
 
     /// Input execution plan
@@ -58,7 +58,7 @@ impl MergeExec {
 }
 
 #[async_trait]
-impl ExecutionPlan for MergeExec {
+impl ExecutionPlan for CoalescePartitionsExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
         self
@@ -82,7 +82,7 @@ impl ExecutionPlan for MergeExec {
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         match children.len() {
-            1 => Ok(Arc::new(MergeExec::new(children[0].clone()))),
+            1 => Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone()))),
             _ => Err(DataFusionError::Internal(
                 "MergeExec wrong number of children".to_string(),
             )),
@@ -194,7 +194,7 @@ mod tests {
         // input should have 4 partitions
         assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
 
-        let merge = MergeExec::new(Arc::new(csv));
+        let merge = CoalescePartitionsExec::new(Arc::new(csv));
 
         // output of MergeExec should have a single partition
         assert_eq!(merge.output_partitioning().partition_count(), 1);
diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs
index f6f5da4..98ad344 100644
--- a/datafusion/src/physical_plan/cross_join.rs
+++ b/datafusion/src/physical_plan/cross_join.rs
@@ -27,7 +27,9 @@ use arrow::record_batch::RecordBatch;
 
 use futures::{Stream, TryStreamExt};
 
-use super::{hash_utils::check_join_is_valid, merge::MergeExec};
+use super::{
+    coalesce_partitions::CoalescePartitionsExec, hash_utils::check_join_is_valid,
+};
 use crate::{
     error::{DataFusionError, Result},
     scalar::ScalarValue,
@@ -144,7 +146,7 @@ impl ExecutionPlan for CrossJoinExec {
                     let start = Instant::now();
 
                     // merge all left parts into a single stream
-                    let merge = MergeExec::new(self.left.clone());
+                    let merge = CoalescePartitionsExec::new(self.left.clone());
                     let stream = merge.execute(0).await?;
 
                     // Load all batches and count the rows
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 250ba2b..e157243 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -1230,7 +1230,7 @@ mod tests {
     use crate::physical_plan::expressions::{col, Avg};
     use crate::{assert_batches_sorted_eq, physical_plan::common};
 
-    use crate::physical_plan::merge::MergeExec;
+    use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 
     /// some mock data to aggregates
     fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
@@ -1298,7 +1298,7 @@ mod tests {
         ];
         assert_batches_sorted_eq!(expected, &result);
 
-        let merge = Arc::new(MergeExec::new(partial_aggregate));
+        let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate));
 
         let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
             .map(|i| col(&groups[i].1, &input_schema))
diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index ad35607..eb5ceaf 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -54,8 +54,8 @@ use arrow::array::{
 
 use super::expressions::Column;
 use super::{
+    coalesce_partitions::CoalescePartitionsExec,
     hash_utils::{build_join_schema, check_join_is_valid, JoinOn, JoinType},
-    merge::MergeExec,
 };
 use crate::error::{DataFusionError, Result};
 
@@ -260,7 +260,7 @@ impl ExecutionPlan for HashJoinExec {
                             let start = Instant::now();
 
                             // merge all left parts into a single stream
-                            let merge = MergeExec::new(self.left.clone());
+                            let merge = CoalescePartitionsExec::new(self.left.clone());
                             let stream = merge.execute(0).await?;
 
                             // This operation performs 2 steps at once:
diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs
index c56dbe1..361e26e 100644
--- a/datafusion/src/physical_plan/limit.rs
+++ b/datafusion/src/physical_plan/limit.rs
@@ -295,9 +295,9 @@ mod tests {
     use common::collect;
 
     use super::*;
+    use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::common;
     use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
-    use crate::physical_plan::merge::MergeExec;
     use crate::test;
 
     #[tokio::test]
@@ -319,7 +319,8 @@ mod tests {
         // input should have 4 partitions
         assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
 
-        let limit = GlobalLimitExec::new(Arc::new(MergeExec::new(Arc::new(csv))), 7);
+        let limit =
+            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7);
 
         // the result should contain 4 batches (one per input partition)
         let iter = limit.execute(0).await?;
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 7f9f7ea..2122751 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -17,7 +17,9 @@
 
 //! Traits for physical query plan, supporting parallel execution for partitioned relations.
 
-use self::{display::DisplayableExecutionPlan, merge::MergeExec};
+use self::{
+    coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
+};
 use crate::execution::context::ExecutionContextState;
 use crate::logical_plan::LogicalPlan;
 use crate::physical_plan::expressions::PhysicalSortExpr;
@@ -315,7 +317,7 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
         }
         _ => {
             // merge into a single partition
-            let plan = MergeExec::new(plan.clone());
+            let plan = CoalescePartitionsExec::new(plan.clone());
             // MergeExec must produce a single partition
             assert_eq!(1, plan.output_partitioning().partition_count());
             common::collect(plan.execute(0).await?).await
@@ -592,6 +594,7 @@ pub trait Accumulator: Send + Sync + Debug {
 pub mod aggregates;
 pub mod array_expressions;
 pub mod coalesce_batches;
+pub mod coalesce_partitions;
 pub mod common;
 pub mod cross_join;
 #[cfg(feature = "crypto_expressions")]
@@ -613,7 +616,6 @@ pub mod json;
 pub mod limit;
 pub mod math_expressions;
 pub mod memory;
-pub mod merge;
 pub mod parquet;
 pub mod planner;
 pub mod projection;
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index 3650978..faaa10d 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -315,9 +315,9 @@ impl RecordBatchStream for SortStream {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::memory::MemoryExec;
-    use crate::physical_plan::merge::MergeExec;
     use crate::physical_plan::{
         collect,
         csv::{CsvExec, CsvReadOptions},
@@ -357,7 +357,7 @@ mod tests {
                     options: SortOptions::default(),
                 },
             ],
-            Arc::new(MergeExec::new(Arc::new(csv))),
+            Arc::new(CoalescePartitionsExec::new(Arc::new(csv))),
         )?);
 
         let result: Vec<RecordBatch> = collect(sort_exec).await?;
diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs
index b8ca97c..316f050 100644
--- a/datafusion/src/physical_plan/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -542,10 +542,10 @@ mod tests {
     use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
     use crate::assert_batches_eq;
     use crate::datasource::CsvReadOptions;
+    use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::csv::CsvExec;
     use crate::physical_plan::expressions::col;
     use crate::physical_plan::memory::MemoryExec;
-    use crate::physical_plan::merge::MergeExec;
     use crate::physical_plan::sort::SortExec;
     use crate::physical_plan::{collect, common};
     use crate::test;
@@ -639,7 +639,7 @@ mod tests {
         src: Arc<dyn ExecutionPlan>,
         sort: Vec<PhysicalSortExpr>,
     ) -> RecordBatch {
-        let merge = Arc::new(MergeExec::new(src));
+        let merge = Arc::new(CoalescePartitionsExec::new(src));
         let sort_exec = Arc::new(SortExec::try_new(sort, merge).unwrap());
         let mut result = collect(sort_exec).await.unwrap();
         assert_eq!(result.len(), 1);