You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost when this copy was converted to plain text.)
Posted to commits@arrow.apache.org by jo...@apache.org on 2021/06/28 08:05:11 UTC
[arrow-datafusion] branch master updated: Rename MergeExec to CoalescePartitionsExec (#635)
This is an automated email from the ASF dual-hosted git repository.
jorgecarleitao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 4068f8b Rename MergeExec to CoalescePartitionsExec (#635)
4068f8b is described below
commit 4068f8b3a212aff8d7cdf2183fd1834be0dc5e69
Author: Andy Grove <an...@gmail.com>
AuthorDate: Mon Jun 28 02:05:00 2021 -0600
Rename MergeExec to CoalescePartitionsExec (#635)
---
ballista/rust/core/proto/ballista.proto | 4 ++--
.../rust/core/src/serde/physical_plan/from_proto.rs | 4 ++--
.../rust/core/src/serde/physical_plan/to_proto.rs | 6 +++---
ballista/rust/core/src/utils.rs | 14 +++++++++-----
ballista/rust/scheduler/src/planner.rs | 19 ++++++++++++-------
ballista/rust/scheduler/src/test_utils.rs | 4 ++--
datafusion/src/execution/context.rs | 4 ++--
datafusion/src/physical_optimizer/merge_exec.rs | 18 +++++++++---------
.../{merge.rs => coalesce_partitions.rs} | 12 ++++++------
datafusion/src/physical_plan/cross_join.rs | 6 ++++--
datafusion/src/physical_plan/hash_aggregate.rs | 4 ++--
datafusion/src/physical_plan/hash_join.rs | 4 ++--
datafusion/src/physical_plan/limit.rs | 5 +++--
datafusion/src/physical_plan/mod.rs | 8 +++++---
datafusion/src/physical_plan/sort.rs | 4 ++--
datafusion/src/physical_plan/sort_preserving_merge.rs | 4 ++--
16 files changed, 67 insertions(+), 53 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 365d8e9..2aa6102 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -414,7 +414,7 @@ message PhysicalPlanNode {
SortExecNode sort = 11;
CoalesceBatchesExecNode coalesce_batches = 12;
FilterExecNode filter = 13;
- MergeExecNode merge = 14;
+ CoalescePartitionsExecNode merge = 14;
UnresolvedShuffleExecNode unresolved = 15;
RepartitionExecNode repartition = 16;
WindowAggExecNode window = 17;
@@ -648,7 +648,7 @@ message CoalesceBatchesExecNode {
uint32 target_batch_size = 2;
}
-message MergeExecNode {
+message CoalescePartitionsExecNode {
PhysicalPlanNode input = 1;
}
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 4b87be4..83cbdb4 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -37,9 +37,9 @@ use datafusion::execution::context::{
};
use datafusion::logical_plan::{window_frames::WindowFrame, DFSchema, Expr};
use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
-use datafusion::physical_plan::merge::MergeExec;
use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::window_functions::{
BuiltInWindowFunction, WindowFunction,
@@ -147,7 +147,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
}
PhysicalPlanType::Merge(merge) => {
let input: Arc<dyn ExecutionPlan> = convert_box_required!(merge.input)?;
- Ok(Arc::new(MergeExec::new(input)))
+ Ok(Arc::new(CoalescePartitionsExec::new(input)))
}
PhysicalPlanType::Repartition(repart) => {
let input: Arc<dyn ExecutionPlan> = convert_box_required!(repart.input)?;
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index cf5401b..306abc1 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -59,8 +59,8 @@ use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{protobuf, BallistaError};
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr};
-use datafusion::physical_plan::merge::MergeExec;
use datafusion::physical_plan::repartition::RepartitionExec;
impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
@@ -292,11 +292,11 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
},
)),
})
- } else if let Some(exec) = plan.downcast_ref::<MergeExec>() {
+ } else if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
- protobuf::MergeExecNode {
+ protobuf::CoalescePartitionsExecNode {
input: Some(Box::new(input)),
},
))),
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index b58be28..26bdb00 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -40,15 +40,15 @@ use datafusion::arrow::{
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::logical_plan::Operator;
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddMergeExec;
+use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::csv::CsvExec;
use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use datafusion::physical_plan::hash_join::HashJoinExec;
-use datafusion::physical_plan::merge::MergeExec;
use datafusion::physical_plan::parquet::ParquetExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sort::SortExec;
@@ -177,8 +177,12 @@ fn build_exec_plan_diagram(
.is_some()
{
"CoalesceBatchesExec"
- } else if plan.as_any().downcast_ref::<MergeExec>().is_some() {
- "MergeExec"
+ } else if plan
+ .as_any()
+ .downcast_ref::<CoalescePartitionsExec>()
+ .is_some()
+ {
+ "CoalescePartitionsExec"
} else {
println!("Unknown: {:?}", plan);
"Unknown"
@@ -226,7 +230,7 @@ pub fn create_datafusion_context() -> ExecutionContext {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(CoalesceBatches::new()),
- Arc::new(AddMergeExec::new()),
+ Arc::new(AddCoalescePartitionsExec::new()),
];
let config = ExecutionConfig::new()
.with_concurrency(1)
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 2ac9f61..32fc9a9 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -30,11 +30,11 @@ use ballista_core::{
};
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddMergeExec;
+use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::HashJoinExec;
-use datafusion::physical_plan::merge::MergeExec;
use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::ExecutionPlan;
use log::info;
@@ -101,12 +101,15 @@ impl DistributedPlanner {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(CoalesceBatches::new()),
- Arc::new(AddMergeExec::new()),
+ Arc::new(AddCoalescePartitionsExec::new()),
];
let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
let ctx = ExecutionContext::with_config(config);
Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages))
- } else if let Some(merge) = execution_plan.as_any().downcast_ref::<MergeExec>() {
+ } else if let Some(merge) = execution_plan
+ .as_any()
+ .downcast_ref::<CoalescePartitionsExec>()
+ {
let query_stage = create_query_stage(
job_id,
self.next_stage_id(),
@@ -244,8 +247,10 @@ mod test {
use ballista_core::serde::protobuf;
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use datafusion::physical_plan::sort::SortExec;
+ use datafusion::physical_plan::{
+ coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec,
+ };
use datafusion::physical_plan::{displayable, ExecutionPlan};
- use datafusion::physical_plan::{merge::MergeExec, projection::ProjectionExec};
use std::convert::TryInto;
use std::sync::Arc;
use uuid::Uuid;
@@ -284,7 +289,7 @@ mod test {
HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
CsvExec: testdata/lineitem; partitions=2
QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
- MergeExec
+ CoalescePartitionsExec
UnresolvedShuffleExec: stages=[1]
QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
@@ -309,7 +314,7 @@ mod test {
assert_eq!(unresolved_shuffle.query_stage_ids, vec![2]);
let merge_exec = stages[1].children()[0].clone();
- let merge_exec = downcast_exec!(merge_exec, MergeExec);
+ let merge_exec = downcast_exec!(merge_exec, CoalescePartitionsExec);
let unresolved_shuffle = merge_exec.children()[0].clone();
let unresolved_shuffle =
diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs
index 311f9a7..becb95f 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -22,7 +22,7 @@ use ballista_core::error::Result;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddMergeExec;
+use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::csv::CsvReadOptions;
@@ -33,7 +33,7 @@ pub const TPCH_TABLES: &[&str] = &[
pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
- Arc::new(AddMergeExec::new()),
+ Arc::new(AddCoalescePartitionsExec::new()),
Arc::new(CoalesceBatches::new()),
];
let config = ExecutionConfig::new()
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 8ce408d..17625c9 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -61,7 +61,7 @@ use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::optimizer::simplify_expressions::SimplifyExpressions;
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
-use crate::physical_optimizer::merge_exec::AddMergeExec;
+use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;
use crate::physical_plan::csv::CsvReadOptions;
@@ -643,7 +643,7 @@ impl Default for ExecutionConfig {
physical_optimizers: vec![
Arc::new(CoalesceBatches::new()),
Arc::new(Repartition::new()),
- Arc::new(AddMergeExec::new()),
+ Arc::new(AddCoalescePartitionsExec::new()),
],
query_planner: Arc::new(DefaultQueryPlanner {}),
default_catalog: "datafusion".to_owned(),
diff --git a/datafusion/src/physical_optimizer/merge_exec.rs b/datafusion/src/physical_optimizer/merge_exec.rs
index 877c0be..0127313 100644
--- a/datafusion/src/physical_optimizer/merge_exec.rs
+++ b/datafusion/src/physical_optimizer/merge_exec.rs
@@ -15,27 +15,27 @@
// specific language governing permissions and limitations
// under the License.
-//! AddMergeExec adds MergeExec to merge plans
-//! with more partitions into one partition when the node
-//! needs a single partition
+//! AddCoalescePartitionsExec adds CoalescePartitionsExec to plans
+//! with more than one partition, to coalesce them into one partition
+//! when the node needs a single partition
use super::optimizer::PhysicalOptimizerRule;
use crate::{
error::Result,
- physical_plan::{merge::MergeExec, Distribution},
+ physical_plan::{coalesce_partitions::CoalescePartitionsExec, Distribution},
};
use std::sync::Arc;
-/// Introduces MergeExec
-pub struct AddMergeExec {}
+/// Introduces CoalescePartitionsExec
+pub struct AddCoalescePartitionsExec {}
-impl AddMergeExec {
+impl AddCoalescePartitionsExec {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}
-impl PhysicalOptimizerRule for AddMergeExec {
+impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
fn optimize(
&self,
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
@@ -60,7 +60,7 @@ impl PhysicalOptimizerRule for AddMergeExec {
if child.output_partitioning().partition_count() == 1 {
child.clone()
} else {
- Arc::new(MergeExec::new(child.clone()))
+ Arc::new(CoalescePartitionsExec::new(child.clone()))
}
})
.collect(),
diff --git a/datafusion/src/physical_plan/merge.rs b/datafusion/src/physical_plan/coalesce_partitions.rs
similarity index 95%
rename from datafusion/src/physical_plan/merge.rs
rename to datafusion/src/physical_plan/coalesce_partitions.rs
index a25f5c7..94ff230 100644
--- a/datafusion/src/physical_plan/merge.rs
+++ b/datafusion/src/physical_plan/coalesce_partitions.rs
@@ -40,15 +40,15 @@ use pin_project_lite::pin_project;
/// Merge execution plan executes partitions in parallel and combines them into a single
/// partition. No guarantees are made about the order of the resulting partition.
#[derive(Debug)]
-pub struct MergeExec {
+pub struct CoalescePartitionsExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
}
-impl MergeExec {
+impl CoalescePartitionsExec {
/// Create a new MergeExec
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
- MergeExec { input }
+ CoalescePartitionsExec { input }
}
/// Input execution plan
@@ -58,7 +58,7 @@ impl MergeExec {
}
#[async_trait]
-impl ExecutionPlan for MergeExec {
+impl ExecutionPlan for CoalescePartitionsExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
@@ -82,7 +82,7 @@ impl ExecutionPlan for MergeExec {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
match children.len() {
- 1 => Ok(Arc::new(MergeExec::new(children[0].clone()))),
+ 1 => Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone()))),
_ => Err(DataFusionError::Internal(
"MergeExec wrong number of children".to_string(),
)),
@@ -194,7 +194,7 @@ mod tests {
// input should have 4 partitions
assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
- let merge = MergeExec::new(Arc::new(csv));
+ let merge = CoalescePartitionsExec::new(Arc::new(csv));
// output of MergeExec should have a single partition
assert_eq!(merge.output_partitioning().partition_count(), 1);
diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs
index f6f5da4..98ad344 100644
--- a/datafusion/src/physical_plan/cross_join.rs
+++ b/datafusion/src/physical_plan/cross_join.rs
@@ -27,7 +27,9 @@ use arrow::record_batch::RecordBatch;
use futures::{Stream, TryStreamExt};
-use super::{hash_utils::check_join_is_valid, merge::MergeExec};
+use super::{
+ coalesce_partitions::CoalescePartitionsExec, hash_utils::check_join_is_valid,
+};
use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
@@ -144,7 +146,7 @@ impl ExecutionPlan for CrossJoinExec {
let start = Instant::now();
// merge all left parts into a single stream
- let merge = MergeExec::new(self.left.clone());
+ let merge = CoalescePartitionsExec::new(self.left.clone());
let stream = merge.execute(0).await?;
// Load all batches and count the rows
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 250ba2b..e157243 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -1230,7 +1230,7 @@ mod tests {
use crate::physical_plan::expressions::{col, Avg};
use crate::{assert_batches_sorted_eq, physical_plan::common};
- use crate::physical_plan::merge::MergeExec;
+ use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
/// some mock data to aggregates
fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
@@ -1298,7 +1298,7 @@ mod tests {
];
assert_batches_sorted_eq!(expected, &result);
- let merge = Arc::new(MergeExec::new(partial_aggregate));
+ let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate));
let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
.map(|i| col(&groups[i].1, &input_schema))
diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs
index ad35607..eb5ceaf 100644
--- a/datafusion/src/physical_plan/hash_join.rs
+++ b/datafusion/src/physical_plan/hash_join.rs
@@ -54,8 +54,8 @@ use arrow::array::{
use super::expressions::Column;
use super::{
+ coalesce_partitions::CoalescePartitionsExec,
hash_utils::{build_join_schema, check_join_is_valid, JoinOn, JoinType},
- merge::MergeExec,
};
use crate::error::{DataFusionError, Result};
@@ -260,7 +260,7 @@ impl ExecutionPlan for HashJoinExec {
let start = Instant::now();
// merge all left parts into a single stream
- let merge = MergeExec::new(self.left.clone());
+ let merge = CoalescePartitionsExec::new(self.left.clone());
let stream = merge.execute(0).await?;
// This operation performs 2 steps at once:
diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs
index c56dbe1..361e26e 100644
--- a/datafusion/src/physical_plan/limit.rs
+++ b/datafusion/src/physical_plan/limit.rs
@@ -295,9 +295,9 @@ mod tests {
use common::collect;
use super::*;
+ use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::common;
use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
- use crate::physical_plan::merge::MergeExec;
use crate::test;
#[tokio::test]
@@ -319,7 +319,8 @@ mod tests {
// input should have 4 partitions
assert_eq!(csv.output_partitioning().partition_count(), num_partitions);
- let limit = GlobalLimitExec::new(Arc::new(MergeExec::new(Arc::new(csv))), 7);
+ let limit =
+ GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7);
// the result should contain 4 batches (one per input partition)
let iter = limit.execute(0).await?;
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 7f9f7ea..2122751 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -17,7 +17,9 @@
//! Traits for physical query plan, supporting parallel execution for partitioned relations.
-use self::{display::DisplayableExecutionPlan, merge::MergeExec};
+use self::{
+ coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan,
+};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::LogicalPlan;
use crate::physical_plan::expressions::PhysicalSortExpr;
@@ -315,7 +317,7 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
}
_ => {
// merge into a single partition
- let plan = MergeExec::new(plan.clone());
+ let plan = CoalescePartitionsExec::new(plan.clone());
// MergeExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
common::collect(plan.execute(0).await?).await
@@ -592,6 +594,7 @@ pub trait Accumulator: Send + Sync + Debug {
pub mod aggregates;
pub mod array_expressions;
pub mod coalesce_batches;
+pub mod coalesce_partitions;
pub mod common;
pub mod cross_join;
#[cfg(feature = "crypto_expressions")]
@@ -613,7 +616,6 @@ pub mod json;
pub mod limit;
pub mod math_expressions;
pub mod memory;
-pub mod merge;
pub mod parquet;
pub mod planner;
pub mod projection;
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index 3650978..faaa10d 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -315,9 +315,9 @@ impl RecordBatchStream for SortStream {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::expressions::col;
use crate::physical_plan::memory::MemoryExec;
- use crate::physical_plan::merge::MergeExec;
use crate::physical_plan::{
collect,
csv::{CsvExec, CsvReadOptions},
@@ -357,7 +357,7 @@ mod tests {
options: SortOptions::default(),
},
],
- Arc::new(MergeExec::new(Arc::new(csv))),
+ Arc::new(CoalescePartitionsExec::new(Arc::new(csv))),
)?);
let result: Vec<RecordBatch> = collect(sort_exec).await?;
diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs
index b8ca97c..316f050 100644
--- a/datafusion/src/physical_plan/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -542,10 +542,10 @@ mod tests {
use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
use crate::assert_batches_eq;
use crate::datasource::CsvReadOptions;
+ use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::csv::CsvExec;
use crate::physical_plan::expressions::col;
use crate::physical_plan::memory::MemoryExec;
- use crate::physical_plan::merge::MergeExec;
use crate::physical_plan::sort::SortExec;
use crate::physical_plan::{collect, common};
use crate::test;
@@ -639,7 +639,7 @@ mod tests {
src: Arc<dyn ExecutionPlan>,
sort: Vec<PhysicalSortExpr>,
) -> RecordBatch {
- let merge = Arc::new(MergeExec::new(src));
+ let merge = Arc::new(CoalescePartitionsExec::new(src));
let sort_exec = Arc::new(SortExec::try_new(sort, merge).unwrap());
let mut result = collect(sort_exec).await.unwrap();
assert_eq!(result.len(), 1);