You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/08/26 08:48:17 UTC

[avro] branch branch-1.11 updated: AVRO: [Rust] Improved resolution of nullable record fields. (#1837)

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/branch-1.11 by this push:
     new a3d120f37 AVRO: [Rust] Improved resolution of nullable record fields. (#1837)
a3d120f37 is described below

commit a3d120f3750677d1e6ab1b29384de5e8eaf38ac5
Author: Shaeq Ahmed <sh...@gmail.com>
AuthorDate: Fri Aug 26 12:47:43 2022 +0400

    AVRO: [Rust] Improved resolution of nullable record fields. (#1837)
    
    * AVRO: [Rust] Improved resolution of nullable record fields.
    
    * AVRO: [Rust] Update to improvements for nullable record fields PR.
    
    * AVRO-3621: Improve test method name
    
    * AVRO-3621: Revert non-related change
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    Co-authored-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit ddf10bd17f591638f12128c4fff2b4681b245ac0)
---
 lang/rust/avro/src/schema.rs |  47 ++++++++++++++++
 lang/rust/avro/src/types.rs  | 129 +++++++++++++++++++++++++++++++++++++------
 2 files changed, 160 insertions(+), 16 deletions(-)

diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 3320e9e59..ec26a8bf9 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -634,6 +634,14 @@ impl RecordField {
         }
         custom_attributes
     }
+
+    /// Returns true if this `RecordField` is nullable, meaning the schema is a `UnionSchema` where the first variant is `Null`.
+    pub fn is_nullable(&self) -> bool {
+        match self.schema {
+            Schema::Union(ref inner) => inner.is_nullable(),
+            _ => false,
+        }
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -2016,6 +2024,45 @@ mod tests {
         assert_eq!(variants.next(), None);
     }
 
+    // AVRO-3621
+    #[test]
+    fn test_avro_3621_nullable_record_field() {
+        let nullable_record_field = RecordField {
+            name: "next".to_string(),
+            doc: None,
+            default: None,
+            schema: Schema::Union(
+                UnionSchema::new(vec![
+                    Schema::Null,
+                    Schema::Ref {
+                        name: Name {
+                            name: "LongList".to_owned(),
+                            namespace: None,
+                        },
+                    },
+                ])
+                .unwrap(),
+            ),
+            order: RecordFieldOrder::Ascending,
+            position: 1,
+            custom_attributes: Default::default(),
+        };
+
+        assert!(nullable_record_field.is_nullable());
+
+        let non_nullable_record_field = RecordField {
+            name: "next".to_string(),
+            doc: None,
+            default: Some(json!(2)),
+            schema: Schema::Long,
+            order: RecordFieldOrder::Ascending,
+            position: 1,
+            custom_attributes: Default::default(),
+        };
+
+        assert!(!non_nullable_record_field.is_nullable());
+    }
+
     // AVRO-3248
     #[test]
     fn test_union_of_records() {
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index e833a26b7..7e700b4ea 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -457,6 +457,10 @@ impl Value {
                 .get(i as usize)
                 .map(|schema| value.validate_internal(schema, names))
                 .unwrap_or_else(|| Some(format!("No schema in the union at position '{}'", i))),
+            (v, &Schema::Union(ref inner)) => match inner.find_schema(v) {
+                Some(_) => None,
+                None => Some("Could not find matching type in union".to_string()),
+            },
             (&Value::Array(ref items), &Schema::Array(ref inner)) => {
                 items.iter().fold(None, |acc, item| {
                     Value::accumulate(acc, item.validate_internal(inner, names))
@@ -475,11 +479,20 @@ impl Value {
                     ..
                 },
             ) => {
-                if fields.len() != record_fields.len() {
+                let non_nullable_fields_count =
+                    fields.iter().filter(|&rf| !rf.is_nullable()).count();
+
+                if record_fields.len() < non_nullable_fields_count {
+                    return Some(format!(
+                        "The value's records length ({}) doesn't match the schema ({} non-nullable fields)",
+                        record_fields.len(),
+                        non_nullable_fields_count
+                    ));
+                } else if record_fields.len() > fields.len() {
                     return Some(format!(
-                        "The value's records length ({}) is different than the schema's ({})",
+                        "The value's records length ({}) is greater than the schema's ({} fields)",
                         record_fields.len(),
-                        fields.len()
+                        fields.len(),
                     ));
                 }
 
@@ -509,7 +522,7 @@ impl Value {
                     if let Some(item) = items.get(&field.name) {
                         let res = item.validate_internal(&field.schema, names);
                         Value::accumulate(acc, res)
-                    } else {
+                    } else if !field.is_nullable() {
                         Value::accumulate(
                             acc,
                             Some(format!(
@@ -517,6 +530,8 @@ impl Value {
                                 field.name
                             )),
                         )
+                    } else {
+                        acc
                     }
                 })
             }
@@ -1043,7 +1058,7 @@ mod tests {
                     attributes: Default::default(),
                 },
                 false,
-                "Invalid value: Record([(\"unknown_field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Int, order: Ignore, position: 0, custom_attributes: {} }], lookup: {}, attributes: {} }. Reason: There is no schema field for field 'unknown_field_name'",
+                r#"Invalid value: Record([("unknown_field_name", Null)]) for schema: Record { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, default: None, schema: Int, order: Ignore, position: 0, custom_attributes: {} }], lookup: {}, attributes: {} }. Reason: There is no schema field for field 'unknown_field_name'"#,
             ),
             (
                 Value::Record(vec![("field_name".to_string(), Value::Null)]),
@@ -1066,7 +1081,7 @@ mod tests {
                     attributes: Default::default(),
                 },
                 false,
-                "Invalid value: Record([(\"field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Ref { name: Name { name: \"missing\", namespace: None } }, order: Ignore, position: 0, custom_attributes: {} }], lookup: {\"field_name\": 0}, attributes: {} }. Reason: Unresolved schema reference: 'missing'. Parsed names: []",
+                r#"Invalid value: Record([("field_name", Null)]) for schema: Record { name: Name { name: "record_name", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "field_name", doc: None, default: None, schema: Ref { name: Name { name: "missing", namespace: None } }, order: Ignore, position: 0, custom_attributes: {} }], lookup: {"field_name": 0}, attributes: {} }. Reason: Unresolved schema reference: 'missing'. Parsed names: []"#,
             ),
         ];
 
@@ -1196,7 +1211,12 @@ mod tests {
         //    "type": "record",
         //    "fields": [
         //      {"type": "long", "name": "a"},
-        //      {"type": "string", "name": "b"}
+        //      {"type": "string", "name": "b"},
+        //      {
+        //          "type": ["null", "int"],
+        //          "name": "c",
+        //          "default": null
+        //      }
         //    ]
         // }
         let schema = Schema::Record {
@@ -1222,11 +1242,26 @@ mod tests {
                     position: 1,
                     custom_attributes: Default::default(),
                 },
+                RecordField {
+                    name: "c".to_string(),
+                    doc: None,
+                    default: Some(JsonValue::Null),
+                    schema: Schema::Union(
+                        UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap(),
+                    ),
+                    order: RecordFieldOrder::Ascending,
+                    position: 2,
+                    custom_attributes: Default::default(),
+                },
             ],
-            lookup: [("a".to_string(), 0), ("b".to_string(), 1)]
-                .iter()
-                .cloned()
-                .collect(),
+            lookup: [
+                ("a".to_string(), 0),
+                ("b".to_string(), 1),
+                ("c".to_string(), 2),
+            ]
+            .iter()
+            .cloned()
+            .collect(),
             attributes: Default::default(),
         };
 
@@ -1247,7 +1282,9 @@ mod tests {
             ("b".to_string(), Value::String("foo".to_string())),
         ]);
         assert!(!value.validate(&schema));
-        assert_logged("Invalid value: Record([(\"a\", Boolean(false)), (\"b\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }], lookup: {\"a\": 0, \"b\": 1}, attr [...]
+        assert_logged(
+            r#"Invalid value: Record([("a", Boolean(false)), ("b", String("foo"))]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, default: Some(Null) [...]
+        );
 
         let value = Value::Record(vec![
             ("a".to_string(), Value::Long(42i64)),
@@ -1255,17 +1292,27 @@ mod tests {
         ]);
         assert!(!value.validate(&schema));
         assert_logged(
-            "Invalid value: Record([(\"a\", Long(42)), (\"c\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }], lookup: {\"a\": 0, \"b\": 1}, attributes: {} }. Re [...]
+            r#"Invalid value: Record([("a", Long(42)), ("c", String("foo"))]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, default: Some(Null), sche [...]
+        );
+
+        let value = Value::Record(vec![
+            ("a".to_string(), Value::Long(42i64)),
+            ("d".to_string(), Value::String("foo".to_string())),
+        ]);
+        assert!(!value.validate(&schema));
+        assert_logged(
+            r#"Invalid value: Record([("a", Long(42)), ("d", String("foo"))]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, default: Some(Null), sche [...]
         );
 
         let value = Value::Record(vec![
             ("a".to_string(), Value::Long(42i64)),
             ("b".to_string(), Value::String("foo".to_string())),
             ("c".to_string(), Value::Null),
+            ("d".to_string(), Value::Null),
         ]);
         assert!(!value.validate(&schema));
         assert_logged(
-            r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null)]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }], lookup: {"a": 0, "b": 1}, attributes: {} }. Rea [...]
+            r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null), ("d", Null)]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, [...]
         );
 
         assert!(Value::Map(
@@ -1279,13 +1326,13 @@ mod tests {
         .validate(&schema));
 
         assert!(!Value::Map(
-            vec![("c".to_string(), Value::Long(123_i64)),]
+            vec![("d".to_string(), Value::Long(123_i64)),]
                 .into_iter()
                 .collect()
         )
         .validate(&schema));
         assert_logged(
-            r#"Invalid value: Map({"c": Long(123)}) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }], lookup: {"a": 0, "b": 1}, attributes: {} }. Reason: Field with name '"a"' is not a mem [...]
+            r#"Invalid value: Map({"d": Long(123)}) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0, custom_attributes: {} }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1, custom_attributes: {} }, RecordField { name: "c", doc: None, default: Some(Null), schema: Union(UnionSchema { sc [...]
 Field with name '"b"' is not a member of the map items"#,
         );
 
@@ -1465,6 +1512,56 @@ Field with name '"b"' is not a member of the map items"#,
         assert!(value.resolve(&Schema::TimestampMicros).is_err());
     }
 
+    #[test]
+    fn test_avro_3621_resolve_to_nullable_union() {
+        let schema = Schema::parse_str(
+            r#"{
+            "type": "record",
+            "name": "root",
+            "fields": [
+                {
+                    "name": "event",
+                    "type": [
+                        "null", 
+                        {
+                            "type": "record",
+                            "name": "event",
+                            "fields": [
+                                {
+                                    "name": "amount",
+                                    "type": "int"
+                                },
+                                {
+                                    "name": "size",
+                                    "type": [
+                                        "null",
+                                        "int"
+                                    ],
+                                    "default": null
+                                }
+                            ]
+                        }
+                    ],
+                    "default": null
+                }
+            ]
+        }"#,
+        )
+        .unwrap();
+
+        let value = Value::Record(vec![(
+            "event".to_string(),
+            Value::Record(vec![("amount".to_string(), Value::Int(200))]),
+        )]);
+        assert!(value.resolve(&schema).is_ok());
+
+        let value = Value::Record(vec![(
+            "event".to_string(),
+            Value::Record(vec![("size".to_string(), Value::Int(1))]),
+        )]);
+        assert!(value.resolve(&schema).is_err());
+    }
+
     #[test]
     fn json_from_avro() {
         assert_eq!(JsonValue::try_from(Value::Null).unwrap(), JsonValue::Null);