You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this plain-text copy.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/03/20 19:40:58 UTC

[avro] branch branch-1.11 updated: AVRO-3461: Resolve flow namespace resolution work (#1609)

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/branch-1.11 by this push:
     new e9faaaa  AVRO-3461: Resolve flow namespace resolution work (#1609)
e9faaaa is described below

commit e9faaaab775606d6e8478f89b5c283c403a0c5d6
Author: Jack Klamer <jf...@gmail.com>
AuthorDate: Sun Mar 20 14:40:26 2022 -0500

    AVRO-3461: Resolve flow namespace resolution work (#1609)
    
    * remove clones
    
    * [AVRO-3461] added in fixes and tests
    
    (cherry picked from commit c7fdf69d811b57e888f31ffa520ebeb2f055f55b)
---
 lang/rust/avro/src/schema.rs |   4 +-
 lang/rust/avro/src/types.rs  | 434 ++++++++++++++++++++++++++++++++-----------
 2 files changed, 324 insertions(+), 114 deletions(-)

diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 81b7499..815aea6 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -188,8 +188,8 @@ impl SchemaKind {
     }
 }
 
-impl<'a> From<&'a types::Value> for SchemaKind {
-    fn from(value: &'a types::Value) -> Self {
+impl From<&types::Value> for SchemaKind {
+    fn from(value: &types::Value) -> Self {
         use crate::types::Value;
         match value {
             Value::Null => Self::Null,
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index 322d21b..685904b 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -19,7 +19,9 @@
 use crate::{
     decimal::Decimal,
     duration::Duration,
-    schema::{Name, Precision, RecordField, Scale, Schema, SchemaKind, UnionSchema},
+    schema::{
+        NamesRef, Precision, RecordField, ResolvedSchema, Scale, Schema, SchemaKind, UnionSchema,
+    },
     AvroResult, Error,
 };
 use serde_json::{Number, Value as JsonValue};
@@ -410,86 +412,57 @@ impl Value {
     /// resolution.
     pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
         // FIXME transition to using resolved Schema
-        let mut schemas_by_name: HashMap<Name, Schema> = HashMap::new();
-        self.resolve_internal(schema, &mut schemas_by_name)
-    }
-
-    fn resolve_internal(
-        mut self,
-        schema: &Schema,
-        schemas_by_name: &mut HashMap<Name, Schema>,
-    ) -> AvroResult<Self> {
-        pub fn resolve0(
-            value: &mut Value,
-            schema: &Schema,
-            schemas_by_name: &mut HashMap<Name, Schema>,
-        ) -> AvroResult<Value> {
-            // Check if this schema is a union, and if the reader schema is not.
-            if SchemaKind::from(&value.clone()) == SchemaKind::Union
-                && SchemaKind::from(schema) != SchemaKind::Union
-            {
-                // Pull out the Union, and attempt to resolve against it.
-                let v = match value {
-                    Value::Union(_i, b) => &**b,
-                    _ => unreachable!(),
-                };
-                *value = v.clone();
-            }
-            let val: Value = value.clone();
-            match *schema {
-                Schema::Ref { ref name } => {
-                    if let Some(resolved) = schemas_by_name.get(name) {
-                        resolve0(value, resolved, &mut schemas_by_name.clone())
-                    } else {
-                        Err(Error::SchemaResolutionError(name.clone()))
-                    }
-                }
-                Schema::Null => val.resolve_null(),
-                Schema::Boolean => val.resolve_boolean(),
-                Schema::Int => val.resolve_int(),
-                Schema::Long => val.resolve_long(),
-                Schema::Float => val.resolve_float(),
-                Schema::Double => val.resolve_double(),
-                Schema::Bytes => val.resolve_bytes(),
-                Schema::String => val.resolve_string(),
-                Schema::Fixed { ref name, size, .. } => {
-                    schemas_by_name.insert(name.clone(), schema.clone());
-                    val.resolve_fixed(size)
-                }
-                Schema::Union(ref inner) => val.resolve_union(inner, schemas_by_name),
-                Schema::Enum {
-                    ref name,
-                    ref symbols,
-                    ..
-                } => {
-                    schemas_by_name.insert(name.clone(), schema.clone());
-                    val.resolve_enum(symbols)
-                }
-                Schema::Array(ref inner) => val.resolve_array(inner, schemas_by_name),
-                Schema::Map(ref inner) => val.resolve_map(inner, schemas_by_name),
-                Schema::Record {
-                    ref name,
-                    ref fields,
-                    ..
-                } => {
-                    schemas_by_name.insert(name.clone(), schema.clone());
-                    val.resolve_record(fields, schemas_by_name)
+        let rs = ResolvedSchema::try_from(schema)?;
+        self.resolve_internal(schema, rs.get_names())
+    }
+
+    fn resolve_internal(mut self, schema: &Schema, names: &NamesRef) -> AvroResult<Self> {
+        // Check if this schema is a union, and if the reader schema is not.
+        if SchemaKind::from(&self) == SchemaKind::Union
+            && SchemaKind::from(schema) != SchemaKind::Union
+        {
+            // Pull out the Union, and attempt to resolve against it.
+            let v = match self {
+                Value::Union(_i, b) => *b,
+                _ => unreachable!(),
+            };
+            self = v;
+        }
+        match *schema {
+            Schema::Ref { ref name } => {
+                if let Some(resolved) = names.get(name) {
+                    self.resolve_internal(resolved, names)
+                } else {
+                    Err(Error::SchemaResolutionError(name.clone()))
                 }
-                Schema::Decimal {
-                    scale,
-                    precision,
-                    ref inner,
-                } => val.resolve_decimal(precision, scale, inner),
-                Schema::Date => val.resolve_date(),
-                Schema::TimeMillis => val.resolve_time_millis(),
-                Schema::TimeMicros => val.resolve_time_micros(),
-                Schema::TimestampMillis => val.resolve_timestamp_millis(),
-                Schema::TimestampMicros => val.resolve_timestamp_micros(),
-                Schema::Duration => val.resolve_duration(),
-                Schema::Uuid => val.resolve_uuid(),
             }
+            Schema::Null => self.resolve_null(),
+            Schema::Boolean => self.resolve_boolean(),
+            Schema::Int => self.resolve_int(),
+            Schema::Long => self.resolve_long(),
+            Schema::Float => self.resolve_float(),
+            Schema::Double => self.resolve_double(),
+            Schema::Bytes => self.resolve_bytes(),
+            Schema::String => self.resolve_string(),
+            Schema::Fixed { size, .. } => self.resolve_fixed(size),
+            Schema::Union(ref inner) => self.resolve_union(inner, names),
+            Schema::Enum { ref symbols, .. } => self.resolve_enum(symbols),
+            Schema::Array(ref inner) => self.resolve_array(inner, names),
+            Schema::Map(ref inner) => self.resolve_map(inner, names),
+            Schema::Record { ref fields, .. } => self.resolve_record(fields, names),
+            Schema::Decimal {
+                scale,
+                precision,
+                ref inner,
+            } => self.resolve_decimal(precision, scale, inner),
+            Schema::Date => self.resolve_date(),
+            Schema::TimeMillis => self.resolve_time_millis(),
+            Schema::TimeMicros => self.resolve_time_micros(),
+            Schema::TimestampMillis => self.resolve_timestamp_millis(),
+            Schema::TimestampMicros => self.resolve_timestamp_micros(),
+            Schema::Duration => self.resolve_duration(),
+            Schema::Uuid => self.resolve_uuid(),
         }
-        resolve0(&mut self, schema, schemas_by_name)
     }
 
     fn resolve_uuid(self) -> Result<Self, Error> {
@@ -719,11 +692,7 @@ impl Value {
         }
     }
 
-    fn resolve_union(
-        self,
-        schema: &UnionSchema,
-        schemas_by_name: &mut HashMap<Name, Schema>,
-    ) -> Result<Self, Error> {
+    fn resolve_union(self, schema: &UnionSchema, names: &NamesRef) -> Result<Self, Error> {
         let v = match self {
             // Both are unions case.
             Value::Union(_i, v) => *v,
@@ -731,34 +700,21 @@ impl Value {
             v => v,
         };
 
-        schema.schemas.iter().for_each(|s| match s {
-            Schema::Record { name, .. }
-            | Schema::Enum { name, .. }
-            | Schema::Fixed { name, .. } => {
-                schemas_by_name.insert(name.clone(), s.clone());
-            }
-            _ => (),
-        });
-
         // Find the first match in the reader schema.
         // FIXME: this might be wrong when the union consists of multiple same records that have different names
         let (i, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
         Ok(Value::Union(
             i as u32,
-            Box::new(v.resolve_internal(inner, schemas_by_name)?),
+            Box::new(v.resolve_internal(inner, names)?),
         ))
     }
 
-    fn resolve_array(
-        self,
-        schema: &Schema,
-        schemas_by_name: &mut HashMap<Name, Schema>,
-    ) -> Result<Self, Error> {
+    fn resolve_array(self, schema: &Schema, names: &NamesRef) -> Result<Self, Error> {
         match self {
             Value::Array(items) => Ok(Value::Array(
                 items
                     .into_iter()
-                    .map(|item| item.resolve_internal(schema, schemas_by_name))
+                    .map(|item| item.resolve_internal(schema, names))
                     .collect::<Result<_, _>>()?,
             )),
             other => Err(Error::GetArray {
@@ -768,18 +724,14 @@ impl Value {
         }
     }
 
-    fn resolve_map(
-        self,
-        schema: &Schema,
-        schemas_by_name: &mut HashMap<Name, Schema>,
-    ) -> Result<Self, Error> {
+    fn resolve_map(self, schema: &Schema, names: &NamesRef) -> Result<Self, Error> {
         match self {
             Value::Map(items) => Ok(Value::Map(
                 items
                     .into_iter()
                     .map(|(key, value)| {
                         value
-                            .resolve_internal(schema, schemas_by_name)
+                            .resolve_internal(schema, names)
                             .map(|value| (key, value))
                     })
                     .collect::<Result<_, _>>()?,
@@ -791,11 +743,7 @@ impl Value {
         }
     }
 
-    fn resolve_record(
-        self,
-        fields: &[RecordField],
-        schemas_by_name: &mut HashMap<Name, Schema>,
-    ) -> Result<Self, Error> {
+    fn resolve_record(self, fields: &[RecordField], names: &NamesRef) -> Result<Self, Error> {
         let mut items = match self {
             Value::Map(items) => Ok(items),
             Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
@@ -828,7 +776,7 @@ impl Value {
                                         0,
                                         Box::new(
                                             Value::from(value.clone())
-                                                .resolve_internal(first, schemas_by_name)?,
+                                                .resolve_internal(first, names)?,
                                         ),
                                     ),
                                 }
@@ -841,7 +789,7 @@ impl Value {
                     },
                 };
                 value
-                    .resolve_internal(&field.schema, schemas_by_name)
+                    .resolve_internal(&field.schema, names)
                     .map(|value| (field.name.clone(), value))
             })
             .collect::<Result<Vec<_>, _>>()?;
@@ -1637,4 +1585,266 @@ mod tests {
             .resolve(&schema)
             .expect("Record definition defined in union must be resolvabled in other field");
     }
+
+    #[test]
+    fn test_avro_3461_test_multi_level_resolve_outer_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type": "record",
+                            "name": "middle_record_name",
+                            "fields":[
+                                {
+                                    "name":"middle_field_1",
+                                    "type":[
+                                        "null",
+                                        {
+                                            "type":"record",
+                                            "name":"inner_record_name",
+                                            "fields":[
+                                                {
+                                                    "name":"inner_field_1",
+                                                    "type":"double"
+                                                }
+                                            ]
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "space.inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
+        let middle_record_variation_1 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(0, Box::new(Value::Null)),
+        )]);
+        let middle_record_variation_2 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(1, Box::new(inner_record.clone())),
+        )]);
+        let outer_record_variation_1 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(0, Box::new(Value::Null)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_2 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_1)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_3 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_2)),
+            ),
+            ("outer_field_2".into(), inner_record),
+        ]);
+
+        outer_record_variation_1
+            .resolve(&schema)
+            .expect("Should be able to resolve value to the schema that is it's definition");
+        outer_record_variation_2
+            .resolve(&schema)
+            .expect("Should be able to resolve value to the schema that is it's definition");
+        outer_record_variation_3
+            .resolve(&schema)
+            .expect("Should be able to resolve value to the schema that is it's definition");
+    }
+
+    #[test]
+    fn test_avro_3461_test_multi_level_resolve_middle_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type": "record",
+                            "name": "middle_record_name",
+                            "namespace":"middle_namespace",
+                            "fields":[
+                                {
+                                    "name":"middle_field_1",
+                                    "type":[
+                                        "null",
+                                        {
+                                            "type":"record",
+                                            "name":"inner_record_name",
+                                            "fields":[
+                                                {
+                                                    "name":"inner_field_1",
+                                                    "type":"double"
+                                                }
+                                            ]
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "middle_namespace.inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
+        let middle_record_variation_1 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(0, Box::new(Value::Null)),
+        )]);
+        let middle_record_variation_2 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(1, Box::new(inner_record.clone())),
+        )]);
+        let outer_record_variation_1 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(0, Box::new(Value::Null)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_2 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_1)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_3 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_2)),
+            ),
+            ("outer_field_2".into(), inner_record),
+        ]);
+
+        outer_record_variation_1
+            .resolve(&schema)
+            .expect("Should be able to resolve value to the schema that is it's definition");
+        outer_record_variation_2
+            .resolve(&schema)
+            .expect("Should be able to resolve value to the schema that is it's definition");
+        outer_record_variation_3
+            .resolve(&schema)
+            .expect("Should be able to resolve value to the schema that is it's definition");
+    }
+
+    #[test]
+    fn test_avro_3461_test_multi_level_resolve_inner_namespace() {
+        let schema = r#"
+        {
+          "name": "record_name",
+          "namespace": "space",
+          "type": "record",
+          "fields": [
+            {
+              "name": "outer_field_1",
+              "type": [
+                        "null",
+                        {
+                            "type": "record",
+                            "name": "middle_record_name",
+                            "namespace":"middle_namespace",
+                            "fields":[
+                                {
+                                    "name":"middle_field_1",
+                                    "type":[
+                                        "null",
+                                        {
+                                            "type":"record",
+                                            "name":"inner_record_name",
+                                            "namespace":"inner_namespace",
+                                            "fields":[
+                                                {
+                                                    "name":"inner_field_1",
+                                                    "type":"double"
+                                                }
+                                            ]
+                                        }
+                                    ]
+                                }
+                            ]
+                        }
+                    ]
+            },
+            {
+                "name": "outer_field_2",
+                "type" : "inner_namespace.inner_record_name"
+            }
+          ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema).unwrap();
+
+        let inner_record = Value::Record(vec![("inner_field_1".into(), Value::Double(5.4))]);
+        let middle_record_variation_1 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(0, Box::new(Value::Null)),
+        )]);
+        let middle_record_variation_2 = Value::Record(vec![(
+            "middle_field_1".into(),
+            Value::Union(1, Box::new(inner_record.clone())),
+        )]);
+        let outer_record_variation_1 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(0, Box::new(Value::Null)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_2 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_1)),
+            ),
+            ("outer_field_2".into(), inner_record.clone()),
+        ]);
+        let outer_record_variation_3 = Value::Record(vec![
+            (
+                "outer_field_1".into(),
+                Value::Union(1, Box::new(middle_record_variation_2)),
+            ),
+            ("outer_field_2".into(), inner_record),
+        ]);
+
+        outer_record_variation_1
+            .resolve(&schema)
+            .expect("Should be able to resolve value to the schema that is it's definition");
+        outer_record_variation_2
+            .resolve(&schema)
+            .expect("Should be able to resolve value to the schema that is it's definition");
+        outer_record_variation_3
+            .resolve(&schema)
+            .expect("Should be able to resolve value to the schema that is it's definition");
+    }
 }