You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/07 19:38:26 UTC
[arrow-datafusion] branch master updated: Implement fast path of with_new_children() in ExecutionPlan (#2168)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new fa9e01641 Implement fast path of with_new_children() in ExecutionPlan (#2168)
fa9e01641 is described below
commit fa9e0164127c233a16b53e1d12f162f3b00171f5
Author: mingmwang <mi...@ebay.com>
AuthorDate: Fri Apr 8 03:38:22 2022 +0800
Implement fast path of with_new_children() in ExecutionPlan (#2168)
* Implement fast path of with_new_children() in ExecutionPlan
* resolve review comments
* refine comments
---
.../core/src/execution_plans/distributed_query.rs | 2 +-
.../core/src/execution_plans/shuffle_reader.rs | 2 +-
.../core/src/execution_plans/shuffle_writer.rs | 3 +-
.../core/src/execution_plans/unresolved_shuffle.rs | 2 +-
ballista/rust/core/src/serde/mod.rs | 15 ++----
ballista/rust/executor/src/collect.rs | 2 +-
ballista/rust/scheduler/src/planner.rs | 25 ++++++----
.../rust/scheduler/src/scheduler_server/mod.rs | 2 +-
.../src/scheduler_server/query_stage_scheduler.rs | 3 +-
datafusion-examples/examples/custom_datasource.rs | 15 ++----
.../src/physical_optimizer/coalesce_batches.rs | 3 +-
.../core/src/physical_optimizer/merge_exec.rs | 12 +++--
.../core/src/physical_optimizer/repartition.rs | 6 ++-
datafusion/core/src/physical_optimizer/utils.rs | 4 +-
datafusion/core/src/physical_plan/analyze.rs | 19 +++-----
.../core/src/physical_plan/coalesce_batches.rs | 17 +++----
.../core/src/physical_plan/coalesce_partitions.rs | 9 +---
datafusion/core/src/physical_plan/cross_join.rs | 20 +++-----
datafusion/core/src/physical_plan/empty.rs | 29 +++++------
datafusion/core/src/physical_plan/explain.rs | 13 ++---
.../core/src/physical_plan/file_format/avro.rs | 13 ++---
.../core/src/physical_plan/file_format/csv.rs | 13 ++---
.../core/src/physical_plan/file_format/json.rs | 13 ++---
.../core/src/physical_plan/file_format/parquet.rs | 13 ++---
datafusion/core/src/physical_plan/filter.rs | 57 +++++++++++++++++-----
.../core/src/physical_plan/hash_aggregate.rs | 26 +++++-----
datafusion/core/src/physical_plan/hash_join.rs | 23 ++++-----
datafusion/core/src/physical_plan/limit.rs | 17 +++----
datafusion/core/src/physical_plan/memory.rs | 5 +-
datafusion/core/src/physical_plan/mod.rs | 29 ++++++++++-
datafusion/core/src/physical_plan/planner.rs | 2 +-
datafusion/core/src/physical_plan/projection.rs | 17 +++----
datafusion/core/src/physical_plan/repartition.rs | 15 ++----
datafusion/core/src/physical_plan/sorts/sort.rs | 15 ++----
.../physical_plan/sorts/sort_preserving_merge.rs | 15 ++----
datafusion/core/src/physical_plan/union.rs | 2 +-
datafusion/core/src/physical_plan/values.rs | 17 +++----
.../src/physical_plan/windows/window_agg_exec.rs | 19 +++-----
datafusion/core/src/test/exec.rs | 26 ++++------
datafusion/core/tests/custom_sources.rs | 17 ++-----
datafusion/core/tests/provider_filter_pushdown.rs | 2 +-
datafusion/core/tests/statistics.rs | 14 ++----
datafusion/core/tests/user_defined_plan.rs | 15 ++----
43 files changed, 258 insertions(+), 330 deletions(-)
diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs
index 46a168f17..b0d3bef1f 100644
--- a/ballista/rust/core/src/execution_plans/distributed_query.rs
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -148,7 +148,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(DistributedQueryExec {
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index aeabc72a5..b0aa6af11 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -95,7 +95,7 @@ impl ExecutionPlan for ShuffleReaderExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Err(DataFusionError::Plan(
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 530f123d9..77190c5ab 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -342,10 +342,9 @@ impl ExecutionPlan for ShuffleWriterExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- assert!(children.len() == 1);
Ok(Arc::new(ShuffleWriterExec::try_new(
self.job_id.clone(),
self.stage_id,
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 83d4598f5..e1eecdd92 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -92,7 +92,7 @@ impl ExecutionPlan for UnresolvedShuffleExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Err(DataFusionError::Plan(
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index 2cd02b9f9..ed41ce61c 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -505,18 +505,13 @@ mod tests {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 1 => Ok(Arc::new(TopKExec {
- input: children[0].clone(),
- k: self.k,
- })),
- _ => Err(DataFusionError::Internal(
- "TopKExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(TopKExec {
+ input: children[0].clone(),
+ k: self.k,
+ }))
}
/// Execute one partition and return an iterator over RecordBatch
diff --git a/ballista/rust/executor/src/collect.rs b/ballista/rust/executor/src/collect.rs
index 50215f6c9..1bb4acaf8 100644
--- a/ballista/rust/executor/src/collect.rs
+++ b/ballista/rust/executor/src/collect.rs
@@ -72,7 +72,7 @@ impl ExecutionPlan for CollectExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index d7ee22bbd..8198c4ed2 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -30,7 +30,9 @@ use ballista_core::{
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::windows::WindowAggExec;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::physical_plan::{
+ with_new_children_if_necessary, ExecutionPlan, Partitioning,
+};
use futures::future::BoxFuture;
use futures::FutureExt;
use log::info;
@@ -99,7 +101,7 @@ impl DistributedPlanner {
stages.append(&mut child_stages);
}
- if let Some(coalesce) = execution_plan
+ if let Some(_coalesce) = execution_plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
{
@@ -122,7 +124,10 @@ impl DistributedPlanner {
));
stages.push(shuffle_writer);
Ok((
- coalesce.with_new_children(vec![unresolved_shuffle])?,
+ with_new_children_if_necessary(
+ execution_plan,
+ vec![unresolved_shuffle],
+ )?,
stages,
))
} else if let Some(repart) =
@@ -163,7 +168,10 @@ impl DistributedPlanner {
window
)))
} else {
- Ok((execution_plan.with_new_children(children)?, stages))
+ Ok((
+ with_new_children_if_necessary(execution_plan, children)?,
+ stages,
+ ))
}
}
.boxed()
@@ -197,7 +205,7 @@ pub fn find_unresolved_shuffles(
}
pub fn remove_unresolved_shuffles(
- stage: &dyn ExecutionPlan,
+ stage: Arc<dyn ExecutionPlan>,
partition_locations: &HashMap<usize, HashMap<usize, Vec<PartitionLocation>>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
@@ -240,13 +248,10 @@ pub fn remove_unresolved_shuffles(
unresolved_shuffle.schema().clone(),
)?))
} else {
- new_children.push(remove_unresolved_shuffles(
- child.as_ref(),
- partition_locations,
- )?);
+ new_children.push(remove_unresolved_shuffles(child, partition_locations)?);
}
}
- Ok(stage.with_new_children(new_children)?)
+ Ok(with_new_children_if_necessary(stage, new_children)?)
}
fn create_shuffle_writer(
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index 2f6220f33..4b47e5239 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -241,7 +241,7 @@ impl Default for SessionContextRegistry {
}
impl SessionContextRegistry {
- /// Create the registry that object stores can registered into.
+ /// Create the registry that session contexts can be registered into.
/// ['LocalFileSystem'] store is registered in by default to support read local files natively.
pub fn new() -> Self {
Self {
diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 4b22ed16a..58e9f8f3b 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -304,8 +304,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> QueryStageSchedul
}
}
- let plan =
- remove_unresolved_shuffles(stage_plan.as_ref(), &partition_locations)?;
+ let plan = remove_unresolved_shuffles(stage_plan, &partition_locations)?;
self.state.save_stage_plan(job_id, stage_id, plan).await?;
}
diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs
index 3d725c4c6..a4b9fda1a 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -21,7 +21,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrame;
use datafusion::datasource::TableProvider;
-use datafusion::error::{DataFusionError, Result};
+use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::logical_plan::{Expr, LogicalPlanBuilder};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
@@ -219,17 +219,10 @@ impl ExecutionPlan for CustomExec {
}
fn with_new_children(
- &self,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if children.is_empty() {
- Ok(Arc::new(self.clone()))
- } else {
- Err(DataFusionError::Internal(format!(
- "Children cannot be replaced in {:?}",
- self
- )))
- }
+ Ok(self)
}
async fn execute(
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 4c9edb452..50f8abe7f 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -19,6 +19,7 @@
//! in bigger batches to avoid overhead with small batches
use super::optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::with_new_children_if_necessary;
use crate::{
error::Result,
physical_plan::{
@@ -69,7 +70,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// leaf node, children cannot be replaced
Ok(plan.clone())
} else {
- let plan = plan.with_new_children(children)?;
+ let plan = with_new_children_if_necessary(plan, children)?;
Ok(if wrap_in_coalesce {
// TODO we should add specific configuration settings for coalescing batches and
// we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
diff --git a/datafusion/core/src/physical_optimizer/merge_exec.rs b/datafusion/core/src/physical_optimizer/merge_exec.rs
index f23da15e5..f614673f5 100644
--- a/datafusion/core/src/physical_optimizer/merge_exec.rs
+++ b/datafusion/core/src/physical_optimizer/merge_exec.rs
@@ -19,6 +19,7 @@
//! with more than one partition, to coalesce them into one partition
//! when the node needs a single partition
use super::optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::with_new_children_if_necessary;
use crate::{
error::Result,
physical_plan::{coalesce_partitions::CoalescePartitionsExec, Distribution},
@@ -52,9 +53,14 @@ impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
.map(|child| self.optimize(child.clone(), config))
.collect::<Result<Vec<_>>>()?;
match plan.required_child_distribution() {
- Distribution::UnspecifiedDistribution => plan.with_new_children(children),
- Distribution::HashPartitioned(_) => plan.with_new_children(children),
- Distribution::SinglePartition => plan.with_new_children(
+ Distribution::UnspecifiedDistribution => {
+ with_new_children_if_necessary(plan, children)
+ }
+ Distribution::HashPartitioned(_) => {
+ with_new_children_if_necessary(plan, children)
+ }
+ Distribution::SinglePartition => with_new_children_if_necessary(
+ plan,
children
.iter()
.map(|child| {
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index d98fa4162..2506348fe 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -20,7 +20,9 @@ use std::sync::Arc;
use super::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::Partitioning::*;
-use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan};
+use crate::physical_plan::{
+ repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
+};
use crate::{error::Result, execution::context::SessionConfig};
/// Optimizer that introduces repartition to introduce more
@@ -191,7 +193,7 @@ fn optimize_partitions(
)
})
.collect::<Result<_>>()?;
- plan.with_new_children(children)?
+ with_new_children_if_necessary(plan, children)?
};
// decide if we should bother trying to repartition the output of this plan
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index bb1415e18..4aceb776d 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -21,7 +21,7 @@ use super::optimizer::PhysicalOptimizerRule;
use crate::execution::context::SessionConfig;
use crate::error::Result;
-use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use std::sync::Arc;
/// Convenience rule for writing optimizers: recursively invoke
@@ -42,6 +42,6 @@ pub fn optimize_children(
if children.is_empty() {
Ok(Arc::clone(&plan))
} else {
- plan.with_new_children(children)
+ with_new_children_if_necessary(plan, children)
}
}
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index 31f47a673..f8050f16c 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -92,21 +92,14 @@ impl ExecutionPlan for AnalyzeExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if children.len() == 1 {
- Ok(Arc::new(Self::new(
- self.verbose,
- children.pop().unwrap(),
- self.schema.clone(),
- )))
- } else {
- Err(DataFusionError::Internal(format!(
- "Invalid child count for AnalyzeExec. Expected 1 got {}",
- children.len()
- )))
- }
+ Ok(Arc::new(Self::new(
+ self.verbose,
+ children.pop().unwrap(),
+ self.schema.clone(),
+ )))
}
async fn execute(
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 18785354c..482b0ee77 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -23,7 +23,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
@@ -107,18 +107,13 @@ impl ExecutionPlan for CoalesceBatchesExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 1 => Ok(Arc::new(CoalesceBatchesExec::new(
- children[0].clone(),
- self.target_batch_size,
- ))),
- _ => Err(DataFusionError::Internal(
- "CoalesceBatchesExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(CoalesceBatchesExec::new(
+ children[0].clone(),
+ self.target_batch_size,
+ )))
}
async fn execute(
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index b2832f73c..3ecbd61f2 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -96,15 +96,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 1 => Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone()))),
- _ => Err(DataFusionError::Internal(
- "CoalescePartitionsExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone())))
}
async fn execute(
diff --git a/datafusion/core/src/physical_plan/cross_join.rs b/datafusion/core/src/physical_plan/cross_join.rs
index efe822421..43555f077 100644
--- a/datafusion/core/src/physical_plan/cross_join.rs
+++ b/datafusion/core/src/physical_plan/cross_join.rs
@@ -32,10 +32,7 @@ use super::{
coalesce_partitions::CoalescePartitionsExec, join_utils::check_join_is_valid,
ColumnStatistics, Statistics,
};
-use crate::{
- error::{DataFusionError, Result},
- scalar::ScalarValue,
-};
+use crate::{error::Result, scalar::ScalarValue};
use async_trait::async_trait;
use std::time::Instant;
@@ -120,18 +117,13 @@ impl ExecutionPlan for CrossJoinExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 2 => Ok(Arc::new(CrossJoinExec::try_new(
- children[0].clone(),
- children[1].clone(),
- )?)),
- _ => Err(DataFusionError::Internal(
- "CrossJoinExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(CrossJoinExec::try_new(
+ children[0].clone(),
+ children[1].clone(),
+ )?))
}
fn output_partitioning(&self) -> Partitioning {
diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index 0fbf18861..42ed368e1 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -104,18 +104,13 @@ impl ExecutionPlan for EmptyExec {
}
fn with_new_children(
- &self,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 0 => Ok(Arc::new(EmptyExec::new(
- self.produce_one_row,
- self.schema.clone(),
- ))),
- _ => Err(DataFusionError::Internal(
- "EmptyExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(EmptyExec::new(
+ self.produce_one_row,
+ self.schema.clone(),
+ )))
}
async fn execute(
@@ -161,6 +156,7 @@ impl ExecutionPlan for EmptyExec {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::physical_plan::with_new_children_if_necessary;
use crate::prelude::SessionContext;
use crate::{physical_plan::common, test_util};
@@ -184,18 +180,19 @@ mod tests {
#[test]
fn with_new_children() -> Result<()> {
let schema = test_util::aggr_test_schema();
- let empty = EmptyExec::new(false, schema.clone());
- let empty_with_row = EmptyExec::new(true, schema);
+ let empty = Arc::new(EmptyExec::new(false, schema.clone()));
+ let empty_with_row = Arc::new(EmptyExec::new(true, schema));
- let empty2 = empty.with_new_children(vec![])?;
+ let empty2 = with_new_children_if_necessary(empty.clone(), vec![])?;
assert_eq!(empty.schema(), empty2.schema());
- let empty_with_row_2 = empty_with_row.with_new_children(vec![])?;
+ let empty_with_row_2 =
+ with_new_children_if_necessary(empty_with_row.clone(), vec![])?;
assert_eq!(empty_with_row.schema(), empty_with_row_2.schema());
let too_many_kids = vec![empty2];
assert!(
- empty.with_new_children(too_many_kids).is_err(),
+ with_new_children_if_necessary(empty, too_many_kids).is_err(),
"expected error when providing list of kids"
);
Ok(())
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index fd5ff03ca..4905e9e17 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -103,17 +103,10 @@ impl ExecutionPlan for ExplainExec {
}
fn with_new_children(
- &self,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if children.is_empty() {
- Ok(Arc::new(self.clone()))
- } else {
- Err(DataFusionError::Internal(format!(
- "Children cannot be replaced in {:?}",
- self
- )))
- }
+ Ok(self)
}
async fn execute(
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 3f6b85777..68f8f2f90 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -88,17 +88,10 @@ impl ExecutionPlan for AvroExec {
}
fn with_new_children(
- &self,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if children.is_empty() {
- Ok(Arc::new(self.clone()))
- } else {
- Err(DataFusionError::Internal(format!(
- "Children cannot be replaced in {:?}",
- self
- )))
- }
+ Ok(self)
}
#[cfg(not(feature = "avro"))]
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 9791b2e00..8aea607ea 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -106,17 +106,10 @@ impl ExecutionPlan for CsvExec {
}
fn with_new_children(
- &self,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if children.is_empty() {
- Ok(Arc::new(self.clone()))
- } else {
- Err(DataFusionError::Internal(format!(
- "Children cannot be replaced in {:?}",
- self
- )))
- }
+ Ok(self)
}
async fn execute(
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index a6e7840e7..ef9d498a0 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -84,17 +84,10 @@ impl ExecutionPlan for NdJsonExec {
}
fn with_new_children(
- &self,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if children.is_empty() {
- Ok(Arc::new(self.clone()) as Arc<dyn ExecutionPlan>)
- } else {
- Err(DataFusionError::Internal(format!(
- "Children cannot be replaced in {:?}",
- self
- )))
- }
+ Ok(self)
}
async fn execute(
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index d6800af9f..a5146360e 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -192,17 +192,10 @@ impl ExecutionPlan for ParquetExec {
}
fn with_new_children(
- &self,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if children.is_empty() {
- Ok(Arc::new(self.clone()))
- } else {
- Err(DataFusionError::Internal(format!(
- "Children cannot be replaced in {:?}",
- self
- )))
- }
+ Ok(self)
}
async fn execute(
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index 0822aa4ff..ff86d0893 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -119,18 +119,13 @@ impl ExecutionPlan for FilterExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 1 => Ok(Arc::new(FilterExec::try_new(
- self.predicate.clone(),
- children[0].clone(),
- )?)),
- _ => Err(DataFusionError::Internal(
- "FilterExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(FilterExec::try_new(
+ self.predicate.clone(),
+ children[0].clone(),
+ )?))
}
async fn execute(
@@ -242,10 +237,10 @@ mod tests {
use super::*;
use crate::datafusion_data_access::object_store::local::LocalFileSystem;
- use crate::physical_plan::collect;
use crate::physical_plan::expressions::*;
use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
+ use crate::physical_plan::{collect, with_new_children_if_necessary};
use crate::prelude::SessionContext;
use crate::scalar::ScalarValue;
use crate::test;
@@ -307,4 +302,44 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ #[allow(clippy::vtable_address_comparisons)]
+ async fn with_new_children() -> Result<()> {
+ let schema = test_util::aggr_test_schema();
+ let partitions = 4;
+ let (_, files) =
+ test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
+ let input = Arc::new(CsvExec::new(
+ FileScanConfig {
+ object_store: Arc::new(LocalFileSystem {}),
+ file_schema: Arc::clone(&schema),
+ file_groups: files,
+ statistics: Statistics::default(),
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ },
+ true,
+ b',',
+ ));
+
+ let predicate: Arc<dyn PhysicalExpr> = binary(
+ col("c2", &schema)?,
+ Operator::Gt,
+ lit(ScalarValue::from(1u32)),
+ &schema,
+ )?;
+
+ let filter: Arc<dyn ExecutionPlan> =
+ Arc::new(FilterExec::try_new(predicate, input.clone())?);
+
+ let new_filter = filter.clone().with_new_children(vec![input.clone()])?;
+ assert!(!Arc::ptr_eq(&filter, &new_filter));
+
+ let new_filter2 = with_new_children_if_necessary(filter.clone(), vec![input])?;
+ assert!(Arc::ptr_eq(&filter, &new_filter2));
+
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/physical_plan/hash_aggregate.rs b/datafusion/core/src/physical_plan/hash_aggregate.rs
index 7476f77ed..75d68a6bc 100644
--- a/datafusion/core/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/core/src/physical_plan/hash_aggregate.rs
@@ -28,7 +28,7 @@ use futures::{
Future,
};
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
use crate::physical_plan::hash_utils::create_hashes;
use crate::physical_plan::{
Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
@@ -261,21 +261,16 @@ impl ExecutionPlan for HashAggregateExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 1 => Ok(Arc::new(HashAggregateExec::try_new(
- self.mode,
- self.group_expr.clone(),
- self.aggr_expr.clone(),
- children[0].clone(),
- self.input_schema.clone(),
- )?)),
- _ => Err(DataFusionError::Internal(
- "HashAggregateExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(HashAggregateExec::try_new(
+ self.mode,
+ self.group_expr.clone(),
+ self.aggr_expr.clone(),
+ children[0].clone(),
+ self.input_schema.clone(),
+ )?))
}
fn metrics(&self) -> Option<MetricsSet> {
@@ -1027,6 +1022,7 @@ mod tests {
use crate::{assert_batches_sorted_eq, physical_plan::common};
use arrow::array::{Float64Array, UInt32Array};
use arrow::datatypes::DataType;
+ use datafusion_common::DataFusionError;
use futures::FutureExt;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -1178,7 +1174,7 @@ mod tests {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Err(DataFusionError::Internal(format!(
diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/hash_join.rs
index ec7e032f4..44b8cc97d 100644
--- a/datafusion/core/src/physical_plan/hash_join.rs
+++ b/datafusion/core/src/physical_plan/hash_join.rs
@@ -257,22 +257,17 @@ impl ExecutionPlan for HashJoinExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 2 => Ok(Arc::new(HashJoinExec::try_new(
- children[0].clone(),
- children[1].clone(),
- self.on.clone(),
- &self.join_type,
- self.mode,
- &self.null_equals_null,
- )?)),
- _ => Err(DataFusionError::Internal(
- "HashJoinExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(HashJoinExec::try_new(
+ children[0].clone(),
+ children[1].clone(),
+ self.on.clone(),
+ &self.join_type,
+ self.mode,
+ &self.null_equals_null,
+ )?))
}
fn output_partitioning(&self) -> Partitioning {
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 5aa2ab6ff..1cf99b3f5 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -117,18 +117,13 @@ impl ExecutionPlan for GlobalLimitExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 1 => Ok(Arc::new(GlobalLimitExec::new(
- children[0].clone(),
- self.limit,
- ))),
- _ => Err(DataFusionError::Internal(
- "GlobalLimitExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(GlobalLimitExec::new(
+ children[0].clone(),
+ self.limit,
+ )))
}
async fn execute(
@@ -268,7 +263,7 @@ impl ExecutionPlan for LocalLimitExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
match children.len() {
diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs
index 2662c551b..2862aefdb 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -27,13 +27,14 @@ use super::{
common, project_schema, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use crate::execution::context::TaskContext;
use async_trait::async_trait;
+use datafusion_common::DataFusionError;
use futures::Stream;
/// Execution plan for reading in-memory batches of data
@@ -87,7 +88,7 @@ impl ExecutionPlan for MemoryExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Err(DataFusionError::Internal(format!(
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index f14ba3fdc..9d45ece61 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -37,6 +37,7 @@ use futures::stream::Stream;
use std::fmt;
use std::fmt::Debug;
+use datafusion_common::DataFusionError;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::{any::Any, pin::Pin};
@@ -216,9 +217,8 @@ pub trait ExecutionPlan: Debug + Send + Sync {
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
/// Returns a new plan where all children were replaced by new plans.
- /// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>;
@@ -258,6 +258,31 @@ pub trait ExecutionPlan: Debug + Send + Sync {
fn statistics(&self) -> Statistics;
}
+/// Returns a copy of this plan if any child was changed, as determined by pointer comparison.
+/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
+/// Allow the vtable address comparisons for ExecutionPlan trait objects; it is harmless even
+/// in the case of a false negative.
+#[allow(clippy::vtable_address_comparisons)]
+pub fn with_new_children_if_necessary(
+ plan: Arc<dyn ExecutionPlan>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+ if children.len() != plan.children().len() {
+ Err(DataFusionError::Internal(
+ "Wrong number of children".to_string(),
+ ))
+ } else if children.is_empty()
+ || children
+ .iter()
+ .zip(plan.children().iter())
+ .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
+ {
+ plan.with_new_children(children)
+ } else {
+ Ok(plan)
+ }
+}
+
/// Return a [wrapper](DisplayableExecutionPlan) around an
/// [`ExecutionPlan`] which can be displayed in various easier to
/// understand ways.
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index d6d40a740..1e6c13f9c 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1927,7 +1927,7 @@ mod tests {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!("NoOpExecutionPlan::with_new_children");
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index 4419b6f91..a49d00669 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -26,7 +26,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
use crate::physical_plan::{
ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
};
@@ -136,18 +136,13 @@ impl ExecutionPlan for ProjectionExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 1 => Ok(Arc::new(ProjectionExec::try_new(
- self.expr.clone(),
- children[0].clone(),
- )?)),
- _ => Err(DataFusionError::Internal(
- "ProjectionExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(ProjectionExec::try_new(
+ self.expr.clone(),
+ children[0].clone(),
+ )?))
}
async fn execute(
diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs
index 18cfb4cd3..18e87e09d 100644
--- a/datafusion/core/src/physical_plan/repartition.rs
+++ b/datafusion/core/src/physical_plan/repartition.rs
@@ -152,18 +152,13 @@ impl ExecutionPlan for RepartitionExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 1 => Ok(Arc::new(RepartitionExec::try_new(
- children[0].clone(),
- self.partitioning.clone(),
- )?)),
- _ => Err(DataFusionError::Internal(
- "RepartitionExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(RepartitionExec::try_new(
+ children[0].clone(),
+ self.partitioning.clone(),
+ )?))
}
fn output_partitioning(&self) -> Partitioning {
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index f48beb34b..163a608b3 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -716,18 +716,13 @@ impl ExecutionPlan for SortExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 1 => Ok(Arc::new(SortExec::try_new(
- self.expr.clone(),
- children[0].clone(),
- )?)),
- _ => Err(DataFusionError::Internal(
- "SortExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(SortExec::try_new(
+ self.expr.clone(),
+ children[0].clone(),
+ )?))
}
async fn execute(
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 3fb480de2..4b0fccd26 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -141,18 +141,13 @@ impl ExecutionPlan for SortPreservingMergeExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 1 => Ok(Arc::new(SortPreservingMergeExec::new(
- self.expr.clone(),
- children[0].clone(),
- ))),
- _ => Err(DataFusionError::Internal(
- "SortPreservingMergeExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(SortPreservingMergeExec::new(
+ self.expr.clone(),
+ children[0].clone(),
+ )))
}
async fn execute(
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index f156a60fe..8c730ea0e 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -100,7 +100,7 @@ impl ExecutionPlan for UnionExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(UnionExec::new(children)))
diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs
index 8cc448df5..f2ba681ed 100644
--- a/datafusion/core/src/physical_plan/values.rs
+++ b/datafusion/core/src/physical_plan/values.rs
@@ -129,18 +129,13 @@ impl ExecutionPlan for ValuesExec {
}
fn with_new_children(
- &self,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 0 => Ok(Arc::new(ValuesExec {
- schema: self.schema.clone(),
- data: self.data.clone(),
- })),
- _ => Err(DataFusionError::Internal(
- "ValuesExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(ValuesExec {
+ schema: self.schema.clone(),
+ data: self.data.clone(),
+ }))
}
async fn execute(
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index f59bc910b..553b6f26b 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -17,7 +17,7 @@
//! Stream and channel implementations for window function expressions.
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
@@ -140,19 +140,14 @@ impl ExecutionPlan for WindowAggExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 1 => Ok(Arc::new(WindowAggExec::try_new(
- self.window_expr.clone(),
- children[0].clone(),
- self.input_schema.clone(),
- )?)),
- _ => Err(DataFusionError::Internal(
- "WindowAggExec wrong number of children".to_owned(),
- )),
- }
+ Ok(Arc::new(WindowAggExec::try_new(
+ self.window_expr.clone(),
+ children[0].clone(),
+ self.input_schema.clone(),
+ )?))
}
async fn execute(
diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs
index 41d3c55ea..7e0cbe35f 100644
--- a/datafusion/core/src/test/exec.rs
+++ b/datafusion/core/src/test/exec.rs
@@ -161,8 +161,8 @@ impl ExecutionPlan for MockExec {
}
fn with_new_children(
- &self,
- _children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
@@ -300,8 +300,8 @@ impl ExecutionPlan for BarrierExec {
}
fn with_new_children(
- &self,
- _children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
@@ -401,8 +401,8 @@ impl ExecutionPlan for ErrorExec {
}
fn with_new_children(
- &self,
- _children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}
@@ -481,16 +481,10 @@ impl ExecutionPlan for StatisticsExec {
}
fn with_new_children(
- &self,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if children.is_empty() {
- Ok(Arc::new(self.clone()))
- } else {
- Err(DataFusionError::Internal(
- "Children cannot be replaced in CustomExecutionPlan".to_owned(),
- ))
- }
+ Ok(self)
}
async fn execute(
@@ -582,7 +576,7 @@ impl ExecutionPlan for BlockingExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Err(DataFusionError::Internal(format!(
diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs
index 957bcf504..0916e966c 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -25,10 +25,7 @@ use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::scalar::ScalarValue;
use datafusion::{datasource::TableProvider, physical_plan::collect};
-use datafusion::{
- error::{DataFusionError, Result},
- physical_plan::DisplayFormatType,
-};
+use datafusion::{error::Result, physical_plan::DisplayFormatType};
use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::logical_plan::{
@@ -120,16 +117,10 @@ impl ExecutionPlan for CustomExecutionPlan {
vec![]
}
fn with_new_children(
- &self,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if children.is_empty() {
- Ok(Arc::new(self.clone()))
- } else {
- Err(DataFusionError::Internal(
- "Children cannot be replaced in CustomExecutionPlan".to_owned(),
- ))
- }
+ Ok(self)
}
async fn execute(
&self,
diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs
index e722d4cee..cfd903991 100644
--- a/datafusion/core/tests/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/provider_filter_pushdown.rs
@@ -78,7 +78,7 @@ impl ExecutionPlan for CustomPlan {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unreachable!()
diff --git a/datafusion/core/tests/statistics.rs b/datafusion/core/tests/statistics.rs
index 9f5a1aab0..1170dc7cb 100644
--- a/datafusion/core/tests/statistics.rs
+++ b/datafusion/core/tests/statistics.rs
@@ -22,7 +22,7 @@ use std::{any::Any, sync::Arc};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::{
datasource::TableProvider,
- error::{DataFusionError, Result},
+ error::Result,
logical_plan::Expr,
physical_plan::{
expressions::PhysicalSortExpr, project_schema, ColumnStatistics,
@@ -129,16 +129,10 @@ impl ExecutionPlan for StatisticsValidation {
}
fn with_new_children(
- &self,
- children: Vec<Arc<dyn ExecutionPlan>>,
+ self: Arc<Self>,
+ _: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if children.is_empty() {
- Ok(Arc::new(self.clone()))
- } else {
- Err(DataFusionError::Internal(
- "Children cannot be replaced in CustomExecutionPlan".to_owned(),
- ))
- }
+ Ok(self)
}
async fn execute(
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index 86da4da85..43e6eeacd 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -447,18 +447,13 @@ impl ExecutionPlan for TopKExec {
}
fn with_new_children(
- &self,
+ self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
- match children.len() {
- 1 => Ok(Arc::new(TopKExec {
- input: children[0].clone(),
- k: self.k,
- })),
- _ => Err(DataFusionError::Internal(
- "TopKExec wrong number of children".to_string(),
- )),
- }
+ Ok(Arc::new(TopKExec {
+ input: children[0].clone(),
+ k: self.k,
+ }))
}
/// Execute one partition and return an iterator over RecordBatch