You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/22 14:22:42 UTC
[arrow-datafusion] branch master updated: Add a checker to confirm physical optimizer rules will keep the physical plan schema immutable (#4316)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 7c07e4d77 Add a checker to confirm physical optimizer rules will keep the physical plan schema immutable (#4316)
7c07e4d77 is described below
commit 7c07e4d77aa2da60a83cc3558643eeac01fa98ce
Author: mingmwang <mi...@ebay.com>
AuthorDate: Tue Nov 22 22:22:36 2022 +0800
Add a checker to confirm physical optimizer rules will keep the physical plan schema immutable (#4316)
* Add a checker to confirm physical optimizer rules will keep the physical plan schema immutable
* fix debug print
* fix comments
---
datafusion/core/src/physical_optimizer/aggregate_statistics.rs | 8 ++++++--
datafusion/core/src/physical_optimizer/coalesce_batches.rs | 4 ++++
datafusion/core/src/physical_optimizer/enforcement.rs | 4 ++++
datafusion/core/src/physical_optimizer/hash_build_probe_order.rs | 4 ++++
datafusion/core/src/physical_optimizer/optimizer.rs | 6 ++++++
datafusion/core/src/physical_optimizer/repartition.rs | 4 ++++
datafusion/core/src/physical_plan/planner.rs | 9 +++++++++
7 files changed, 37 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index cf509bd8f..ff4c2190f 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -18,7 +18,6 @@
//! Utilizing exact statistics from sources to avoid scanning data
use std::sync::Arc;
-use arrow::datatypes::Schema;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use crate::execution::context::SessionConfig;
@@ -85,7 +84,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
// input can be entirely removed
Ok(Arc::new(ProjectionExec::try_new(
projections,
- Arc::new(EmptyExec::new(true, Arc::new(Schema::empty()))),
+ Arc::new(EmptyExec::new(true, plan.schema())),
)?))
} else {
optimize_children(self, plan, config)
@@ -98,6 +97,11 @@ impl PhysicalOptimizerRule for AggregateStatistics {
fn name(&self) -> &str {
"aggregate_statistics"
}
+
+ /// This rule will change the nullable properties of the schema, so the schema check is disabled.
+ fn schema_check(&self) -> bool {
+ false
+ }
}
/// assert if the node passed as argument is a final `AggregateExec` node that can be optimized:
diff --git a/datafusion/core/src/physical_optimizer/coalesce_batches.rs b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
index 0d4085478..df7f9e552 100644
--- a/datafusion/core/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -72,4 +72,8 @@ impl PhysicalOptimizerRule for CoalesceBatches {
fn name(&self) -> &str {
"coalesce_batches"
}
+
+ fn schema_check(&self) -> bool {
+ true
+ }
}
diff --git a/datafusion/core/src/physical_optimizer/enforcement.rs b/datafusion/core/src/physical_optimizer/enforcement.rs
index 13f6d0bfa..b27a4af06 100644
--- a/datafusion/core/src/physical_optimizer/enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/enforcement.rs
@@ -101,6 +101,10 @@ impl PhysicalOptimizerRule for BasicEnforcement {
fn name(&self) -> &str {
"BasicEnforcement"
}
+
+ fn schema_check(&self) -> bool {
+ true
+ }
}
/// When the physical planner creates the Joins, the ordering of join keys is from the original query.
diff --git a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
index 3a7e0dad2..7c6513499 100644
--- a/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
+++ b/datafusion/core/src/physical_optimizer/hash_build_probe_order.rs
@@ -211,6 +211,10 @@ impl PhysicalOptimizerRule for HashBuildProbeOrder {
fn name(&self) -> &str {
"hash_build_probe_order"
}
+
+ fn schema_check(&self) -> bool {
+ true
+ }
}
#[cfg(test)]
diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs
index 741fb4876..18cbc139d 100644
--- a/datafusion/core/src/physical_optimizer/optimizer.rs
+++ b/datafusion/core/src/physical_optimizer/optimizer.rs
@@ -36,4 +36,10 @@ pub trait PhysicalOptimizerRule {
/// A human readable name for this optimizer rule
fn name(&self) -> &str;
+
+ /// A flag to indicate whether the physical planner should validate that the rule will not
+ /// change the schema of the plan after the rewriting.
+ /// Some of the optimization rules might change the nullable properties of the schema
+ /// and should disable the schema check.
+ fn schema_check(&self) -> bool;
}
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index f88ef4ab0..e22da6e22 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -233,6 +233,10 @@ impl PhysicalOptimizerRule for Repartition {
fn name(&self) -> &str {
"repartition"
}
+
+ fn schema_check(&self) -> bool {
+ true
+ }
}
#[cfg(test)]
mod tests {
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index b30b8ca82..7345f44d6 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1679,7 +1679,16 @@ impl DefaultPhysicalPlanner {
let mut new_plan = plan;
for optimizer in optimizers {
+ let before_schema = new_plan.schema();
new_plan = optimizer.optimize(new_plan, &session_state.config)?;
+ if optimizer.schema_check() && new_plan.schema() != before_schema {
+ return Err(DataFusionError::Internal(format!(
+ "PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
+ optimizer.name(),
+ before_schema,
+ new_plan.schema()
+ )));
+ }
observer(new_plan.as_ref(), optimizer.as_ref())
}
debug!(