You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/22 14:22:42 UTC

[arrow-datafusion] branch master updated: Add a checker to confirm physical optimizer rules will keep the physical plan schema immutable (#4316)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c07e4d77 Add a checker to confirm physical optimizer rules will keep the physical plan schema immutable (#4316)
7c07e4d77 is described below

commit 7c07e4d77aa2da60a83cc3558643eeac01fa98ce
Author: mingmwang <mi...@ebay.com>
AuthorDate: Tue Nov 22 22:22:36 2022 +0800

    Add a checker to confirm physical optimizer rules will keep the physical plan schema immutable (#4316)
    
    * Add a checker to confirm physical optimizer rules will keep the physical plan schema immutable
    
    * fix debug print
    
    * fix comments
---
 datafusion/core/src/physical_optimizer/aggregate_statistics.rs   | 8 ++++++--
 datafusion/core/src/physical_optimizer/coalesce_batches.rs       | 4 ++++
 datafusion/core/src/physical_optimizer/enforcement.rs            | 4 ++++
 datafusion/core/src/physical_optimizer/hash_build_probe_order.rs | 4 ++++
 datafusion/core/src/physical_optimizer/optimizer.rs              | 6 ++++++
 datafusion/core/src/physical_optimizer/repartition.rs            | 4 ++++
 datafusion/core/src/physical_plan/planner.rs                     | 9 +++++++++
 7 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index cf509bd8f..ff4c2190f 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -18,7 +18,6 @@
 //! Utilizing exact statistics from sources to avoid scanning data
 use std::sync::Arc;
 
-use arrow::datatypes::Schema;
 use datafusion_expr::utils::COUNT_STAR_EXPANSION;
 
 use crate::execution::context::SessionConfig;
@@ -85,7 +84,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
                 // input can be entirely removed
                 Ok(Arc::new(ProjectionExec::try_new(
                     projections,
-                    Arc::new(EmptyExec::new(true, Arc::new(Schema::empty()))),
+                    Arc::new(EmptyExec::new(true, plan.schema())),
                 )?))
             } else {
                 optimize_children(self, plan, config)
@@ -98,6 +97,11 @@ impl PhysicalOptimizerRule for AggregateStatistics {
     fn name(&self) -> &str {
         "aggregate_statistics"
     }
+
+    /// This rule will change the nullable properties of the schema, so disable the schema check.
+    fn schema_check(&self) -> bool {
+        false
+    }
 }
 
 /// assert if the node passed as argument is a final `AggregateExec` node that can be optimized:
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 0d4085478..df7f9e552 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -72,4 +72,8 @@ impl PhysicalOptimizerRule for CoalesceBatches {
     fn name(&self) -> &str {
         "coalesce_batches"
     }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
 }
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 13f6d0bfa..b27a4af06 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -101,6 +101,10 @@ impl PhysicalOptimizerRule for BasicEnforcement {
     fn name(&self) -> &str {
         "BasicEnforcement"
     }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
 }
 
 /// When the physical planner creates the Joins, the ordering of join keys is from the original query.
diff --git a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
index 3a7e0dad2..7c6513499 100644
--- a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
+++ b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
@@ -211,6 +211,10 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder {
     fn name(&self) -> &str {
         "hash_build_probe_order"
     }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs
index 741fb4876..18cbc139d 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -36,4 +36,10 @@ pub trait PhysicalOptimizerRule {
 
     /// A human readable name for this optimizer rule
     fn name(&self) -> &str;
+
+    /// A flag indicating whether the physical planner should validate that this rule
+    /// does not change the schema of the plan after rewriting it.
+    /// Some optimization rules might change the nullable properties of the schema
+    /// and should therefore disable the schema check.
+    fn schema_check(&self) -> bool;
 }
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index f88ef4ab0..e22da6e22 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -233,6 +233,10 @@ impl PhysicalOptimizerRule for Repartition {
     fn name(&self) -> &str {
         "repartition"
     }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
 }
 #[cfg(test)]
 mod tests {
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index b30b8ca82..7345f44d6 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1679,7 +1679,16 @@ impl DefaultPhysicalPlanner {
 
         let mut new_plan = plan;
         for optimizer in optimizers {
+            let before_schema = new_plan.schema();
             new_plan = optimizer.optimize(new_plan, &session_state.config)?;
+            if optimizer.schema_check() && new_plan.schema() != before_schema {
+                return Err(DataFusionError::Internal(format!(
+                        "PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
+                        optimizer.name(),
+                        before_schema,
+                        new_plan.schema()
+                    )));
+            }
             observer(new_plan.as_ref(), optimizer.as_ref())
         }
         debug!(