You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/07 19:38:26 UTC

[arrow-datafusion] branch master updated: Implement fast path of with_new_children() in ExecutionPlan (#2168)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new fa9e01641 Implement fast path of with_new_children() in ExecutionPlan (#2168)
fa9e01641 is described below

commit fa9e0164127c233a16b53e1d12f162f3b00171f5
Author: mingmwang <mi...@ebay.com>
AuthorDate: Fri Apr 8 03:38:22 2022 +0800

    Implement fast path of with_new_children() in ExecutionPlan (#2168)
    
    * Implement fast path of with_new_children() in ExecutionPlan
    
    * resolve review comments
    
    * refine comments
---
 .../core/src/execution_plans/distributed_query.rs  |  2 +-
 .../core/src/execution_plans/shuffle_reader.rs     |  2 +-
 .../core/src/execution_plans/shuffle_writer.rs     |  3 +-
 .../core/src/execution_plans/unresolved_shuffle.rs |  2 +-
 ballista/rust/core/src/serde/mod.rs                | 15 ++----
 ballista/rust/executor/src/collect.rs              |  2 +-
 ballista/rust/scheduler/src/planner.rs             | 25 ++++++----
 .../rust/scheduler/src/scheduler_server/mod.rs     |  2 +-
 .../src/scheduler_server/query_stage_scheduler.rs  |  3 +-
 datafusion-examples/examples/custom_datasource.rs  | 15 ++----
 .../src/physical_optimizer/coalesce_batches.rs     |  3 +-
 .../core/src/physical_optimizer/merge_exec.rs      | 12 +++--
 .../core/src/physical_optimizer/repartition.rs     |  6 ++-
 datafusion/core/src/physical_optimizer/utils.rs    |  4 +-
 datafusion/core/src/physical_plan/analyze.rs       | 19 +++-----
 .../core/src/physical_plan/coalesce_batches.rs     | 17 +++----
 .../core/src/physical_plan/coalesce_partitions.rs  |  9 +---
 datafusion/core/src/physical_plan/cross_join.rs    | 20 +++-----
 datafusion/core/src/physical_plan/empty.rs         | 29 +++++------
 datafusion/core/src/physical_plan/explain.rs       | 13 ++---
 .../core/src/physical_plan/file_format/avro.rs     | 13 ++---
 .../core/src/physical_plan/file_format/csv.rs      | 13 ++---
 .../core/src/physical_plan/file_format/json.rs     | 13 ++---
 .../core/src/physical_plan/file_format/parquet.rs  | 13 ++---
 datafusion/core/src/physical_plan/filter.rs        | 57 +++++++++++++++++-----
 .../core/src/physical_plan/hash_aggregate.rs       | 26 +++++-----
 datafusion/core/src/physical_plan/hash_join.rs     | 23 ++++-----
 datafusion/core/src/physical_plan/limit.rs         | 17 +++----
 datafusion/core/src/physical_plan/memory.rs        |  5 +-
 datafusion/core/src/physical_plan/mod.rs           | 29 ++++++++++-
 datafusion/core/src/physical_plan/planner.rs       |  2 +-
 datafusion/core/src/physical_plan/projection.rs    | 17 +++----
 datafusion/core/src/physical_plan/repartition.rs   | 15 ++----
 datafusion/core/src/physical_plan/sorts/sort.rs    | 15 ++----
 .../physical_plan/sorts/sort_preserving_merge.rs   | 15 ++----
 datafusion/core/src/physical_plan/union.rs         |  2 +-
 datafusion/core/src/physical_plan/values.rs        | 17 +++----
 .../src/physical_plan/windows/window_agg_exec.rs   | 19 +++-----
 datafusion/core/src/test/exec.rs                   | 26 ++++------
 datafusion/core/tests/custom_sources.rs            | 17 ++-----
 datafusion/core/tests/provider_filter_pushdown.rs  |  2 +-
 datafusion/core/tests/statistics.rs                | 14 ++----
 datafusion/core/tests/user_defined_plan.rs         | 15 ++----
 43 files changed, 258 insertions(+), 330 deletions(-)

diff --git a/ballista/rust/core/src/execution_plans/distributed_query.rs b/ballista/rust/core/src/execution_plans/distributed_query.rs
index 46a168f17..b0d3bef1f 100644
--- a/ballista/rust/core/src/execution_plans/distributed_query.rs
+++ b/ballista/rust/core/src/execution_plans/distributed_query.rs
@@ -148,7 +148,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         _children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(DistributedQueryExec {
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index aeabc72a5..b0aa6af11 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -95,7 +95,7 @@ impl ExecutionPlan for ShuffleReaderExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         _children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Err(DataFusionError::Plan(
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 530f123d9..77190c5ab 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -342,10 +342,9 @@ impl ExecutionPlan for ShuffleWriterExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        assert!(children.len() == 1);
         Ok(Arc::new(ShuffleWriterExec::try_new(
             self.job_id.clone(),
             self.stage_id,
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 83d4598f5..e1eecdd92 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -92,7 +92,7 @@ impl ExecutionPlan for UnresolvedShuffleExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         _children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Err(DataFusionError::Plan(
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index 2cd02b9f9..ed41ce61c 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -505,18 +505,13 @@ mod tests {
         }
 
         fn with_new_children(
-            &self,
+            self: Arc<Self>,
             children: Vec<Arc<dyn ExecutionPlan>>,
         ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
-            match children.len() {
-                1 => Ok(Arc::new(TopKExec {
-                    input: children[0].clone(),
-                    k: self.k,
-                })),
-                _ => Err(DataFusionError::Internal(
-                    "TopKExec wrong number of children".to_string(),
-                )),
-            }
+            Ok(Arc::new(TopKExec {
+                input: children[0].clone(),
+                k: self.k,
+            }))
         }
 
         /// Execute one partition and return an iterator over RecordBatch
diff --git a/ballista/rust/executor/src/collect.rs b/ballista/rust/executor/src/collect.rs
index 50215f6c9..1bb4acaf8 100644
--- a/ballista/rust/executor/src/collect.rs
+++ b/ballista/rust/executor/src/collect.rs
@@ -72,7 +72,7 @@ impl ExecutionPlan for CollectExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         _children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         unimplemented!()
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index d7ee22bbd..8198c4ed2 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -30,7 +30,9 @@ use ballista_core::{
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::windows::WindowAggExec;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::physical_plan::{
+    with_new_children_if_necessary, ExecutionPlan, Partitioning,
+};
 use futures::future::BoxFuture;
 use futures::FutureExt;
 use log::info;
@@ -99,7 +101,7 @@ impl DistributedPlanner {
                 stages.append(&mut child_stages);
             }
 
-            if let Some(coalesce) = execution_plan
+            if let Some(_coalesce) = execution_plan
                 .as_any()
                 .downcast_ref::<CoalescePartitionsExec>()
             {
@@ -122,7 +124,10 @@ impl DistributedPlanner {
                 ));
                 stages.push(shuffle_writer);
                 Ok((
-                    coalesce.with_new_children(vec![unresolved_shuffle])?,
+                    with_new_children_if_necessary(
+                        execution_plan,
+                        vec![unresolved_shuffle],
+                    )?,
                     stages,
                 ))
             } else if let Some(repart) =
@@ -163,7 +168,10 @@ impl DistributedPlanner {
                     window
                 )))
             } else {
-                Ok((execution_plan.with_new_children(children)?, stages))
+                Ok((
+                    with_new_children_if_necessary(execution_plan, children)?,
+                    stages,
+                ))
             }
         }
         .boxed()
@@ -197,7 +205,7 @@ pub fn find_unresolved_shuffles(
 }
 
 pub fn remove_unresolved_shuffles(
-    stage: &dyn ExecutionPlan,
+    stage: Arc<dyn ExecutionPlan>,
     partition_locations: &HashMap<usize, HashMap<usize, Vec<PartitionLocation>>>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
@@ -240,13 +248,10 @@ pub fn remove_unresolved_shuffles(
                 unresolved_shuffle.schema().clone(),
             )?))
         } else {
-            new_children.push(remove_unresolved_shuffles(
-                child.as_ref(),
-                partition_locations,
-            )?);
+            new_children.push(remove_unresolved_shuffles(child, partition_locations)?);
         }
     }
-    Ok(stage.with_new_children(new_children)?)
+    Ok(with_new_children_if_necessary(stage, new_children)?)
 }
 
 fn create_shuffle_writer(
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index 2f6220f33..4b47e5239 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -241,7 +241,7 @@ impl Default for SessionContextRegistry {
 }
 
 impl SessionContextRegistry {
-    /// Create the registry that object stores can registered into.
+    /// Create the registry that session contexts can be registered into.
     /// ['LocalFileSystem'] store is registered in by default to support read local files natively.
     pub fn new() -> Self {
         Self {
diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index 4b22ed16a..58e9f8f3b 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -304,8 +304,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> QueryStageSchedul
                 }
             }
 
-            let plan =
-                remove_unresolved_shuffles(stage_plan.as_ref(), &partition_locations)?;
+            let plan = remove_unresolved_shuffles(stage_plan, &partition_locations)?;
             self.state.save_stage_plan(job_id, stage_id, plan).await?;
         }
 
diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs
index 3d725c4c6..a4b9fda1a 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -21,7 +21,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::dataframe::DataFrame;
 use datafusion::datasource::TableProvider;
-use datafusion::error::{DataFusionError, Result};
+use datafusion::error::Result;
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_plan::{Expr, LogicalPlanBuilder};
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
@@ -219,17 +219,10 @@ impl ExecutionPlan for CustomExec {
     }
 
     fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if children.is_empty() {
-            Ok(Arc::new(self.clone()))
-        } else {
-            Err(DataFusionError::Internal(format!(
-                "Children cannot be replaced in {:?}",
-                self
-            )))
-        }
+        Ok(self)
     }
 
     async fn execute(
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 4c9edb452..50f8abe7f 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -19,6 +19,7 @@
 //! in bigger batches to avoid overhead with small batches
 
 use super::optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::with_new_children_if_necessary;
 use crate::{
     error::Result,
     physical_plan::{
@@ -69,7 +70,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
             // leaf node, children cannot be replaced
             Ok(plan.clone())
         } else {
-            let plan = plan.with_new_children(children)?;
+            let plan = with_new_children_if_necessary(plan, children)?;
             Ok(if wrap_in_coalesce {
                 // TODO we should add specific configuration settings for coalescing batches and
                 // we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
diff --git a/datafusion/core/src/physical_optimizer/merge_exec.rs b/datafusion/core/src/physical_optimizer/merge_exec.rs
index f23da15e5..f614673f5 100644
--- a/datafusion/core/src/physical_optimizer/merge_exec.rs
+++ b/datafusion/core/src/physical_optimizer/merge_exec.rs
@@ -19,6 +19,7 @@
 //! with more than one partition, to coalesce them into one partition
 //! when the node needs a single partition
 use super::optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::with_new_children_if_necessary;
 use crate::{
     error::Result,
     physical_plan::{coalesce_partitions::CoalescePartitionsExec, Distribution},
@@ -52,9 +53,14 @@ impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
                 .map(|child| self.optimize(child.clone(), config))
                 .collect::<Result<Vec<_>>>()?;
             match plan.required_child_distribution() {
-                Distribution::UnspecifiedDistribution => plan.with_new_children(children),
-                Distribution::HashPartitioned(_) => plan.with_new_children(children),
-                Distribution::SinglePartition => plan.with_new_children(
+                Distribution::UnspecifiedDistribution => {
+                    with_new_children_if_necessary(plan, children)
+                }
+                Distribution::HashPartitioned(_) => {
+                    with_new_children_if_necessary(plan, children)
+                }
+                Distribution::SinglePartition => with_new_children_if_necessary(
+                    plan,
                     children
                         .iter()
                         .map(|child| {
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index d98fa4162..2506348fe 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -20,7 +20,9 @@ use std::sync::Arc;
 
 use super::optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::Partitioning::*;
-use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan};
+use crate::physical_plan::{
+    repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
+};
 use crate::{error::Result, execution::context::SessionConfig};
 
 /// Optimizer that introduces repartition to introduce more
@@ -191,7 +193,7 @@ fn optimize_partitions(
                 )
             })
             .collect::<Result<_>>()?;
-        plan.with_new_children(children)?
+        with_new_children_if_necessary(plan, children)?
     };
 
     // decide if we should bother trying to repartition the output of this plan
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index bb1415e18..4aceb776d 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -21,7 +21,7 @@ use super::optimizer::PhysicalOptimizerRule;
 use crate::execution::context::SessionConfig;
 
 use crate::error::Result;
-use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
 use std::sync::Arc;
 
 /// Convenience rule for writing optimizers: recursively invoke
@@ -42,6 +42,6 @@ pub fn optimize_children(
     if children.is_empty() {
         Ok(Arc::clone(&plan))
     } else {
-        plan.with_new_children(children)
+        with_new_children_if_necessary(plan, children)
     }
 }
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index 31f47a673..f8050f16c 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -92,21 +92,14 @@ impl ExecutionPlan for AnalyzeExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         mut children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if children.len() == 1 {
-            Ok(Arc::new(Self::new(
-                self.verbose,
-                children.pop().unwrap(),
-                self.schema.clone(),
-            )))
-        } else {
-            Err(DataFusionError::Internal(format!(
-                "Invalid child count for AnalyzeExec. Expected 1 got {}",
-                children.len()
-            )))
-        }
+        Ok(Arc::new(Self::new(
+            self.verbose,
+            children.pop().unwrap(),
+            self.schema.clone(),
+        )))
     }
 
     async fn execute(
diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs
index 18785354c..482b0ee77 100644
--- a/datafusion/core/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -23,7 +23,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
 use crate::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
     SendableRecordBatchStream,
@@ -107,18 +107,13 @@ impl ExecutionPlan for CoalesceBatchesExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            1 => Ok(Arc::new(CoalesceBatchesExec::new(
-                children[0].clone(),
-                self.target_batch_size,
-            ))),
-            _ => Err(DataFusionError::Internal(
-                "CoalesceBatchesExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(CoalesceBatchesExec::new(
+            children[0].clone(),
+            self.target_batch_size,
+        )))
     }
 
     async fn execute(
diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs
index b2832f73c..3ecbd61f2 100644
--- a/datafusion/core/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs
@@ -96,15 +96,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            1 => Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone()))),
-            _ => Err(DataFusionError::Internal(
-                "CoalescePartitionsExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone())))
     }
 
     async fn execute(
diff --git a/datafusion/core/src/physical_plan/cross_join.rs b/datafusion/core/src/physical_plan/cross_join.rs
index efe822421..43555f077 100644
--- a/datafusion/core/src/physical_plan/cross_join.rs
+++ b/datafusion/core/src/physical_plan/cross_join.rs
@@ -32,10 +32,7 @@ use super::{
     coalesce_partitions::CoalescePartitionsExec, join_utils::check_join_is_valid,
     ColumnStatistics, Statistics,
 };
-use crate::{
-    error::{DataFusionError, Result},
-    scalar::ScalarValue,
-};
+use crate::{error::Result, scalar::ScalarValue};
 use async_trait::async_trait;
 use std::time::Instant;
 
@@ -120,18 +117,13 @@ impl ExecutionPlan for CrossJoinExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            2 => Ok(Arc::new(CrossJoinExec::try_new(
-                children[0].clone(),
-                children[1].clone(),
-            )?)),
-            _ => Err(DataFusionError::Internal(
-                "CrossJoinExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(CrossJoinExec::try_new(
+            children[0].clone(),
+            children[1].clone(),
+        )?))
     }
 
     fn output_partitioning(&self) -> Partitioning {
diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs
index 0fbf18861..42ed368e1 100644
--- a/datafusion/core/src/physical_plan/empty.rs
+++ b/datafusion/core/src/physical_plan/empty.rs
@@ -104,18 +104,13 @@ impl ExecutionPlan for EmptyExec {
     }
 
     fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            0 => Ok(Arc::new(EmptyExec::new(
-                self.produce_one_row,
-                self.schema.clone(),
-            ))),
-            _ => Err(DataFusionError::Internal(
-                "EmptyExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(EmptyExec::new(
+            self.produce_one_row,
+            self.schema.clone(),
+        )))
     }
 
     async fn execute(
@@ -161,6 +156,7 @@ impl ExecutionPlan for EmptyExec {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::physical_plan::with_new_children_if_necessary;
     use crate::prelude::SessionContext;
     use crate::{physical_plan::common, test_util};
 
@@ -184,18 +180,19 @@ mod tests {
     #[test]
     fn with_new_children() -> Result<()> {
         let schema = test_util::aggr_test_schema();
-        let empty = EmptyExec::new(false, schema.clone());
-        let empty_with_row = EmptyExec::new(true, schema);
+        let empty = Arc::new(EmptyExec::new(false, schema.clone()));
+        let empty_with_row = Arc::new(EmptyExec::new(true, schema));
 
-        let empty2 = empty.with_new_children(vec![])?;
+        let empty2 = with_new_children_if_necessary(empty.clone(), vec![])?;
         assert_eq!(empty.schema(), empty2.schema());
 
-        let empty_with_row_2 = empty_with_row.with_new_children(vec![])?;
+        let empty_with_row_2 =
+            with_new_children_if_necessary(empty_with_row.clone(), vec![])?;
         assert_eq!(empty_with_row.schema(), empty_with_row_2.schema());
 
         let too_many_kids = vec![empty2];
         assert!(
-            empty.with_new_children(too_many_kids).is_err(),
+            with_new_children_if_necessary(empty, too_many_kids).is_err(),
             "expected error when providing list of kids"
         );
         Ok(())
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index fd5ff03ca..4905e9e17 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -103,17 +103,10 @@ impl ExecutionPlan for ExplainExec {
     }
 
     fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if children.is_empty() {
-            Ok(Arc::new(self.clone()))
-        } else {
-            Err(DataFusionError::Internal(format!(
-                "Children cannot be replaced in {:?}",
-                self
-            )))
-        }
+        Ok(self)
     }
 
     async fn execute(
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index 3f6b85777..68f8f2f90 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -88,17 +88,10 @@ impl ExecutionPlan for AvroExec {
     }
 
     fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if children.is_empty() {
-            Ok(Arc::new(self.clone()))
-        } else {
-            Err(DataFusionError::Internal(format!(
-                "Children cannot be replaced in {:?}",
-                self
-            )))
-        }
+        Ok(self)
     }
 
     #[cfg(not(feature = "avro"))]
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 9791b2e00..8aea607ea 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -106,17 +106,10 @@ impl ExecutionPlan for CsvExec {
     }
 
     fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if children.is_empty() {
-            Ok(Arc::new(self.clone()))
-        } else {
-            Err(DataFusionError::Internal(format!(
-                "Children cannot be replaced in {:?}",
-                self
-            )))
-        }
+        Ok(self)
     }
 
     async fn execute(
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index a6e7840e7..ef9d498a0 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -84,17 +84,10 @@ impl ExecutionPlan for NdJsonExec {
     }
 
     fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if children.is_empty() {
-            Ok(Arc::new(self.clone()) as Arc<dyn ExecutionPlan>)
-        } else {
-            Err(DataFusionError::Internal(format!(
-                "Children cannot be replaced in {:?}",
-                self
-            )))
-        }
+        Ok(self)
     }
 
     async fn execute(
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index d6800af9f..a5146360e 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -192,17 +192,10 @@ impl ExecutionPlan for ParquetExec {
     }
 
     fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if children.is_empty() {
-            Ok(Arc::new(self.clone()))
-        } else {
-            Err(DataFusionError::Internal(format!(
-                "Children cannot be replaced in {:?}",
-                self
-            )))
-        }
+        Ok(self)
     }
 
     async fn execute(
diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs
index 0822aa4ff..ff86d0893 100644
--- a/datafusion/core/src/physical_plan/filter.rs
+++ b/datafusion/core/src/physical_plan/filter.rs
@@ -119,18 +119,13 @@ impl ExecutionPlan for FilterExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            1 => Ok(Arc::new(FilterExec::try_new(
-                self.predicate.clone(),
-                children[0].clone(),
-            )?)),
-            _ => Err(DataFusionError::Internal(
-                "FilterExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(FilterExec::try_new(
+            self.predicate.clone(),
+            children[0].clone(),
+        )?))
     }
 
     async fn execute(
@@ -242,10 +237,10 @@ mod tests {
 
     use super::*;
     use crate::datafusion_data_access::object_store::local::LocalFileSystem;
-    use crate::physical_plan::collect;
     use crate::physical_plan::expressions::*;
     use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
     use crate::physical_plan::ExecutionPlan;
+    use crate::physical_plan::{collect, with_new_children_if_necessary};
     use crate::prelude::SessionContext;
     use crate::scalar::ScalarValue;
     use crate::test;
@@ -307,4 +302,44 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    #[allow(clippy::vtable_address_comparisons)]
+    async fn with_new_children() -> Result<()> {
+        let schema = test_util::aggr_test_schema();
+        let partitions = 4;
+        let (_, files) =
+            test::create_partitioned_csv("aggregate_test_100.csv", partitions)?;
+        let input = Arc::new(CsvExec::new(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_schema: Arc::clone(&schema),
+                file_groups: files,
+                statistics: Statistics::default(),
+                projection: None,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            true,
+            b',',
+        ));
+
+        let predicate: Arc<dyn PhysicalExpr> = binary(
+            col("c2", &schema)?,
+            Operator::Gt,
+            lit(ScalarValue::from(1u32)),
+            &schema,
+        )?;
+
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, input.clone())?);
+
+        let new_filter = filter.clone().with_new_children(vec![input.clone()])?;
+        assert!(!Arc::ptr_eq(&filter, &new_filter));
+
+        let new_filter2 = with_new_children_if_necessary(filter.clone(), vec![input])?;
+        assert!(Arc::ptr_eq(&filter, &new_filter2));
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/physical_plan/hash_aggregate.rs b/datafusion/core/src/physical_plan/hash_aggregate.rs
index 7476f77ed..75d68a6bc 100644
--- a/datafusion/core/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/core/src/physical_plan/hash_aggregate.rs
@@ -28,7 +28,7 @@ use futures::{
     Future,
 };
 
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
 use crate::physical_plan::hash_utils::create_hashes;
 use crate::physical_plan::{
     Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
@@ -261,21 +261,16 @@ impl ExecutionPlan for HashAggregateExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            1 => Ok(Arc::new(HashAggregateExec::try_new(
-                self.mode,
-                self.group_expr.clone(),
-                self.aggr_expr.clone(),
-                children[0].clone(),
-                self.input_schema.clone(),
-            )?)),
-            _ => Err(DataFusionError::Internal(
-                "HashAggregateExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(HashAggregateExec::try_new(
+            self.mode,
+            self.group_expr.clone(),
+            self.aggr_expr.clone(),
+            children[0].clone(),
+            self.input_schema.clone(),
+        )?))
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
@@ -1027,6 +1022,7 @@ mod tests {
     use crate::{assert_batches_sorted_eq, physical_plan::common};
     use arrow::array::{Float64Array, UInt32Array};
     use arrow::datatypes::DataType;
+    use datafusion_common::DataFusionError;
     use futures::FutureExt;
 
     use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -1178,7 +1174,7 @@ mod tests {
         }
 
         fn with_new_children(
-            &self,
+            self: Arc<Self>,
             _: Vec<Arc<dyn ExecutionPlan>>,
         ) -> Result<Arc<dyn ExecutionPlan>> {
             Err(DataFusionError::Internal(format!(
diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/hash_join.rs
index ec7e032f4..44b8cc97d 100644
--- a/datafusion/core/src/physical_plan/hash_join.rs
+++ b/datafusion/core/src/physical_plan/hash_join.rs
@@ -257,22 +257,17 @@ impl ExecutionPlan for HashJoinExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            2 => Ok(Arc::new(HashJoinExec::try_new(
-                children[0].clone(),
-                children[1].clone(),
-                self.on.clone(),
-                &self.join_type,
-                self.mode,
-                &self.null_equals_null,
-            )?)),
-            _ => Err(DataFusionError::Internal(
-                "HashJoinExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(HashJoinExec::try_new(
+            children[0].clone(),
+            children[1].clone(),
+            self.on.clone(),
+            &self.join_type,
+            self.mode,
+            &self.null_equals_null,
+        )?))
     }
 
     fn output_partitioning(&self) -> Partitioning {
diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs
index 5aa2ab6ff..1cf99b3f5 100644
--- a/datafusion/core/src/physical_plan/limit.rs
+++ b/datafusion/core/src/physical_plan/limit.rs
@@ -117,18 +117,13 @@ impl ExecutionPlan for GlobalLimitExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            1 => Ok(Arc::new(GlobalLimitExec::new(
-                children[0].clone(),
-                self.limit,
-            ))),
-            _ => Err(DataFusionError::Internal(
-                "GlobalLimitExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(GlobalLimitExec::new(
+            children[0].clone(),
+            self.limit,
+        )))
     }
 
     async fn execute(
@@ -268,7 +263,7 @@ impl ExecutionPlan for LocalLimitExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         match children.len() {
diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs
index 2662c551b..2862aefdb 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -27,13 +27,14 @@ use super::{
     common, project_schema, DisplayFormatType, ExecutionPlan, Partitioning,
     RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
 use arrow::datatypes::SchemaRef;
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
 
 use crate::execution::context::TaskContext;
 use async_trait::async_trait;
+use datafusion_common::DataFusionError;
 use futures::Stream;
 
 /// Execution plan for reading in-memory batches of data
@@ -87,7 +88,7 @@ impl ExecutionPlan for MemoryExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Err(DataFusionError::Internal(format!(
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index f14ba3fdc..9d45ece61 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -37,6 +37,7 @@ use futures::stream::Stream;
 use std::fmt;
 use std::fmt::Debug;
 
+use datafusion_common::DataFusionError;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::{any::Any, pin::Pin};
@@ -216,9 +217,8 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
 
     /// Returns a new plan where all children were replaced by new plans.
-    /// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
 
@@ -258,6 +258,31 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     fn statistics(&self) -> Statistics;
 }
 
+/// Returns a copy of this plan if we change any child according to the pointer comparison.
+/// The size of `children` must be equal to the size of `ExecutionPlan::children()`.
+/// Allow the vtable address comparisons for ExecutionPlan Trait Objects,it is harmless even
+/// in the case of 'false-native'.
+#[allow(clippy::vtable_address_comparisons)]
+pub fn with_new_children_if_necessary(
+    plan: Arc<dyn ExecutionPlan>,
+    children: Vec<Arc<dyn ExecutionPlan>>,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    if children.len() != plan.children().len() {
+        Err(DataFusionError::Internal(
+            "Wrong number of children".to_string(),
+        ))
+    } else if children.is_empty()
+        || children
+            .iter()
+            .zip(plan.children().iter())
+            .any(|(c1, c2)| !Arc::ptr_eq(c1, c2))
+    {
+        plan.with_new_children(children)
+    } else {
+        Ok(plan)
+    }
+}
+
 /// Return a [wrapper](DisplayableExecutionPlan) around an
 /// [`ExecutionPlan`] which can be displayed in various easier to
 /// understand ways.
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index d6d40a740..1e6c13f9c 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1927,7 +1927,7 @@ mod tests {
         }
 
         fn with_new_children(
-            &self,
+            self: Arc<Self>,
             _children: Vec<Arc<dyn ExecutionPlan>>,
         ) -> Result<Arc<dyn ExecutionPlan>> {
             unimplemented!("NoOpExecutionPlan::with_new_children");
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index 4419b6f91..a49d00669 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -26,7 +26,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
 use crate::physical_plan::{
     ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr,
 };
@@ -136,18 +136,13 @@ impl ExecutionPlan for ProjectionExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            1 => Ok(Arc::new(ProjectionExec::try_new(
-                self.expr.clone(),
-                children[0].clone(),
-            )?)),
-            _ => Err(DataFusionError::Internal(
-                "ProjectionExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(ProjectionExec::try_new(
+            self.expr.clone(),
+            children[0].clone(),
+        )?))
     }
 
     async fn execute(
diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs
index 18cfb4cd3..18e87e09d 100644
--- a/datafusion/core/src/physical_plan/repartition.rs
+++ b/datafusion/core/src/physical_plan/repartition.rs
@@ -152,18 +152,13 @@ impl ExecutionPlan for RepartitionExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            1 => Ok(Arc::new(RepartitionExec::try_new(
-                children[0].clone(),
-                self.partitioning.clone(),
-            )?)),
-            _ => Err(DataFusionError::Internal(
-                "RepartitionExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(RepartitionExec::try_new(
+            children[0].clone(),
+            self.partitioning.clone(),
+        )?))
     }
 
     fn output_partitioning(&self) -> Partitioning {
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs
index f48beb34b..163a608b3 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -716,18 +716,13 @@ impl ExecutionPlan for SortExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            1 => Ok(Arc::new(SortExec::try_new(
-                self.expr.clone(),
-                children[0].clone(),
-            )?)),
-            _ => Err(DataFusionError::Internal(
-                "SortExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(SortExec::try_new(
+            self.expr.clone(),
+            children[0].clone(),
+        )?))
     }
 
     async fn execute(
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 3fb480de2..4b0fccd26 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -141,18 +141,13 @@ impl ExecutionPlan for SortPreservingMergeExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            1 => Ok(Arc::new(SortPreservingMergeExec::new(
-                self.expr.clone(),
-                children[0].clone(),
-            ))),
-            _ => Err(DataFusionError::Internal(
-                "SortPreservingMergeExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(SortPreservingMergeExec::new(
+            self.expr.clone(),
+            children[0].clone(),
+        )))
     }
 
     async fn execute(
diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs
index f156a60fe..8c730ea0e 100644
--- a/datafusion/core/src/physical_plan/union.rs
+++ b/datafusion/core/src/physical_plan/union.rs
@@ -100,7 +100,7 @@ impl ExecutionPlan for UnionExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Ok(Arc::new(UnionExec::new(children)))
diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs
index 8cc448df5..f2ba681ed 100644
--- a/datafusion/core/src/physical_plan/values.rs
+++ b/datafusion/core/src/physical_plan/values.rs
@@ -129,18 +129,13 @@ impl ExecutionPlan for ValuesExec {
     }
 
     fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            0 => Ok(Arc::new(ValuesExec {
-                schema: self.schema.clone(),
-                data: self.data.clone(),
-            })),
-            _ => Err(DataFusionError::Internal(
-                "ValuesExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(ValuesExec {
+            schema: self.schema.clone(),
+            data: self.data.clone(),
+        }))
     }
 
     async fn execute(
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index f59bc910b..553b6f26b 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -17,7 +17,7 @@
 
 //! Stream and channel implementations for window function expressions.
 
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
 use crate::execution::context::TaskContext;
 use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::expressions::PhysicalSortExpr;
@@ -140,19 +140,14 @@ impl ExecutionPlan for WindowAggExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            1 => Ok(Arc::new(WindowAggExec::try_new(
-                self.window_expr.clone(),
-                children[0].clone(),
-                self.input_schema.clone(),
-            )?)),
-            _ => Err(DataFusionError::Internal(
-                "WindowAggExec wrong number of children".to_owned(),
-            )),
-        }
+        Ok(Arc::new(WindowAggExec::try_new(
+            self.window_expr.clone(),
+            children[0].clone(),
+            self.input_schema.clone(),
+        )?))
     }
 
     async fn execute(
diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs
index 41d3c55ea..7e0cbe35f 100644
--- a/datafusion/core/src/test/exec.rs
+++ b/datafusion/core/src/test/exec.rs
@@ -161,8 +161,8 @@ impl ExecutionPlan for MockExec {
     }
 
     fn with_new_children(
-        &self,
-        _children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         unimplemented!()
     }
@@ -300,8 +300,8 @@ impl ExecutionPlan for BarrierExec {
     }
 
     fn with_new_children(
-        &self,
-        _children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         unimplemented!()
     }
@@ -401,8 +401,8 @@ impl ExecutionPlan for ErrorExec {
     }
 
     fn with_new_children(
-        &self,
-        _children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         unimplemented!()
     }
@@ -481,16 +481,10 @@ impl ExecutionPlan for StatisticsExec {
     }
 
     fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if children.is_empty() {
-            Ok(Arc::new(self.clone()))
-        } else {
-            Err(DataFusionError::Internal(
-                "Children cannot be replaced in CustomExecutionPlan".to_owned(),
-            ))
-        }
+        Ok(self)
     }
 
     async fn execute(
@@ -582,7 +576,7 @@ impl ExecutionPlan for BlockingExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         Err(DataFusionError::Internal(format!(
diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs
index 957bcf504..0916e966c 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -25,10 +25,7 @@ use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::scalar::ScalarValue;
 use datafusion::{datasource::TableProvider, physical_plan::collect};
-use datafusion::{
-    error::{DataFusionError, Result},
-    physical_plan::DisplayFormatType,
-};
+use datafusion::{error::Result, physical_plan::DisplayFormatType};
 
 use datafusion::execution::context::{SessionContext, TaskContext};
 use datafusion::logical_plan::{
@@ -120,16 +117,10 @@ impl ExecutionPlan for CustomExecutionPlan {
         vec![]
     }
     fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if children.is_empty() {
-            Ok(Arc::new(self.clone()))
-        } else {
-            Err(DataFusionError::Internal(
-                "Children cannot be replaced in CustomExecutionPlan".to_owned(),
-            ))
-        }
+        Ok(self)
     }
     async fn execute(
         &self,
diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs
index e722d4cee..cfd903991 100644
--- a/datafusion/core/tests/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/provider_filter_pushdown.rs
@@ -78,7 +78,7 @@ impl ExecutionPlan for CustomPlan {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         unreachable!()
diff --git a/datafusion/core/tests/statistics.rs b/datafusion/core/tests/statistics.rs
index 9f5a1aab0..1170dc7cb 100644
--- a/datafusion/core/tests/statistics.rs
+++ b/datafusion/core/tests/statistics.rs
@@ -22,7 +22,7 @@ use std::{any::Any, sync::Arc};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::{
     datasource::TableProvider,
-    error::{DataFusionError, Result},
+    error::Result,
     logical_plan::Expr,
     physical_plan::{
         expressions::PhysicalSortExpr, project_schema, ColumnStatistics,
@@ -129,16 +129,10 @@ impl ExecutionPlan for StatisticsValidation {
     }
 
     fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if children.is_empty() {
-            Ok(Arc::new(self.clone()))
-        } else {
-            Err(DataFusionError::Internal(
-                "Children cannot be replaced in CustomExecutionPlan".to_owned(),
-            ))
-        }
+        Ok(self)
     }
 
     async fn execute(
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index 86da4da85..43e6eeacd 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -447,18 +447,13 @@ impl ExecutionPlan for TopKExec {
     }
 
     fn with_new_children(
-        &self,
+        self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        match children.len() {
-            1 => Ok(Arc::new(TopKExec {
-                input: children[0].clone(),
-                k: self.k,
-            })),
-            _ => Err(DataFusionError::Internal(
-                "TopKExec wrong number of children".to_string(),
-            )),
-        }
+        Ok(Arc::new(TopKExec {
+            input: children[0].clone(),
+            k: self.k,
+        }))
     }
 
     /// Execute one partition and return an iterator over RecordBatch