You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/08/16 12:44:58 UTC

[avro] 01/02: AVRO-3609: [Rust] Add custom attributes field to Record, Enum and Fixed schemata

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch avro-3609-rust-support-custom-attributes
in repository https://gitbox.apache.org/repos/asf/avro.git

commit a9cedd0355efee6a7b97c3d34c9d3b057cd0714d
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Tue Aug 16 14:17:02 2022 +0300

    AVRO-3609: [Rust] Add custom attributes field to Record, Enum and Fixed schemata
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---
 lang/rust/avro/src/decode.rs               |  2 ++
 lang/rust/avro/src/schema.rs               | 41 +++++++++++++++++++++++-------
 lang/rust/avro/src/schema_compatibility.rs |  2 ++
 lang/rust/avro/src/types.rs                | 21 ++++++++++-----
 lang/rust/avro/src/writer.rs               |  2 ++
 lang/rust/avro/tests/schema.rs             |  1 +
 lang/rust/avro_derive/src/lib.rs           |  2 ++
 7 files changed, 55 insertions(+), 16 deletions(-)

diff --git a/lang/rust/avro/src/decode.rs b/lang/rust/avro/src/decode.rs
index cb32ebdb7..4f9e7e945 100644
--- a/lang/rust/avro/src/decode.rs
+++ b/lang/rust/avro/src/decode.rs
@@ -344,6 +344,7 @@ mod tests {
             doc: None,
             name: Name::new("decimal").unwrap(),
             aliases: None,
+            attributes: Default::default(),
         });
         let schema = Schema::Decimal {
             inner,
@@ -370,6 +371,7 @@ mod tests {
             name: Name::new("decimal").unwrap(),
             aliases: None,
             doc: None,
+            attributes: Default::default(),
         });
         let schema = Schema::Decimal {
             inner,
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 0a9603fe7..04f9e62eb 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -107,6 +107,7 @@ pub enum Schema {
         doc: Documentation,
         fields: Vec<RecordField>,
         lookup: BTreeMap<String, usize>,
+        attributes: HashMap<String, Value>,
     },
     /// An `enum` Avro schema.
     Enum {
@@ -114,6 +115,7 @@ pub enum Schema {
         aliases: Aliases,
         doc: Documentation,
         symbols: Vec<String>,
+        attributes: HashMap<String, Value>,
     },
     /// A `fixed` Avro schema.
     Fixed {
@@ -121,6 +123,7 @@ pub enum Schema {
         aliases: Aliases,
         doc: Documentation,
         size: usize,
+        attributes: HashMap<String, Value>,
     },
     /// Logical type which represents `Decimal` values. The underlying type is serialized and
     /// deserialized as `Schema::Bytes` or `Schema::Fixed`.
@@ -340,7 +343,7 @@ impl<'de> Deserialize<'de> for Name {
     where
         D: serde::de::Deserializer<'de>,
     {
-        serde_json::Value::deserialize(deserializer).and_then(|value| {
+        Value::deserialize(deserializer).and_then(|value| {
             use serde::de::Error;
             if let Value::Object(json) = value {
                 Name::parse(&json).map_err(D::Error::custom)
@@ -1220,6 +1223,7 @@ impl Parser {
             doc: complex.doc(),
             fields,
             lookup,
+            attributes: Default::default(),
         };
 
         self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
@@ -1276,6 +1280,7 @@ impl Parser {
             aliases: aliases.clone(),
             doc: complex.doc(),
             symbols,
+            attributes: Default::default(),
         };
 
         self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
@@ -1357,6 +1362,7 @@ impl Parser {
             aliases: aliases.clone(),
             doc,
             size: size as usize,
+            attributes: Default::default(),
         };
 
         self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
@@ -1556,6 +1562,7 @@ impl Serialize for Schema {
                     aliases: None,
                     doc: None,
                     size: 12,
+                    attributes: Default::default(),
                 };
                 map.serialize_entry("type", &inner)?;
                 map.serialize_entry("logicalType", "duration")?;
@@ -1584,11 +1591,11 @@ impl Serialize for RecordField {
 
 /// Parses a **valid** avro schema into the Parsing Canonical Form.
 /// https://avro.apache.org/docs/1.8.2/spec.html#Parsing+Canonical+Form+for+Schemas
-fn parsing_canonical_form(schema: &serde_json::Value) -> String {
+fn parsing_canonical_form(schema: &Value) -> String {
     match schema {
-        serde_json::Value::Object(map) => pcf_map(map),
-        serde_json::Value::String(s) => pcf_string(s),
-        serde_json::Value::Array(v) => pcf_array(v),
+        Value::Object(map) => pcf_map(map),
+        Value::String(s) => pcf_string(s),
+        Value::Array(v) => pcf_array(v),
         json => panic!(
             "got invalid JSON value for canonical form of schema: {0}",
             json
@@ -1596,7 +1603,7 @@ fn parsing_canonical_form(schema: &serde_json::Value) -> String {
     }
 }
 
-fn pcf_map(schema: &Map<String, serde_json::Value>) -> String {
+fn pcf_map(schema: &Map<String, Value>) -> String {
     // Look for the namespace variant up front.
     let ns = schema.get("namespace").and_then(|v| v.as_str());
     let mut fields = Vec::new();
@@ -1604,7 +1611,7 @@ fn pcf_map(schema: &Map<String, serde_json::Value>) -> String {
         // Reduce primitive types to their simple form. ([PRIMITIVE] rule)
         if schema.len() == 1 && k == "type" {
             // Invariant: function is only callable from a valid schema, so this is acceptable.
-            if let serde_json::Value::String(s) = v {
+            if let Value::String(s) = v {
                 return pcf_string(s);
             }
         }
@@ -1656,7 +1663,7 @@ fn pcf_map(schema: &Map<String, serde_json::Value>) -> String {
     format!("{{{}}}", inter)
 }
 
-fn pcf_array(arr: &[serde_json::Value]) -> String {
+fn pcf_array(arr: &[Value]) -> String {
     let inter = arr
         .iter()
         .map(parsing_canonical_form)
@@ -1764,7 +1771,7 @@ pub mod derive {
         T: AvroSchemaComponent,
     {
         fn get_schema() -> Schema {
-            T::get_schema_in_ctxt(&mut HashMap::default(), &Option::None)
+            T::get_schema_in_ctxt(&mut HashMap::default(), &None)
         }
     }
 
@@ -2026,6 +2033,7 @@ mod tests {
                 position: 0,
             }],
             lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
+            attributes: Default::default(),
         };
 
         assert_eq!(schema_c, schema_c_expected);
@@ -2113,6 +2121,7 @@ mod tests {
                 position: 0,
             }],
             lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]),
+            attributes: Default::default(),
         };
 
         assert_eq!(schema_option_a, schema_option_a_expected);
@@ -2161,6 +2170,7 @@ mod tests {
                 },
             ],
             lookup,
+            attributes: Default::default(),
         };
 
         assert_eq!(parsed, expected);
@@ -2230,11 +2240,13 @@ mod tests {
                         },
                     ],
                     lookup: node_lookup,
+                    attributes: Default::default(),
                 },
                 order: RecordFieldOrder::Ascending,
                 position: 0,
             }],
             lookup,
+            attributes: Default::default(),
         };
         assert_eq!(schema, expected);
 
@@ -2402,6 +2414,7 @@ mod tests {
                 },
             ],
             lookup,
+            attributes: Default::default(),
         };
         assert_eq!(schema, expected);
 
@@ -2462,6 +2475,7 @@ mod tests {
                 },
             ],
             lookup,
+            attributes: Default::default(),
         };
         assert_eq!(schema, expected);
 
@@ -2515,6 +2529,7 @@ mod tests {
                         aliases: None,
                         doc: None,
                         symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()],
+                        attributes: Default::default(),
                     },
                     order: RecordFieldOrder::Ascending,
                     position: 0,
@@ -2531,12 +2546,14 @@ mod tests {
                         aliases: None,
                         doc: None,
                         symbols: vec!["one".to_string(), "two".to_string(), "three".to_string()],
+                        attributes: Default::default(),
                     },
                     order: RecordFieldOrder::Ascending,
                     position: 1,
                 },
             ],
             lookup,
+            attributes: Default::default(),
         };
         assert_eq!(schema, expected);
 
@@ -2590,6 +2607,7 @@ mod tests {
                         aliases: None,
                         doc: None,
                         size: 456,
+                        attributes: Default::default(),
                     },
                     order: RecordFieldOrder::Ascending,
                     position: 0,
@@ -2606,12 +2624,14 @@ mod tests {
                         aliases: None,
                         doc: None,
                         size: 456,
+                        attributes: Default::default(),
                     },
                     order: RecordFieldOrder::Ascending,
                     position: 1,
                 },
             ],
             lookup,
+            attributes: Default::default(),
         };
         assert_eq!(schema, expected);
 
@@ -2636,6 +2656,7 @@ mod tests {
                 "clubs".to_owned(),
                 "hearts".to_owned(),
             ],
+            attributes: Default::default(),
         };
 
         assert_eq!(expected, schema);
@@ -2668,6 +2689,7 @@ mod tests {
             aliases: None,
             doc: None,
             size: 16usize,
+            attributes: Default::default(),
         };
 
         assert_eq!(expected, schema);
@@ -2685,6 +2707,7 @@ mod tests {
             aliases: None,
             doc: Some(String::from("FixedSchema documentation")),
             size: 16usize,
+            attributes: Default::default(),
         };
 
         assert_eq!(expected, schema);
diff --git a/lang/rust/avro/src/schema_compatibility.rs b/lang/rust/avro/src/schema_compatibility.rs
index 43fbbbe8d..843a139df 100644
--- a/lang/rust/avro/src/schema_compatibility.rs
+++ b/lang/rust/avro/src/schema_compatibility.rs
@@ -235,6 +235,7 @@ impl SchemaCompatibility {
                         aliases: _,
                         doc: _w_doc,
                         size: w_size,
+                        attributes: _,
                     } = writers_schema
                     {
                         if let Schema::Fixed {
@@ -242,6 +243,7 @@ impl SchemaCompatibility {
                             aliases: _,
                             doc: _r_doc,
                             size: r_size,
+                            attributes: _,
                         } = readers_schema
                         {
                             return w_name.fullname(None) == r_name.fullname(None)
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index 042350b93..ac3ba21b9 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -1039,9 +1039,10 @@ mod tests {
                         position: 0,
                     }],
                     lookup: Default::default(),
+                    attributes: Default::default(),
                 },
                 false,
-                "Invalid value: Record([(\"unknown_field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Int, order: Ignore, position: 0 }], lookup: {} }. Reason: There is no schema field for field 'unknown_field_name'",
+                "Invalid value: Record([(\"unknown_field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Int, order: Ignore, position: 0 }], lookup: {}, attributes: {} }. Reason: There is no schema field for field 'unknown_field_name'",
             ),
             (
                 Value::Record(vec![("field_name".to_string(), Value::Null)]),
@@ -1060,9 +1061,10 @@ mod tests {
                         position: 0,
                     }],
                     lookup: [("field_name".to_string(), 0)].iter().cloned().collect(),
+                    attributes: Default::default(),
                 },
                 false,
-                "Invalid value: Record([(\"field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Ref { name: Name { name: \"missing\", namespace: None } }, order: Ignore, position: 0 }], lookup: {\"field_name\": 0} }. Reason: Unresolved schema reference: 'missing'. Parsed names: []",
+                "Invalid value: Record([(\"field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Ref { name: Name { name: \"missing\", namespace: None } }, order: Ignore, position: 0 }], lookup: {\"field_name\": 0}, attributes: {} }. Reason: Unresolved schema reference: 'missing'. Parsed names: []",
             ),
         ];
 
@@ -1088,6 +1090,7 @@ mod tests {
             name: Name::new("some_fixed").unwrap(),
             aliases: None,
             doc: None,
+            attributes: Default::default(),
         };
 
         assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(&schema));
@@ -1125,6 +1128,7 @@ mod tests {
                 "diamonds".to_string(),
                 "clubs".to_string(),
             ],
+            attributes: Default::default(),
         };
 
         assert!(Value::Enum(0, "spades".to_string()).validate(&schema));
@@ -1170,6 +1174,7 @@ mod tests {
                 "clubs".to_string(),
                 "spades".to_string(),
             ],
+            attributes: Default::default(),
         };
 
         let value = Value::Enum(0, "spades".to_string());
@@ -1218,6 +1223,7 @@ mod tests {
                 .iter()
                 .cloned()
                 .collect(),
+            attributes: Default::default(),
         };
 
         assert!(Value::Record(vec![
@@ -1237,7 +1243,7 @@ mod tests {
             ("b".to_string(), Value::String("foo".to_string())),
         ]);
         assert!(!value.validate(&schema));
-        assert_logged("Invalid value: Record([(\"a\", Boolean(false)), (\"b\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {\"a\": 0, \"b\": 1} }. Reason: Unsupported value-schema combination");
+        assert_logged("Invalid value: Record([(\"a\", Boolean(false)), (\"b\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {\"a\": 0, \"b\": 1}, attributes: {} }. Reason: Unsupported value-schema [...]
 
         let value = Value::Record(vec![
             ("a".to_string(), Value::Long(42i64)),
@@ -1245,7 +1251,7 @@ mod tests {
         ]);
         assert!(!value.validate(&schema));
         assert_logged(
-            "Invalid value: Record([(\"a\", Long(42)), (\"c\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {\"a\": 0, \"b\": 1} }. Reason: There is no schema field for field 'c'"
+            "Invalid value: Record([(\"a\", Long(42)), (\"c\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {\"a\": 0, \"b\": 1}, attributes: {} }. Reason: There is no schema field for field 'c'"
         );
 
         let value = Value::Record(vec![
@@ -1255,7 +1261,7 @@ mod tests {
         ]);
         assert!(!value.validate(&schema));
         assert_logged(
-            r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null)]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {"a": 0, "b": 1} }. Reason: The value's records length (3) is different than the sche [...]
+            r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null)]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {"a": 0, "b": 1}, attributes: {} }. Reason: The value's records length (3) is differe [...]
         );
 
         assert!(Value::Map(
@@ -1275,7 +1281,7 @@ mod tests {
         )
         .validate(&schema));
         assert_logged(
-            r#"Invalid value: Map({"c": Long(123)}) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {"a": 0, "b": 1} }. Reason: Field with name '"a"' is not a member of the map items
+            r#"Invalid value: Map({"c": Long(123)}) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {"a": 0, "b": 1}, attributes: {} }. Reason: Field with name '"a"' is not a member of the map items
 Field with name '"b"' is not a member of the map items"#,
         );
 
@@ -1387,7 +1393,8 @@ Field with name '"b"' is not a member of the map items"#,
                     name: Name::new("decimal").unwrap(),
                     aliases: None,
                     size: 20,
-                    doc: None
+                    doc: None,
+                    attributes: Default::default(),
                 })
             })
             .is_ok());
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 2fc42cccf..98ceafe46 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -686,6 +686,7 @@ mod tests {
             aliases: None,
             doc: None,
             size,
+            attributes: Default::default(),
         };
         let value = vec![0u8; size];
         logical_type_test(
@@ -725,6 +726,7 @@ mod tests {
             aliases: None,
             doc: None,
             size: 12,
+            attributes: Default::default(),
         };
         let value = Value::Duration(Duration::new(
             Months::new(256),
diff --git a/lang/rust/avro/tests/schema.rs b/lang/rust/avro/tests/schema.rs
index b4f59e553..f8bdd4a68 100644
--- a/lang/rust/avro/tests/schema.rs
+++ b/lang/rust/avro/tests/schema.rs
@@ -860,6 +860,7 @@ fn test_parse_reused_record_schema_by_fullname() {
             doc: _,
             ref fields,
             lookup: _,
+            attributes: _,
         } => {
             assert_eq!(name.fullname(None), "test.Weather", "Name does not match!");
 
diff --git a/lang/rust/avro_derive/src/lib.rs b/lang/rust/avro_derive/src/lib.rs
index abba0e195..18d7ed775 100644
--- a/lang/rust/avro_derive/src/lib.rs
+++ b/lang/rust/avro_derive/src/lib.rs
@@ -179,6 +179,7 @@ fn get_data_struct_schema_def(
             doc: #record_doc,
             fields: schema_fields,
             lookup,
+            attributes: Default::default(),
         }
     })
 }
@@ -204,6 +205,7 @@ fn get_data_enum_schema_def(
                 aliases: #enum_aliases,
                 doc: #doc,
                 symbols: vec![#(#symbols.to_owned()),*],
+                attributes: Default::default(),
             }
         })
     } else {