You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/11/18 12:13:14 UTC

[avro] branch avro-3674 updated: AVRO-3674: Pass the correct enclosing namespace to validate and resolve_internal

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch avro-3674
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/avro-3674 by this push:
     new fc2a17ca3 AVRO-3674: Pass the correct enclosing namespace to validate and resolve_internal
fc2a17ca3 is described below

commit fc2a17ca3511c832dfde44a149ed959bd0ba5519
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Fri Nov 18 14:10:15 2022 +0200

    AVRO-3674: Pass the correct enclosing namespace to validate and resolve_internal
    
    Expose getters for Schema's name and namespace
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---
 lang/rust/avro/src/schema.rs |  15 ++++++
 lang/rust/avro/src/types.rs  | 117 +++++++++++++++++++++++++++----------------
 lang/rust/avro/src/writer.rs |  18 ++++---
 3 files changed, 100 insertions(+), 50 deletions(-)

diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 2e2443ed3..204c5ad9f 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -823,6 +823,21 @@ impl Schema {
             _ => None,
         }
     }
+
+    /// Returns the name of the schema if it has one.
+    pub fn name(&self) -> Option<&Name> {
+        match self {
+            Schema::Ref { ref name, .. }
+            | Schema::Record { ref name, .. }
+            | Schema::Enum { ref name, .. }
+            | Schema::Fixed { ref name, .. } => Some(name),
+            _ => None,
+        }
+    }
+
+    pub fn namespace(&self) -> Namespace {
+        self.name().and_then(|n| n.namespace.clone())
+    }
 }
 
 impl Parser {
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index fadff4481..c139056a3 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -340,14 +340,9 @@ impl Value {
     /// for the full set of rules of schema validation.
     pub fn validate(&self, schema: &Schema) -> bool {
         let rs = ResolvedSchema::try_from(schema).expect("Schema didn't successfully parse");
-        let namespace = match schema {
-            Schema::Record { name, .. }
-            | Schema::Enum { name, .. }
-            | Schema::Fixed { name, .. } => &name.namespace,
-            _ => &None,
-        };
+        let enclosing_namespace = schema.namespace();
 
-        match self.validate_internal(schema, rs.get_names(), namespace) {
+        match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
             Some(error_msg) => {
                 error!(
                     "Invalid value: {:?} for schema: {:?}. Reason: {}",
@@ -372,17 +367,11 @@ impl Value {
         &self,
         schema: &Schema,
         names: &HashMap<Name, S>,
-        namespace: &Namespace,
+        enclosing_namespace: &Namespace,
     ) -> Option<String> {
         match (self, schema) {
             (_, &Schema::Ref { ref name }) => {
-                let name = match namespace {
-                    Some(namespace) => Name {
-                        name: name.name.to_owned(),
-                        namespace: Some(namespace.to_owned()),
-                    },
-                    None => name.to_owned(),
-                };
+                let name = name.fully_qualified_name(enclosing_namespace);
 
                 names.get(&name).map_or_else(
                     || {
@@ -392,7 +381,7 @@ impl Value {
                             names.keys()
                         ))
                     },
-                    |s| self.validate_internal(s.borrow(), names, namespace),
+                    |s| self.validate_internal(s.borrow(), names, &name.namespace),
                 )
             }
             (&Value::Null, &Schema::Null) => None,
@@ -473,7 +462,7 @@ impl Value {
             (&Value::Union(i, ref value), &Schema::Union(ref inner)) => inner
                 .variants()
                 .get(i as usize)
-                .map(|schema| value.validate_internal(schema, names, namespace))
+                .map(|schema| value.validate_internal(schema, names, enclosing_namespace))
                 .unwrap_or_else(|| Some(format!("No schema in the union at position '{}'", i))),
             (v, &Schema::Union(ref inner)) => match inner.find_schema(v) {
                 Some(_) => None,
@@ -481,12 +470,18 @@ impl Value {
             },
             (&Value::Array(ref items), &Schema::Array(ref inner)) => {
                 items.iter().fold(None, |acc, item| {
-                    Value::accumulate(acc, item.validate_internal(inner, names, namespace))
+                    Value::accumulate(
+                        acc,
+                        item.validate_internal(inner, names, enclosing_namespace),
+                    )
                 })
             }
             (&Value::Map(ref items), &Schema::Map(ref inner)) => {
                 items.iter().fold(None, |acc, (_, value)| {
-                    Value::accumulate(acc, value.validate_internal(inner, names, namespace))
+                    Value::accumulate(
+                        acc,
+                        value.validate_internal(inner, names, enclosing_namespace),
+                    )
                 })
             }
             (
@@ -522,7 +517,11 @@ impl Value {
                                 let field = &fields[*idx];
                                 Value::accumulate(
                                     acc,
-                                    record_field.validate_internal(&field.schema, names, namespace),
+                                    record_field.validate_internal(
+                                        &field.schema,
+                                        names,
+                                        enclosing_namespace,
+                                    ),
                                 )
                             }
                             None => Value::accumulate(
@@ -538,7 +537,7 @@ impl Value {
             (&Value::Map(ref items), &Schema::Record { ref fields, .. }) => {
                 fields.iter().fold(None, |acc, field| {
                     if let Some(item) = items.get(&field.name) {
-                        let res = item.validate_internal(&field.schema, names, namespace);
+                        let res = item.validate_internal(&field.schema, names, enclosing_namespace);
                         Value::accumulate(acc, res)
                     } else if !field.is_nullable() {
                         Value::accumulate(
@@ -564,12 +563,18 @@ impl Value {
     /// in the Avro specification for the full set of rules of schema
     /// resolution.
     pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
+        let enclosing_namespace = schema.namespace();
         // FIXME transition to using resolved Schema
         let rs = ResolvedSchema::try_from(schema)?;
-        self.resolve_internal(schema, rs.get_names())
+        self.resolve_internal(schema, rs.get_names(), &enclosing_namespace)
     }
 
-    fn resolve_internal(mut self, schema: &Schema, names: &NamesRef) -> AvroResult<Self> {
+    fn resolve_internal(
+        mut self,
+        schema: &Schema,
+        names: &NamesRef,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Self> {
         // Check if this schema is a union, and if the reader schema is not.
         if SchemaKind::from(&self) == SchemaKind::Union
             && SchemaKind::from(schema) != SchemaKind::Union
@@ -583,9 +588,11 @@ impl Value {
         }
         match *schema {
             Schema::Ref { ref name } => {
-                if let Some(resolved) = names.get(name) {
+                let name = name.fully_qualified_name(enclosing_namespace);
+
+                if let Some(resolved) = names.get(&name) {
                     info!("Resolved {:?}", name);
-                    self.resolve_internal(resolved, names)
+                    self.resolve_internal(resolved, names, &name.namespace)
                 } else {
                     error!("Failed to resolve schema {:?}", name);
                     Err(Error::SchemaResolutionError(name.clone()))
@@ -600,11 +607,13 @@ impl Value {
             Schema::Bytes => self.resolve_bytes(),
             Schema::String => self.resolve_string(),
             Schema::Fixed { size, .. } => self.resolve_fixed(size),
-            Schema::Union(ref inner) => self.resolve_union(inner, names),
+            Schema::Union(ref inner) => self.resolve_union(inner, names, enclosing_namespace),
             Schema::Enum { ref symbols, .. } => self.resolve_enum(symbols),
-            Schema::Array(ref inner) => self.resolve_array(inner, names),
-            Schema::Map(ref inner) => self.resolve_map(inner, names),
-            Schema::Record { ref fields, .. } => self.resolve_record(fields, names),
+            Schema::Array(ref inner) => self.resolve_array(inner, names, enclosing_namespace),
+            Schema::Map(ref inner) => self.resolve_map(inner, names, enclosing_namespace),
+            Schema::Record { ref fields, .. } => {
+                self.resolve_record(fields, names, enclosing_namespace)
+            }
             Schema::Decimal {
                 scale,
                 precision,
@@ -848,7 +857,12 @@ impl Value {
         }
     }
 
-    fn resolve_union(self, schema: &UnionSchema, names: &NamesRef) -> Result<Self, Error> {
+    fn resolve_union(
+        self,
+        schema: &UnionSchema,
+        names: &NamesRef,
+        enclosing_namespace: &Namespace,
+    ) -> Result<Self, Error> {
         let v = match self {
             // Both are unions case.
             Value::Union(_i, v) => *v,
@@ -861,16 +875,21 @@ impl Value {
         let (i, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
         Ok(Value::Union(
             i as u32,
-            Box::new(v.resolve_internal(inner, names)?),
+            Box::new(v.resolve_internal(inner, names, enclosing_namespace)?),
         ))
     }
 
-    fn resolve_array(self, schema: &Schema, names: &NamesRef) -> Result<Self, Error> {
+    fn resolve_array(
+        self,
+        schema: &Schema,
+        names: &NamesRef,
+        enclosing_namespace: &Namespace,
+    ) -> Result<Self, Error> {
         match self {
             Value::Array(items) => Ok(Value::Array(
                 items
                     .into_iter()
-                    .map(|item| item.resolve_internal(schema, names))
+                    .map(|item| item.resolve_internal(schema, names, enclosing_namespace))
                     .collect::<Result<_, _>>()?,
             )),
             other => Err(Error::GetArray {
@@ -880,14 +899,19 @@ impl Value {
         }
     }
 
-    fn resolve_map(self, schema: &Schema, names: &NamesRef) -> Result<Self, Error> {
+    fn resolve_map(
+        self,
+        schema: &Schema,
+        names: &NamesRef,
+        enclosing_namespace: &Namespace,
+    ) -> Result<Self, Error> {
         match self {
             Value::Map(items) => Ok(Value::Map(
                 items
                     .into_iter()
                     .map(|(key, value)| {
                         value
-                            .resolve_internal(schema, names)
+                            .resolve_internal(schema, names, enclosing_namespace)
                             .map(|value| (key, value))
                     })
                     .collect::<Result<_, _>>()?,
@@ -899,7 +923,12 @@ impl Value {
         }
     }
 
-    fn resolve_record(self, fields: &[RecordField], names: &NamesRef) -> Result<Self, Error> {
+    fn resolve_record(
+        self,
+        fields: &[RecordField],
+        names: &NamesRef,
+        enclosing_namespace: &Namespace,
+    ) -> Result<Self, Error> {
         let mut items = match self {
             Value::Map(items) => Ok(items),
             Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
@@ -930,10 +959,11 @@ impl Value {
                                     Schema::Null => Value::Union(0, Box::new(Value::Null)),
                                     _ => Value::Union(
                                         0,
-                                        Box::new(
-                                            Value::from(value.clone())
-                                                .resolve_internal(first, names)?,
-                                        ),
+                                        Box::new(Value::from(value.clone()).resolve_internal(
+                                            first,
+                                            names,
+                                            enclosing_namespace,
+                                        )?),
                                     ),
                                 }
                             }
@@ -945,7 +975,7 @@ impl Value {
                     },
                 };
                 value
-                    .resolve_internal(&field.schema, names)
+                    .resolve_internal(&field.schema, names, enclosing_namespace)
                     .map(|value| (field.name.clone(), value))
             })
             .collect::<Result<Vec<_>, _>>()?;
@@ -2481,7 +2511,10 @@ Field with name '"b"' is not a member of the map items"#,
         let test_value: Value = msg.serialize(&mut ser).unwrap();
         assert!(test_value.validate(&schema), "test_value should validate");
         // TODO (rikheijdens): I believe this should also resolve?
-        //assert!(test_value.resolve(&schema).is_ok(), "test_value should resolve");
+        assert!(
+            test_value.resolve(&schema).is_ok(),
+            "test_value should resolve"
+        );
     }
 
     #[test]
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 054cf8075..3bc5c5122 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -488,18 +488,19 @@ fn write_value_ref_resolved(
     value: &Value,
     buffer: &mut Vec<u8>,
 ) -> AvroResult<()> {
+    let root_schema = resolved_schema.get_root_schema();
     if let Some(err) = value.validate_internal(
-        resolved_schema.get_root_schema(),
+        root_schema,
         resolved_schema.get_names(),
-        &None,
+        &root_schema.namespace(),
     ) {
         return Err(Error::ValidationWithReason(err));
     }
     encode_internal(
         value,
-        resolved_schema.get_root_schema(),
+        root_schema,
         resolved_schema.get_names(),
-        &None,
+        &root_schema.namespace(),
         buffer,
     )?;
     Ok(())
@@ -510,18 +511,19 @@ fn write_value_ref_owned_resolved(
     value: &Value,
     buffer: &mut Vec<u8>,
 ) -> AvroResult<()> {
+    let root_schema = resolved_schema.get_root_schema();
     if let Some(err) = value.validate_internal(
-        resolved_schema.get_root_schema(),
+        root_schema,
         resolved_schema.get_names(),
-        &None,
+        &root_schema.namespace(),
     ) {
         return Err(Error::ValidationWithReason(err));
     }
     encode_internal(
         value,
-        resolved_schema.get_root_schema(),
+        root_schema,
         resolved_schema.get_names(),
-        &None,
+        &root_schema.namespace(),
         buffer,
     )?;
     Ok(())