You are viewing a plain text version of this content. The canonical link to the original message is available in the HTML version of this mailing-list archive (the hyperlink was not preserved in this plain-text export).
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/26 11:15:34 UTC

[arrow-datafusion] branch master updated: Do not error in optimizer if resulting schema has different metadata (#4347)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new a61615b29 Do not error in optimizer if resulting schema has different metadata (#4347)
a61615b29 is described below

commit a61615b2949bea9027eefe686613605e135780f2
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Nov 26 06:15:29 2022 -0500

    Do not error in optimizer if resulting schema has different metadata (#4347)
---
 datafusion/common/src/dfschema.rs     | 143 +++++++++++++++++++++++++++++++++-
 datafusion/optimizer/src/optimizer.rs |  55 ++++++++++++-
 2 files changed, 192 insertions(+), 6 deletions(-)

diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index b74012e4c..1110423db 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -338,6 +338,24 @@ impl DFSchema {
             })
     }
 
+    /// Returns true if the two schemas have the same qualified named
+    /// fields with the same data types. Returns false otherwise.
+    ///
+    /// This is a specialized version of Eq that ignores differences
+    /// in nullability and metadata.
+    pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
+        if self.fields().len() != other.fields().len() {
+            return false;
+        }
+        let self_fields = self.fields().iter();
+        let other_fields = other.fields().iter();
+        self_fields.zip(other_fields).all(|(f1, f2)| {
+            f1.qualifier() == f2.qualifier()
+                && f1.data_type() == f2.data_type()
+                && f1.name() == f2.name()
+        })
+    }
+
     /// Strip all field qualifier in schema
     pub fn strip_qualifiers(self) -> Self {
         DFSchema {
@@ -773,6 +791,114 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn equivalent_names_and_types() {
+        let field1_i16_t = DFField::from(Field::new("f1", DataType::Int16, true));
+        let field1_i16_t_meta = DFField::from(
+            field1_i16_t
+                .field()
+                .clone()
+                .with_metadata(Some(test_bmetadata_n(2))),
+        );
+        let field1_i16_t_qualified =
+            DFField::from_qualified("foo", field1_i16_t.field().clone());
+        let field1_i16_f = DFField::from(Field::new("f1", DataType::Int16, false));
+        let field1_i32_t = DFField::from(Field::new("f1", DataType::Int32, true));
+        let field2_i16_t = DFField::from(Field::new("f2", DataType::Int16, true));
+        let field3_i16_t = DFField::from(Field::new("f3", DataType::Int16, true));
+
+        // same
+        TestCase {
+            fields1: vec![&field1_i16_t],
+            fields2: vec![&field1_i16_t],
+            expected: true,
+        }
+        .run();
+
+        // same but metadata is different, should still be true
+        TestCase {
+            fields1: vec![&field1_i16_t_meta],
+            fields2: vec![&field1_i16_t],
+            expected: true,
+        }
+        .run();
+
+        // different name
+        TestCase {
+            fields1: vec![&field1_i16_t],
+            fields2: vec![&field2_i16_t],
+            expected: false,
+        }
+        .run();
+
+        // different type
+        TestCase {
+            fields1: vec![&field1_i16_t],
+            fields2: vec![&field1_i32_t],
+            expected: false,
+        }
+        .run();
+
+        // different nullability
+        TestCase {
+            fields1: vec![&field1_i16_t],
+            fields2: vec![&field1_i16_f],
+            expected: true,
+        }
+        .run();
+
+        // different qualifier
+        TestCase {
+            fields1: vec![&field1_i16_t],
+            fields2: vec![&field1_i16_t_qualified],
+            expected: false,
+        }
+        .run();
+
+        // different name after first
+        TestCase {
+            fields1: vec![&field2_i16_t, &field1_i16_t],
+            fields2: vec![&field2_i16_t, &field3_i16_t],
+            expected: false,
+        }
+        .run();
+
+        // different number
+        TestCase {
+            fields1: vec![&field1_i16_t, &field2_i16_t],
+            fields2: vec![&field1_i16_t],
+            expected: false,
+        }
+        .run();
+
+        #[derive(Debug)]
+        struct TestCase<'a> {
+            fields1: Vec<&'a DFField>,
+            fields2: Vec<&'a DFField>,
+            expected: bool,
+        }
+
+        impl<'a> TestCase<'a> {
+            fn run(self) {
+                println!("Running {:#?}", self);
+                let schema1 = to_df_schema(self.fields1);
+                let schema2 = to_df_schema(self.fields2);
+                assert_eq!(
+                    schema1.equivalent_names_and_types(&schema2),
+                    self.expected,
+                    "schema1:\n\n{:#?}\n\nschema2:\n\n{:#?}",
+                    schema1,
+                    schema2
+                );
+            }
+        }
+
+        fn to_df_schema(fields: Vec<&DFField>) -> DFSchema {
+            let fields = fields.into_iter().cloned().collect();
+            DFSchema::new_with_metadata(fields, HashMap::new()).unwrap()
+        }
+    }
+
     #[test]
     fn into() {
         // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
@@ -845,9 +971,20 @@ mod tests {
     }
 
     fn test_metadata() -> HashMap<String, String> {
-        vec![("k1", "v1"), ("k2", "v2")]
+        test_metadata_n(2)
+    }
+
+    fn test_metadata_n(n: usize) -> HashMap<String, String> {
+        (0..n)
             .into_iter()
-            .map(|(k, v)| (k.to_string(), v.to_string()))
-            .collect::<HashMap<_, _>>()
+            .map(|i| (format!("k{}", i), format!("v{}", i)))
+            .collect()
+    }
+
+    fn test_bmetadata_n(n: usize) -> BTreeMap<String, String> {
+        (0..n)
+            .into_iter()
+            .map(|i| (format!("k{}", i), format!("v{}", i)))
+            .collect()
     }
 }
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 1dfd10346..fd70c805c 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -225,7 +225,7 @@ impl Optimizer {
                 let result = rule.try_optimize(&new_plan, optimizer_config);
                 match result {
                     Ok(Some(plan)) => {
-                        if plan.schema() != new_plan.schema() {
+                        if !plan.schema().equivalent_names_and_types(new_plan.schema()) {
                             return Err(DataFusionError::Internal(format!(
                                 "Optimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
                                 rule.name(),
@@ -292,9 +292,10 @@ mod tests {
     use crate::optimizer::Optimizer;
     use crate::test::test_table_scan;
     use crate::{OptimizerConfig, OptimizerRule};
-    use datafusion_common::{DFSchema, DataFusionError};
+    use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError};
     use datafusion_expr::logical_plan::EmptyRelation;
-    use datafusion_expr::{LogicalPlan, LogicalPlanBuilder};
+    use datafusion_expr::{col, LogicalPlan, LogicalPlanBuilder, Projection};
+    use std::collections::BTreeMap;
     use std::sync::Arc;
 
     #[test]
@@ -351,6 +352,53 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn generate_same_schema_different_metadata() {
+        // if the plan creates more metadata than previously (because
+        // some wrapping functions are removed, etc) do not error
+        let opt = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
+        let mut config = OptimizerConfig::new().with_skip_failing_rules(false);
+
+        let input = Arc::new(test_table_scan().unwrap());
+        let input_schema = input.schema().clone();
+
+        let plan = LogicalPlan::Projection(Projection {
+            expr: vec![col("a"), col("b"), col("c")],
+            input,
+            schema: add_metadata_to_fields(input_schema.as_ref()),
+            alias: None,
+        });
+
+        // optimizing should be ok, but the schema will have changed  (no metadata)
+        assert_ne!(plan.schema().as_ref(), input_schema.as_ref());
+        let optimized_plan = opt.optimize(&plan, &mut config, &observe).unwrap();
+        // metadata was removed
+        assert_eq!(optimized_plan.schema().as_ref(), input_schema.as_ref());
+    }
+
+    fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
+        let new_fields = schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(i, f)| {
+                let metadata: BTreeMap<_, _> = [("key".into(), format!("value {}", i))]
+                    .into_iter()
+                    .collect();
+
+                let new_arrow_field = f.field().clone().with_metadata(Some(metadata));
+                if let Some(qualifier) = f.qualifier() {
+                    DFField::from_qualified(qualifier, new_arrow_field)
+                } else {
+                    DFField::from(new_arrow_field)
+                }
+            })
+            .collect::<Vec<_>>();
+
+        let new_metadata = schema.metadata().clone();
+        Arc::new(DFSchema::new_with_metadata(new_fields, new_metadata).unwrap())
+    }
+
     fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
 
     struct BadRule {}
@@ -369,6 +417,7 @@ mod tests {
         }
     }
 
+    /// Replaces whatever plan with a single table scan
     struct GetTableScanRule {}
 
     impl OptimizerRule for GetTableScanRule {