You are viewing a plain-text version of this content. The canonical (HTML) version of this message is available in the mailing-list archive.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/26 11:15:34 UTC
[arrow-datafusion] branch master updated: Do not error in optimizer if resulting schema has different metadata (#4347)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new a61615b29 Do not error in optimizer if resulting schema has different metadata (#4347)
a61615b29 is described below
commit a61615b2949bea9027eefe686613605e135780f2
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Nov 26 06:15:29 2022 -0500
Do not error in optimizer if resulting schema has different metadata (#4347)
---
datafusion/common/src/dfschema.rs | 143 +++++++++++++++++++++++++++++++++-
datafusion/optimizer/src/optimizer.rs | 55 ++++++++++++-
2 files changed, 192 insertions(+), 6 deletions(-)
diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index b74012e4c..1110423db 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -338,6 +338,24 @@ impl DFSchema {
})
}
+ /// Returns true if the two schemas have the same qualified named
+ /// fields with the same data types. Returns false otherwise.
+ ///
+ /// This is a specialized version of Eq that ignores differences
+ /// in nullability and metadata.
+ pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
+ if self.fields().len() != other.fields().len() {
+ return false;
+ }
+ let self_fields = self.fields().iter();
+ let other_fields = other.fields().iter();
+ self_fields.zip(other_fields).all(|(f1, f2)| {
+ f1.qualifier() == f2.qualifier()
+ && f1.data_type() == f2.data_type()
+ && f1.name() == f2.name()
+ })
+ }
+
/// Strip all field qualifier in schema
pub fn strip_qualifiers(self) -> Self {
DFSchema {
@@ -773,6 +791,114 @@ mod tests {
Ok(())
}
#[test]
fn equivalent_names_and_types() {
    // Base field plus variants that each differ from it in exactly one
    // property: metadata, qualifier, nullability, data type, or name.
    let field1_i16_t = DFField::from(Field::new("f1", DataType::Int16, true));
    let field1_i16_t_meta = DFField::from(
        field1_i16_t
            .field()
            .clone()
            .with_metadata(Some(test_bmetadata_n(2))),
    );
    let field1_i16_t_qualified =
        DFField::from_qualified("foo", field1_i16_t.field().clone());
    let field1_i16_f = DFField::from(Field::new("f1", DataType::Int16, false));
    let field1_i32_t = DFField::from(Field::new("f1", DataType::Int32, true));
    let field2_i16_t = DFField::from(Field::new("f2", DataType::Int16, true));
    let field3_i16_t = DFField::from(Field::new("f3", DataType::Int16, true));

    // same
    TestCase {
        fields1: vec![&field1_i16_t],
        fields2: vec![&field1_i16_t],
        expected: true,
    }
    .run();

    // same but metadata is different, should still be true
    TestCase {
        fields1: vec![&field1_i16_t_meta],
        fields2: vec![&field1_i16_t],
        expected: true,
    }
    .run();

    // different name
    TestCase {
        fields1: vec![&field1_i16_t],
        fields2: vec![&field2_i16_t],
        expected: false,
    }
    .run();

    // different type
    TestCase {
        fields1: vec![&field1_i16_t],
        fields2: vec![&field1_i32_t],
        expected: false,
    }
    .run();

    // different nullability — still equivalent, nullability is ignored
    TestCase {
        fields1: vec![&field1_i16_t],
        fields2: vec![&field1_i16_f],
        expected: true,
    }
    .run();

    // different qualifier
    TestCase {
        fields1: vec![&field1_i16_t],
        fields2: vec![&field1_i16_t_qualified],
        expected: false,
    }
    .run();

    // different name after first
    TestCase {
        fields1: vec![&field2_i16_t, &field1_i16_t],
        fields2: vec![&field2_i16_t, &field3_i16_t],
        expected: false,
    }
    .run();

    // different number
    TestCase {
        fields1: vec![&field1_i16_t, &field2_i16_t],
        fields2: vec![&field1_i16_t],
        expected: false,
    }
    .run();

    /// One comparison scenario: two field lists and the expected result
    /// of `DFSchema::equivalent_names_and_types` on the schemas built
    /// from them.
    #[derive(Debug)]
    struct TestCase<'a> {
        fields1: Vec<&'a DFField>,
        fields2: Vec<&'a DFField>,
        expected: bool,
    }

    impl<'a> TestCase<'a> {
        /// Builds a schema from each field list and asserts the comparison
        /// result matches `expected`, printing both schemas on failure.
        fn run(self) {
            println!("Running {:#?}", self);
            let schema1 = to_df_schema(self.fields1);
            let schema2 = to_df_schema(self.fields2);
            assert_eq!(
                schema1.equivalent_names_and_types(&schema2),
                self.expected,
                "schema1:\n\n{:#?}\n\nschema2:\n\n{:#?}",
                schema1,
                schema2
            );
        }
    }

    /// Clones the borrowed fields into an owned `DFSchema` with no
    /// schema-level metadata.
    fn to_df_schema(fields: Vec<&DFField>) -> DFSchema {
        let fields = fields.into_iter().cloned().collect();
        DFSchema::new_with_metadata(fields, HashMap::new()).unwrap()
    }
}
+
#[test]
fn into() {
// Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
@@ -845,9 +971,20 @@ mod tests {
}
/// Fixed two-entry metadata map used by the schema tests.
fn test_metadata() -> HashMap<String, String> {
    test_metadata_n(2)
}

/// Builds `n` metadata entries ("k0" -> "v0", "k1" -> "v1", ...) as a
/// `HashMap` (the map type `DFSchema` metadata uses).
fn test_metadata_n(n: usize) -> HashMap<String, String> {
    // `Range` is already an Iterator; the previous `.into_iter()` call was
    // redundant (clippy: useless_conversion).
    (0..n).map(|i| (format!("k{}", i), format!("v{}", i))).collect()
}

/// Builds `n` metadata entries ("k0" -> "v0", "k1" -> "v1", ...) as a
/// `BTreeMap`, the map type `Field::with_metadata` accepts.
fn test_bmetadata_n(n: usize) -> BTreeMap<String, String> {
    (0..n).map(|i| (format!("k{}", i), format!("v{}", i))).collect()
}
}
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 1dfd10346..fd70c805c 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -225,7 +225,7 @@ impl Optimizer {
let result = rule.try_optimize(&new_plan, optimizer_config);
match result {
Ok(Some(plan)) => {
- if plan.schema() != new_plan.schema() {
+ if !plan.schema().equivalent_names_and_types(new_plan.schema()) {
return Err(DataFusionError::Internal(format!(
"Optimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
rule.name(),
@@ -292,9 +292,10 @@ mod tests {
use crate::optimizer::Optimizer;
use crate::test::test_table_scan;
use crate::{OptimizerConfig, OptimizerRule};
- use datafusion_common::{DFSchema, DataFusionError};
+ use datafusion_common::{DFField, DFSchema, DFSchemaRef, DataFusionError};
use datafusion_expr::logical_plan::EmptyRelation;
- use datafusion_expr::{LogicalPlan, LogicalPlanBuilder};
+ use datafusion_expr::{col, LogicalPlan, LogicalPlanBuilder, Projection};
+ use std::collections::BTreeMap;
use std::sync::Arc;
#[test]
@@ -351,6 +352,53 @@ mod tests {
Ok(())
}
#[test]
fn generate_same_schema_different_metadata() {
    // if the plan creates more metadata than previously (because
    // some wrapping functions are removed, etc) do not error
    let optimizer = Optimizer::with_rules(vec![Arc::new(GetTableScanRule {})]);
    let mut optimizer_config = OptimizerConfig::new().with_skip_failing_rules(false);

    let scan = Arc::new(test_table_scan().unwrap());
    let scan_schema = scan.schema().clone();

    // Wrap the scan in a projection whose fields carry extra metadata.
    let plan = LogicalPlan::Projection(Projection {
        expr: vec![col("a"), col("b"), col("c")],
        input: scan,
        schema: add_metadata_to_fields(scan_schema.as_ref()),
        alias: None,
    });

    // optimizing should be ok, but the schema will have changed (no metadata)
    assert_ne!(plan.schema().as_ref(), scan_schema.as_ref());
    let optimized_plan = optimizer
        .optimize(&plan, &mut optimizer_config, &observe)
        .unwrap();
    // metadata was removed
    assert_eq!(optimized_plan.schema().as_ref(), scan_schema.as_ref());
}
+
+ fn add_metadata_to_fields(schema: &DFSchema) -> DFSchemaRef {
+ let new_fields = schema
+ .fields()
+ .iter()
+ .enumerate()
+ .map(|(i, f)| {
+ let metadata: BTreeMap<_, _> = [("key".into(), format!("value {}", i))]
+ .into_iter()
+ .collect();
+
+ let new_arrow_field = f.field().clone().with_metadata(Some(metadata));
+ if let Some(qualifier) = f.qualifier() {
+ DFField::from_qualified(qualifier, new_arrow_field)
+ } else {
+ DFField::from(new_arrow_field)
+ }
+ })
+ .collect::<Vec<_>>();
+
+ let new_metadata = schema.metadata().clone();
+ Arc::new(DFSchema::new_with_metadata(new_fields, new_metadata).unwrap())
+ }
+
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
struct BadRule {}
@@ -369,6 +417,7 @@ mod tests {
}
}
+ /// Replaces whatever plan with a single table scan
struct GetTableScanRule {}
impl OptimizerRule for GetTableScanRule {