You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/03/08 06:02:08 UTC

[avro] branch branch-1.11 updated: [AVRO-3433][rust] Adding in Resolve Fixes and Tests (#1582)

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/branch-1.11 by this push:
     new 9e2191a  [AVRO-3433][rust] Adding in Resolve Fixes and Tests (#1582)
9e2191a is described below

commit 9e2191a460f6edec201fa87d08c75c407c9df12d
Author: Jack Klamer <jf...@gmail.com>
AuthorDate: Tue Mar 8 00:01:19 2022 -0600

    [AVRO-3433][rust] Adding in Resolve Fixes and Tests (#1582)
    
    * AVRO-3433: Rust: The canonical form should preserve schema references
    
    Do not resolve Schema::Ref when printing as JSON.
    The Java SDK complains that a type is redefined if its second
    occurrence is not emitted as a Ref.
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3433: Use Name as a key for parsed/resolving/input schemata
    
    This is needed to have the fullname (namespace + '.' + name) for lookups
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3433: adding in resolve fixes and tests
    
    * AVRO-3433: Fix formatting and clippy errors
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3433: Give better names to the new test cases
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3433: Collect all Union schemata in schemas_by_name to be able to resolve Schema::Ref's
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3433: Add schema name validation
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3433: Use the regex to capture the namespace and name
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3433: Register all union's schemata in schemas_by_name, so that Schema::Ref's can be resolved
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3433: Add in encoding tests
    
    Co-authored-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    Co-authored-by: Martin Grigorov <ma...@users.noreply.github.com>
    (cherry picked from commit aa22276152802a85cc628c418f304a164d82b3af)
---
 lang/rust/avro/src/decode.rs   |   4 +-
 lang/rust/avro/src/encode.rs   | 289 +++++++++++++++++++++++++++++++-
 lang/rust/avro/src/error.rs    |   3 +
 lang/rust/avro/src/schema.rs   |  99 +++++++----
 lang/rust/avro/src/types.rs    | 366 +++++++++++++++++++++++++++++++++++++----
 lang/rust/avro/src/writer.rs   |   4 +-
 lang/rust/avro/tests/schema.rs |   2 +-
 7 files changed, 695 insertions(+), 72 deletions(-)

diff --git a/lang/rust/avro/src/decode.rs b/lang/rust/avro/src/decode.rs
index 83ee68e..87337bb 100644
--- a/lang/rust/avro/src/decode.rs
+++ b/lang/rust/avro/src/decode.rs
@@ -332,7 +332,7 @@ mod tests {
         let inner = Box::new(Schema::Fixed {
             size: 2,
             doc: None,
-            name: Name::new("decimal"),
+            name: Name::new("decimal").unwrap(),
         });
         let schema = Schema::Decimal {
             inner,
@@ -356,7 +356,7 @@ mod tests {
         use num_bigint::ToBigInt;
         let inner = Box::new(Schema::Fixed {
             size: 13,
-            name: Name::new("decimal"),
+            name: Name::new("decimal").unwrap(),
             doc: None,
         });
         let schema = Schema::Decimal {
diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs
index 6fc969a..664cd92 100644
--- a/lang/rust/avro/src/encode.rs
+++ b/lang/rust/avro/src/encode.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::{
-    schema::Schema,
+    schema::{Name, Schema},
     types::Value,
     util::{zig_i32, zig_i64},
 };
@@ -55,17 +55,17 @@ pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
         value: &Value,
         schema: &Schema,
         buffer: &mut Vec<u8>,
-        schemas_by_name: &mut HashMap<String, Schema>,
+        schemas_by_name: &mut HashMap<Name, Schema>,
     ) {
         match &schema {
             Schema::Ref { ref name } => {
-                let resolved = schemas_by_name.get(name.name.as_str()).unwrap();
+                let resolved = schemas_by_name.get(name).unwrap();
                 return encode_ref0(value, resolved, buffer, &mut schemas_by_name.clone());
             }
             Schema::Record { ref name, .. }
             | Schema::Enum { ref name, .. }
             | Schema::Fixed { ref name, .. } => {
-                schemas_by_name.insert(name.name.clone(), schema.clone());
+                schemas_by_name.insert(name.clone(), schema.clone());
             }
             _ => (),
         }
@@ -126,6 +126,15 @@ pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
             Value::Enum(i, _) => encode_int(*i as i32, buffer),
             Value::Union(idx, item) => {
                 if let Schema::Union(ref inner) = *schema {
+                    inner.schemas.iter().for_each(|s| match s {
+                        Schema::Record { name, .. }
+                        | Schema::Enum { name, .. }
+                        | Schema::Fixed { name, .. } => {
+                            schemas_by_name.insert(name.clone(), s.clone());
+                        }
+                        _ => (),
+                    });
+
                     let inner_schema = inner
                         .schemas
                         .get(*idx as usize)
@@ -215,4 +224,276 @@ mod tests {
         );
         assert_eq!(vec![0u8], buf);
     }
+
+    #[test]
+    fn test_avro_3433_recursive_definition_encode_record() {
+        let mut buf = Vec::new();
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":{
+                        "type":"record",
+                        "name": "Inner",
+                        "fields": [ {
+                            "name":"z",
+                            "type":"int"
+                        }]
+                    }
+                },
+                {
+                    "name":"b",
+                    "type":"Inner"
+                }
+            ]
+        }"#,
+        )
+        .unwrap();
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+        let outer_value =
+            Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
+        encode(&outer_value, &schema, &mut buf);
+        assert!(!buf.is_empty());
+    }
+
+    #[test]
+    fn test_avro_3433_recursive_definition_encode_array() {
+        let mut buf = Vec::new();
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":{
+                        "type":"array",
+                        "items": {
+                            "type":"record",
+                            "name": "Inner",
+                            "fields": [ {
+                                "name":"z",
+                                "type":"int"
+                            }]
+                        }
+                    }
+                },
+                {
+                    "name":"b",
+                    "type": {
+                        "type":"map",
+                        "values":"Inner"
+                    }
+                }
+            ]
+        }"#,
+        )
+        .unwrap();
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+        let outer_value = Value::Record(vec![
+            ("a".into(), Value::Array(vec![inner_value1])),
+            (
+                "b".into(),
+                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
+            ),
+        ]);
+        encode(&outer_value, &schema, &mut buf);
+        assert!(!buf.is_empty());
+    }
+
+    #[test]
+    fn test_avro_3433_recursive_definition_encode_map() {
+        let mut buf = Vec::new();
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":{
+                        "type":"record",
+                        "name": "Inner",
+                        "fields": [ {
+                            "name":"z",
+                            "type":"int"
+                        }]
+                    }
+                },
+                {
+                    "name":"b",
+                    "type": {
+                        "type":"map",
+                        "values":"Inner"
+                    }
+                }
+            ]
+        }"#,
+        )
+        .unwrap();
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+        let outer_value = Value::Record(vec![
+            ("a".into(), inner_value1),
+            (
+                "b".into(),
+                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
+            ),
+        ]);
+        encode(&outer_value, &schema, &mut buf);
+        assert!(!buf.is_empty());
+    }
+
+    #[test]
+    fn test_avro_3433_recursive_definition_encode_record_wrapper() {
+        let mut buf = Vec::new();
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":{
+                        "type":"record",
+                        "name": "Inner",
+                        "fields": [ {
+                            "name":"z",
+                            "type":"int"
+                        }]
+                    }
+                },
+                {
+                    "name":"b",
+                    "type": {
+                        "type":"record",
+                        "name": "InnerWrapper",
+                        "fields": [ {
+                            "name":"j",
+                            "type":"Inner"
+                        }]
+                    }
+                }
+            ]
+        }"#,
+        )
+        .unwrap();
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+        let inner_value2 = Value::Record(vec![(
+            "j".into(),
+            Value::Record(vec![("z".into(), Value::Int(6))]),
+        )]);
+        let outer_value =
+            Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
+        encode(&outer_value, &schema, &mut buf);
+        assert!(!buf.is_empty());
+    }
+
+    #[test]
+    fn test_avro_3433_recursive_definition_encode_map_and_array() {
+        let mut buf = Vec::new();
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":{
+                        "type":"map",
+                        "values": {
+                            "type":"record",
+                            "name": "Inner",
+                            "fields": [ {
+                                "name":"z",
+                                "type":"int"
+                            }]
+                        }
+                    }
+                },
+                {
+                    "name":"b",
+                    "type": {
+                        "type":"array",
+                        "items":"Inner"
+                    }
+                }
+            ]
+        }"#,
+        )
+        .unwrap();
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+        let outer_value = Value::Record(vec![
+            (
+                "a".into(),
+                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
+            ),
+            ("b".into(), Value::Array(vec![inner_value1])),
+        ]);
+        encode(&outer_value, &schema, &mut buf);
+        assert!(!buf.is_empty());
+    }
+
+    #[test]
+    fn test_avro_3433_recursive_definition_encode_union() {
+        let mut buf = Vec::new();
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":["null", {
+                        "type":"record",
+                        "name": "Inner",
+                        "fields": [ {
+                            "name":"z",
+                            "type":"int"
+                        }]
+                    }]
+                },
+                {
+                    "name":"b",
+                    "type":"Inner"
+                }
+            ]
+        }"#,
+        )
+        .unwrap();
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]);
+        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]);
+        let outer_value1 = Value::Record(vec![
+            ("a".into(), Value::Union(1, Box::new(inner_value1))),
+            ("b".into(), inner_value2.clone()),
+        ]);
+        encode(&outer_value1, &schema, &mut buf);
+        assert!(!buf.is_empty());
+
+        buf.drain(..);
+        let outer_value2 = Value::Record(vec![
+            ("a".into(), Value::Union(0, Box::new(Value::Null))),
+            ("b".into(), inner_value2),
+        ]);
+        encode(&outer_value2, &schema, &mut buf);
+        assert!(!buf.is_empty());
+    }
 }
diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs
index 1ed2f38..b8c5939 100644
--- a/lang/rust/avro/src/error.rs
+++ b/lang/rust/avro/src/error.rs
@@ -279,6 +279,9 @@ pub enum Error {
     #[error("Invalid enum symbol name {0}")]
     EnumSymbolName(String),
 
+    #[error("Invalid schema name {0}. It must match the regex '{1}'")]
+    InvalidSchemaName(String, &'static str),
+
     #[error("Duplicate enum symbol {0}")]
     EnumSymbolDuplicate(String),
 
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 4474ffa..1a2f47e 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -36,7 +36,11 @@ use std::{
 use strum_macros::{EnumDiscriminants, EnumString};
 
 lazy_static! {
-    static ref ENUM_SYMBOL_NAME: Regex = Regex::new(r"[A-Za-z_][A-Za-z0-9_]*").unwrap();
+    static ref ENUM_SYMBOL_NAME_R: Regex = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").unwrap();
+
+    // An optional namespace (with optional dots) followed by a name without any dots in it.
+    static ref SCHEMA_NAME_R: Regex =
+        Regex::new(r"^((?P<namespace>[A-Za-z_][A-Za-z0-9_\.]*)*\.)?(?P<name>[A-Za-z_][A-Za-z0-9_]*)$").unwrap();
 }
 
 /// Represents an Avro schema fingerprint
@@ -235,30 +239,30 @@ impl Name {
     /// Create a new `Name`.
     /// Parses the optional `namespace` from the `name` string.
     /// `aliases` will not be defined.
-    pub fn new(name: &str) -> Name {
-        let (name, namespace) = Name::get_name_and_namespace(name);
-        Name {
+    pub fn new(name: &str) -> AvroResult<Name> {
+        let (name, namespace) = Name::get_name_and_namespace(name)?;
+        Ok(Name {
             name,
             namespace,
             aliases: None,
-        }
+        })
     }
 
-    fn get_name_and_namespace(name: &str) -> (String, Option<String>) {
-        if let Some(idx) = name.rfind('.') {
-            let namespace_from_name = name[..idx].to_owned();
-            let name_from_name = name[idx + 1..].to_owned();
-            (name_from_name, Some(namespace_from_name))
-        } else {
-            (name.to_owned(), None)
-        }
+    fn get_name_and_namespace(name: &str) -> AvroResult<(String, Option<String>)> {
+        let caps = SCHEMA_NAME_R
+            .captures(name)
+            .ok_or_else(|| Error::InvalidSchemaName(name.to_string(), SCHEMA_NAME_R.as_str()))?;
+        Ok((
+            caps["name"].to_string(),
+            caps.name("namespace").map(|s| s.as_str().to_string()),
+        ))
     }
 
     /// Parse a `serde_json::Value` into a `Name`.
     fn parse(complex: &Map<String, Value>) -> AvroResult<Self> {
         let (name, namespace_from_name) = complex
             .name()
-            .map(|name| Name::get_name_and_namespace(name.as_str()))
+            .map(|name| Name::get_name_and_namespace(name.as_str()).unwrap())
             .ok_or(Error::GetNameField)?;
 
         // FIXME Reading name from the type is wrong ! The name there is just a metadata (AVRO-3430)
@@ -309,7 +313,7 @@ impl Name {
 
 impl From<&str> for Name {
     fn from(name: &str) -> Self {
-        Name::new(name)
+        Name::new(name).unwrap()
     }
 }
 
@@ -635,7 +639,7 @@ impl Parser {
             }
         }
 
-        let name = Name::new(name);
+        let name = Name::new(name)?;
 
         if let Some(parsed) = self.parsed_schemas.get(&name) {
             return Ok(get_schema_ref(parsed));
@@ -857,7 +861,8 @@ impl Parser {
                     Some(ref ns) => format!("{}.{}", ns, alias),
                     None => alias.clone(),
                 };
-                let alias_name = Name::new(alias_fullname.as_str());
+
+                let alias_name = Name::new(alias_fullname.as_str()).unwrap();
                 self.resolving_schemas
                     .insert(alias_name, resolving_schema.clone());
             });
@@ -876,7 +881,7 @@ impl Parser {
                     Some(ref ns) => format!("{}.{}", ns, alias),
                     None => alias.clone(),
                 };
-                let alias_name = Name::new(alias_fullname.as_str());
+                let alias_name = Name::new(alias_fullname.as_str()).unwrap();
                 self.resolving_schemas.remove(&alias_name);
                 self.parsed_schemas.insert(alias_name, schema.clone());
             });
@@ -887,7 +892,7 @@ impl Parser {
     fn get_already_seen_schema(&self, complex: &Map<String, Value>) -> Option<&Schema> {
         match complex.get("type") {
             Some(Value::String(ref typ)) => {
-                let name = Name::new(typ.as_str());
+                let name = Name::new(typ.as_str()).unwrap();
                 self.resolving_schemas
                     .get(&name)
                     .or_else(|| self.parsed_schemas.get(&name))
@@ -967,7 +972,7 @@ impl Parser {
         let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
         for symbol in symbols.iter() {
             // Ensure enum symbol names match [A-Za-z_][A-Za-z0-9_]*
-            if !ENUM_SYMBOL_NAME.is_match(symbol) {
+            if !ENUM_SYMBOL_NAME_R.is_match(symbol) {
                 return Err(Error::EnumSymbolName(symbol.to_string()));
             }
 
@@ -1056,7 +1061,7 @@ impl Parser {
 fn get_schema_type_name(name: Name, value: Value) -> Name {
     match value.get("type") {
         Some(Value::Object(complex_type)) => match complex_type.name() {
-            Some(name) => Name::new(name.as_str()),
+            Some(name) => Name::new(name.as_str()).unwrap(),
             _ => name,
         },
         _ => name,
@@ -1198,7 +1203,7 @@ impl Serialize for Schema {
                 // the Avro doesn't indicate what the name of the underlying fixed type of a
                 // duration should be or typically is.
                 let inner = Schema::Fixed {
-                    name: Name::new("duration"),
+                    name: Name::new("duration").unwrap(),
                     doc: None,
                     size: 12,
                 };
@@ -1447,7 +1452,7 @@ mod tests {
             .clone();
 
         let schema_c_expected = Schema::Record {
-            name: Name::new("C"),
+            name: Name::new("C").unwrap(),
             doc: None,
             fields: vec![RecordField {
                 name: "field_one".to_string(),
@@ -1456,10 +1461,10 @@ mod tests {
                 schema: Schema::Union(
                     UnionSchema::new(vec![
                         Schema::Ref {
-                            name: Name::new("A"),
+                            name: Name::new("A").unwrap(),
                         },
                         Schema::Ref {
-                            name: Name::new("B"),
+                            name: Name::new("B").unwrap(),
                         },
                     ])
                     .unwrap(),
@@ -1502,7 +1507,7 @@ mod tests {
             .clone();
 
         let schema_option_a_expected = Schema::Record {
-            name: Name::new("OptionA"),
+            name: Name::new("OptionA").unwrap(),
             doc: None,
             fields: vec![RecordField {
                 name: "field_one".to_string(),
@@ -1512,7 +1517,7 @@ mod tests {
                     UnionSchema::new(vec![
                         Schema::Null,
                         Schema::Ref {
-                            name: Name::new("A"),
+                            name: Name::new("A").unwrap(),
                         },
                     ])
                     .unwrap(),
@@ -1547,7 +1552,7 @@ mod tests {
         lookup.insert("b".to_owned(), 1);
 
         let expected = Schema::Record {
-            name: Name::new("test"),
+            name: Name::new("test").unwrap(),
             doc: None,
             fields: vec![
                 RecordField {
@@ -1605,14 +1610,14 @@ mod tests {
         node_lookup.insert("label".to_owned(), 0);
 
         let expected = Schema::Record {
-            name: Name::new("test"),
+            name: Name::new("test").unwrap(),
             doc: None,
             fields: vec![RecordField {
                 name: "recordField".to_string(),
                 doc: None,
                 default: None,
                 schema: Schema::Record {
-                    name: Name::new("Node"),
+                    name: Name::new("Node").unwrap(),
                     doc: None,
                     fields: vec![
                         RecordField {
@@ -1628,7 +1633,7 @@ mod tests {
                             doc: None,
                             default: None,
                             schema: Schema::Array(Box::new(Schema::Ref {
-                                name: Name::new("Node"),
+                                name: Name::new("Node").unwrap(),
                             })),
                             order: RecordFieldOrder::Ascending,
                             position: 1,
@@ -2034,7 +2039,7 @@ mod tests {
         ).unwrap();
 
         let expected = Schema::Enum {
-            name: Name::new("Suit"),
+            name: Name::new("Suit").unwrap(),
             doc: None,
             symbols: vec![
                 "diamonds".to_owned(),
@@ -2070,7 +2075,7 @@ mod tests {
         let schema = Schema::parse_str(r#"{"type": "fixed", "name": "test", "size": 16}"#).unwrap();
 
         let expected = Schema::Fixed {
-            name: Name::new("test"),
+            name: Name::new("test").unwrap(),
             doc: None,
             size: 16usize,
         };
@@ -2086,7 +2091,7 @@ mod tests {
         .unwrap();
 
         let expected = Schema::Fixed {
-            name: Name::new("test"),
+            name: Name::new("test").unwrap(),
             doc: Some(String::from("FixedSchema documentation")),
             size: 16usize,
         };
@@ -2334,4 +2339,30 @@ mod tests {
             panic!("Expected a record schema!");
         }
     }
+
+    #[test]
+    /// Zero-length namespace is considered as no-namespace.
+    fn test_namespace_from_name_with_empty_value() {
+        let name = Name::new(".name").unwrap();
+        assert_eq!(name.name, "name");
+        assert_eq!(name.namespace, None);
+    }
+
+    #[test]
+    /// Whitespace is not allowed in the name.
+    fn test_name_with_whitespace_value() {
+        match Name::new(" ") {
+            Err(Error::InvalidSchemaName(_, _)) => {}
+            _ => panic!("Expected an Error::InvalidSchemaName!"),
+        }
+    }
+
+    #[test]
+    /// The name must be non-empty.
+    fn test_name_with_no_name_part() {
+        match Name::new("space.") {
+            Err(Error::InvalidSchemaName(_, _)) => {}
+            _ => panic!("Expected an Error::InvalidSchemaName!"),
+        }
+    }
 }
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index 85e7341..e2f6962 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -19,7 +19,7 @@
 use crate::{
     decimal::Decimal,
     duration::Duration,
-    schema::{Precision, RecordField, Scale, Schema, SchemaKind, UnionSchema},
+    schema::{Name, Precision, RecordField, Scale, Schema, SchemaKind, UnionSchema},
     AvroResult, Error,
 };
 use serde_json::{Number, Value as JsonValue};
@@ -408,11 +408,20 @@ impl Value {
     /// See [Schema Resolution](https://avro.apache.org/docs/current/spec.html#Schema+Resolution)
     /// in the Avro specification for the full set of rules of schema
     /// resolution.
-    pub fn resolve(mut self, schema: &Schema) -> AvroResult<Self> {
+    pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
+        let mut schemas_by_name: HashMap<Name, Schema> = HashMap::new();
+        self.resolve_internal(schema, &mut schemas_by_name)
+    }
+
+    fn resolve_internal(
+        mut self,
+        schema: &Schema,
+        schemas_by_name: &mut HashMap<Name, Schema>,
+    ) -> AvroResult<Self> {
         pub fn resolve0(
             value: &mut Value,
             schema: &Schema,
-            schemas_by_name: &mut HashMap<String, Schema>,
+            schemas_by_name: &mut HashMap<Name, Schema>,
         ) -> AvroResult<Value> {
             // Check if this schema is a union, and if the reader schema is not.
             if SchemaKind::from(&value.clone()) == SchemaKind::Union
@@ -428,10 +437,10 @@ impl Value {
             let val: Value = value.clone();
             match *schema {
                 Schema::Ref { ref name } => {
-                    if let Some(resolved) = schemas_by_name.get(name.name.as_str()) {
+                    if let Some(resolved) = schemas_by_name.get(name) {
                         resolve0(value, resolved, &mut schemas_by_name.clone())
                     } else {
-                        Err(Error::SchemaResolutionError(name.name.clone()))
+                        Err(Error::SchemaResolutionError(name.fullname(None)))
                     }
                 }
                 Schema::Null => val.resolve_null(),
@@ -443,27 +452,27 @@ impl Value {
                 Schema::Bytes => val.resolve_bytes(),
                 Schema::String => val.resolve_string(),
                 Schema::Fixed { ref name, size, .. } => {
-                    schemas_by_name.insert(name.name.clone(), schema.clone());
+                    schemas_by_name.insert(name.clone(), schema.clone());
                     val.resolve_fixed(size)
                 }
-                Schema::Union(ref inner) => val.resolve_union(inner),
+                Schema::Union(ref inner) => val.resolve_union(inner, schemas_by_name),
                 Schema::Enum {
                     ref name,
                     ref symbols,
                     ..
                 } => {
-                    schemas_by_name.insert(name.name.clone(), schema.clone());
+                    schemas_by_name.insert(name.clone(), schema.clone());
                     val.resolve_enum(symbols)
                 }
-                Schema::Array(ref inner) => val.resolve_array(inner),
-                Schema::Map(ref inner) => val.resolve_map(inner),
+                Schema::Array(ref inner) => val.resolve_array(inner, schemas_by_name),
+                Schema::Map(ref inner) => val.resolve_map(inner, schemas_by_name),
                 Schema::Record {
                     ref name,
                     ref fields,
                     ..
                 } => {
-                    schemas_by_name.insert(name.name.clone(), schema.clone());
-                    val.resolve_record(fields)
+                    schemas_by_name.insert(name.clone(), schema.clone());
+                    val.resolve_record(fields, schemas_by_name)
                 }
                 Schema::Decimal {
                     scale,
@@ -479,9 +488,7 @@ impl Value {
                 Schema::Uuid => val.resolve_uuid(),
             }
         }
-
-        let mut schemas_by_name: HashMap<String, Schema> = HashMap::new();
-        resolve0(&mut self, schema, &mut schemas_by_name)
+        resolve0(&mut self, schema, schemas_by_name)
     }
 
     fn resolve_uuid(self) -> Result<Self, Error> {
@@ -711,25 +718,46 @@ impl Value {
         }
     }
 
-    fn resolve_union(self, schema: &UnionSchema) -> Result<Self, Error> {
+    fn resolve_union(
+        self,
+        schema: &UnionSchema,
+        schemas_by_name: &mut HashMap<Name, Schema>,
+    ) -> Result<Self, Error> {
         let v = match self {
             // Both are unions case.
             Value::Union(_i, v) => *v,
             // Reader is a union, but writer is not.
             v => v,
         };
+
+        schema.schemas.iter().for_each(|s| match s {
+            Schema::Record { name, .. }
+            | Schema::Enum { name, .. }
+            | Schema::Fixed { name, .. } => {
+                schemas_by_name.insert(name.clone(), s.clone());
+            }
+            _ => (),
+        });
+
         // Find the first match in the reader schema.
         // FIXME: this might be wrong when the union consists of multiple same records that have different names
         let (i, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
-        Ok(Value::Union(i as u32, Box::new(v.resolve(inner)?)))
+        Ok(Value::Union(
+            i as u32,
+            Box::new(v.resolve_internal(inner, schemas_by_name)?),
+        ))
     }
 
-    fn resolve_array(self, schema: &Schema) -> Result<Self, Error> {
+    fn resolve_array(
+        self,
+        schema: &Schema,
+        schemas_by_name: &mut HashMap<Name, Schema>,
+    ) -> Result<Self, Error> {
         match self {
             Value::Array(items) => Ok(Value::Array(
                 items
                     .into_iter()
-                    .map(|item| item.resolve(schema))
+                    .map(|item| item.resolve_internal(schema, schemas_by_name))
                     .collect::<Result<_, _>>()?,
             )),
             other => Err(Error::GetArray {
@@ -739,12 +767,20 @@ impl Value {
         }
     }
 
-    fn resolve_map(self, schema: &Schema) -> Result<Self, Error> {
+    fn resolve_map(
+        self,
+        schema: &Schema,
+        schemas_by_name: &mut HashMap<Name, Schema>,
+    ) -> Result<Self, Error> {
         match self {
             Value::Map(items) => Ok(Value::Map(
                 items
                     .into_iter()
-                    .map(|(key, value)| value.resolve(schema).map(|value| (key, value)))
+                    .map(|(key, value)| {
+                        value
+                            .resolve_internal(schema, schemas_by_name)
+                            .map(|value| (key, value))
+                    })
                     .collect::<Result<_, _>>()?,
             )),
             other => Err(Error::GetMap {
@@ -754,7 +790,11 @@ impl Value {
         }
     }
 
-    fn resolve_record(self, fields: &[RecordField]) -> Result<Self, Error> {
+    fn resolve_record(
+        self,
+        fields: &[RecordField],
+        schemas_by_name: &mut HashMap<Name, Schema>,
+    ) -> Result<Self, Error> {
         let mut items = match self {
             Value::Map(items) => Ok(items),
             Value::Record(fields) => Ok(fields.into_iter().collect::<HashMap<_, _>>()),
@@ -785,7 +825,10 @@ impl Value {
                                     Schema::Null => Value::Union(0, Box::new(Value::Null)),
                                     _ => Value::Union(
                                         0,
-                                        Box::new(Value::from(value.clone()).resolve(first)?),
+                                        Box::new(
+                                            Value::from(value.clone())
+                                                .resolve_internal(first, schemas_by_name)?,
+                                        ),
                                     ),
                                 }
                             }
@@ -797,7 +840,7 @@ impl Value {
                     },
                 };
                 value
-                    .resolve(&field.schema)
+                    .resolve_internal(&field.schema, schemas_by_name)
                     .map(|value| (field.name.clone(), value))
             })
             .collect::<Result<Vec<_>, _>>()?;
@@ -890,7 +933,7 @@ mod tests {
     fn validate_fixed() {
         let schema = Schema::Fixed {
             size: 4,
-            name: Name::new("some_fixed"),
+            name: Name::new("some_fixed").unwrap(),
             doc: None,
         };
 
@@ -903,7 +946,7 @@ mod tests {
     #[test]
     fn validate_enum() {
         let schema = Schema::Enum {
-            name: Name::new("some_enum"),
+            name: Name::new("some_enum").unwrap(),
             doc: None,
             symbols: vec![
                 "spades".to_string(),
@@ -920,7 +963,7 @@ mod tests {
         assert!(!Value::String("lorem".to_string()).validate(&schema));
 
         let other_schema = Schema::Enum {
-            name: Name::new("some_other_enum"),
+            name: Name::new("some_other_enum").unwrap(),
             doc: None,
             symbols: vec![
                 "hearts".to_string(),
@@ -944,7 +987,7 @@ mod tests {
         //    ]
         // }
         let schema = Schema::Record {
-            name: Name::new("some_record"),
+            name: Name::new("some_record").unwrap(),
             doc: None,
             fields: vec![
                 RecordField {
@@ -1095,7 +1138,7 @@ mod tests {
                 precision: 10,
                 scale: 1,
                 inner: Box::new(Schema::Fixed {
-                    name: Name::new("decimal"),
+                    name: Name::new("decimal").unwrap(),
                     size: 20,
                     doc: None
                 })
@@ -1323,4 +1366,269 @@ mod tests {
             JsonValue::String("936da01f-9abd-4d9d-80c7-02af85c822a8".into())
         );
     }
+
+    #[test]
+    fn test_avro_3433_recursive_resolves_record() { // AVRO-3433: "Inner" is defined in field `a` and referenced by name in field `b`
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":{
+                        "type":"record",
+                        "name": "Inner",
+                        "fields": [ {
+                            "name":"z",
+                            "type":"int"
+                        }]
+                    }
+                },
+                {
+                    "name":"b",
+                    "type":"Inner"
+                }
+            ]
+        }"#,
+        )
+        .unwrap(); // the schema literal above is expected to parse
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]); // value for field `a`
+        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]); // value for field `b`
+        let outer = Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
+        outer
+            .resolve(&schema) // the test fails (panics in `expect`) if this returns Err
+            .expect("Record definition defined in one field must be availible in other field");
+    }
+
+    #[test]
+    fn test_avro_3433_recursive_resolves_array() { // AVRO-3433: "Inner" is defined as the array item type in `a` and referenced by name as the map value type in `b`
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":{
+                        "type":"array",
+                        "items": {
+                            "type":"record",
+                            "name": "Inner",
+                            "fields": [ {
+                                "name":"z",
+                                "type":"int"
+                            }]
+                        }
+                    }
+                },
+                {
+                    "name":"b",
+                    "type": {
+                        "type":"map",
+                        "values":"Inner"
+                    }
+                }
+            ]
+        }"#,
+        )
+        .unwrap(); // the schema literal above is expected to parse
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]); // element of the array in `a`
+        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]); // entry of the map in `b`
+        let outer_value = Value::Record(vec![
+            ("a".into(), Value::Array(vec![inner_value1])),
+            (
+                "b".into(),
+                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
+            ),
+        ]);
+        outer_value
+            .resolve(&schema) // the test fails (panics in `expect`) if this returns Err
+            .expect("Record defined in array definition must be resolveable from map");
+    }
+
+    #[test]
+    fn test_avro_3433_recursive_resolves_map() { // AVRO-3433: "Inner" is defined as the type of field `a` and referenced by name as the map value type in `b`
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":{
+                        "type":"record",
+                        "name": "Inner",
+                        "fields": [ {
+                            "name":"z",
+                            "type":"int"
+                        }]
+                    }
+                },
+                {
+                    "name":"b",
+                    "type": {
+                        "type":"map",
+                        "values":"Inner"
+                    }
+                }
+            ]
+        }"#,
+        )
+        .unwrap(); // the schema literal above is expected to parse
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]); // value for field `a`
+        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]); // entry of the map in `b`
+        let outer_value = Value::Record(vec![
+            ("a".into(), inner_value1),
+            (
+                "b".into(),
+                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
+            ),
+        ]);
+        outer_value
+            .resolve(&schema) // the test fails (panics in `expect`) if this returns Err
+            .expect("Record defined in record field must be resolvable from map field");
+    }
+
+    #[test]
+    fn test_avro_3433_recursive_resolves_record_wrapper() { // AVRO-3433: "Inner" is defined in field `a` and referenced by name from inside the record "InnerWrapper" defined in field `b`
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":{
+                        "type":"record",
+                        "name": "Inner",
+                        "fields": [ {
+                            "name":"z",
+                            "type":"int"
+                        }]
+                    }
+                },
+                {
+                    "name":"b",
+                    "type": {
+                        "type":"record",
+                        "name": "InnerWrapper",
+                        "fields": [ {
+                            "name":"j",
+                            "type":"Inner"
+                        }]
+                    }
+                }
+            ]
+        }"#,
+        )
+        .unwrap(); // the schema literal above is expected to parse
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]); // value for field `a`
+        let inner_value2 = Value::Record(vec![( // value for field `b`: a record with the single field `j`
+            "j".into(),
+            Value::Record(vec![("z".into(), Value::Int(6))]), // `j` holds a record with the same shape as "Inner"
+        )]);
+        let outer_value =
+            Value::Record(vec![("a".into(), inner_value1), ("b".into(), inner_value2)]);
+        outer_value.resolve(&schema).expect("Record schema defined in field must be resolvable in Record schema defined in other field");
+    }
+
+    #[test]
+    fn test_avro_3433_recursive_resolves_map_and_array() { // AVRO-3433: "Inner" is defined as the map value type in `a` and referenced by name as the array item type in `b`
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":{
+                        "type":"map",
+                        "values": {
+                            "type":"record",
+                            "name": "Inner",
+                            "fields": [ {
+                                "name":"z",
+                                "type":"int"
+                            }]
+                        }
+                    }
+                },
+                {
+                    "name":"b",
+                    "type": {
+                        "type":"array",
+                        "items":"Inner"
+                    }
+                }
+            ]
+        }"#,
+        )
+        .unwrap(); // the schema literal above is expected to parse
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]); // element of the array in `b`
+        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]); // entry of the map in `a`
+        let outer_value = Value::Record(vec![
+            (
+                "a".into(),
+                Value::Map(vec![("akey".into(), inner_value2)].into_iter().collect()),
+            ),
+            ("b".into(), Value::Array(vec![inner_value1])),
+        ]);
+        outer_value
+            .resolve(&schema) // the test fails (panics in `expect`) if this returns Err
+            .expect("Record defined in map definition must be resolveable from array");
+    }
+
+    #[test]
+    fn test_avro_3433_recursive_resolves_union() { // AVRO-3433: "Inner" is defined inside the union ["null", Inner] of field `a` and referenced by name in field `b`
+        let schema = Schema::parse_str(
+            r#"
+        {
+            "type":"record",
+            "name":"TestStruct",
+            "fields": [
+                {
+                    "name":"a",
+                    "type":["null", {
+                        "type":"record",
+                        "name": "Inner",
+                        "fields": [ {
+                            "name":"z",
+                            "type":"int"
+                        }]
+                    }]
+                },
+                {
+                    "name":"b",
+                    "type":"Inner"
+                }
+            ]
+        }"#,
+        )
+        .unwrap(); // the schema literal above is expected to parse
+
+        let inner_value1 = Value::Record(vec![("z".into(), Value::Int(3))]); // record value used for the union field `a`
+        let inner_value2 = Value::Record(vec![("z".into(), Value::Int(6))]); // value for field `b`, used in both cases below
+        let outer1 = Value::Record(vec![ // case 1: `a` is a bare record value, not wrapped in `Value::Union`
+            ("a".into(), inner_value1),
+            ("b".into(), inner_value2.clone()), // cloned because `inner_value2` is moved into `outer2` below
+        ]);
+        outer1
+            .resolve(&schema) // the test fails (panics in `expect`) if this returns Err
+            .expect("Record definition defined in union must be resolvabled in other field");
+        let outer2 = Value::Record(vec![("a".into(), Value::Null), ("b".into(), inner_value2)]); // case 2: `a` is `Value::Null`
+        outer2
+            .resolve(&schema) // `b` must still resolve when `a` is null
+            .expect("Record definition defined in union must be resolvabled in other field");
+    }
 }
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 9aa8d7d..e9518e3 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -520,7 +520,7 @@ mod tests {
     fn decimal_fixed() -> TestResult<()> {
         let size = 30;
         let inner = Schema::Fixed {
-            name: Name::new("decimal"),
+            name: Name::new("decimal").unwrap(),
             doc: None,
             size,
         };
@@ -558,7 +558,7 @@ mod tests {
     #[test]
     fn duration() -> TestResult<()> {
         let inner = Schema::Fixed {
-            name: Name::new("duration"),
+            name: Name::new("duration").unwrap(),
             doc: None,
             size: 12,
         };
diff --git a/lang/rust/avro/tests/schema.rs b/lang/rust/avro/tests/schema.rs
index be2d9a8..24ae247 100644
--- a/lang/rust/avro/tests/schema.rs
+++ b/lang/rust/avro/tests/schema.rs
@@ -64,7 +64,7 @@ const FIXED_EXAMPLES: &[(&str, bool)] = &[
             }"#,
         true,
     ),
-    (r#"{"type": "fixed", "name": "Missing size"}"#, false),
+    (r#"{"type": "fixed", "name": "MissingSize"}"#, false),
     (r#"{"type": "fixed", "size": 314}"#, false),
 ];