Posted to commits@avro.apache.org by mg...@apache.org on 2022/02/01 11:51:15 UTC

[avro] branch branch-1.11 updated (b802832 -> ee2953e)

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a change to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git.


    from b802832  AVRO-3343 Updated Codec to styling standards (#1493)
     new 81812a0  AVRO-3205 Rust: Update Cargo.toml [package] information (#1344)
     new 2d337f9  duplicate line remove (#1440)
     new 8aa071c  AVRO-3234: add new codec to lang/rust: zstandard (#1370)
     new cd53fad  AVRO-3246 Rust: Add new codec: bzip2 (#1389)
     new 76d1cc3  AVRO-3245 Rust: Replace crc crate with crc32fast (#1388)
     new 27d4ea6  Update strum requirement from 0.21.0 to 0.23.0 in /lang/rust (#1408)
     new 32e79fb  Update strum_macros requirement from 0.21.1 to 0.23.1 in /lang/rust (#1409)
     new 355cf46  AVRO-3247 Rust: Run MIRI checks (#1390)
     new 2cb26f6  AVRO-3197 Fallback to the 'type' when the logical type does not support the type (#1340)
     new e217c5e  AVRO-3216 Reuse records' schema by name (#1345)
     new e6849d1  Update zerocopy requirement from 0.3.0 to 0.6.1 in /lang/rust (#1402)
     new 578c81b  AVRO-3240: fix deserializer schema backward compatibility (#1379)
     new 02aa8d4  AVRO-3240: Fix code formatting
     new bad06d0  AVRO-3240: Move the test data in the test method
     new 522c604  AVRO-3232: add matches to deserialize_any union+string/map (#1368)
     new 5d1e525  AVRO-3232: Fix formatting
     new 33019d7  Fix nightly build by optimizing the imports
     new 1b01981  AVRO-3284: Update Rabin fingerprint implementation to digest to 0.10+ (#1446)
     new 66f2b47  AVRO-3214 Rust: Support 'doc' attribute for FixedSchema (#1343)
     new f66f747  AVRO-3303: Rust: Add support for Xz codec (#1455)
     new e8ce233  AVRO-3302: Add interop tests for rust (#1456)
     new 1a2d0a3  AVRO-3248: Rust: Support named types in UnionSchema (#1396)
     new 5af11f3  AVRO-3312: Use u32 instead of i32 for the Enum/Union's index field (#1465)
     new 9164b7d  AVRO-3302: No need of special handling for Schema::Ref while serializing to JSON
     new f666c56  AVRO-3302: Add support for parsing recursive schemas for Enum
     new f256aed  AVRO-3302: Add support for recursive types using Fixed schema
     new c81e1aa  AVRO-3316 [Rust] build breaks in docker build
     new 34ba17f  AVRO-3315: Rust: Add support to back/cycle reference an alias (#1466)
     new 029cd61  Update zstd requirement in /lang/rust (#1471)
     new ee2953e  AVRO-3339 Rust: Rename crate from avro-rs to apache-avro (#1488)

The 30 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/test-lang-rust-ci.yml            |  73 ++-
 ...ang-rust-clippy.yml => test-lang-rust-miri.yml} |  31 +-
 lang/rust/CHANGELOG.md                             |   5 +-
 lang/rust/Cargo.toml                               |  39 +-
 lang/rust/README.md                                | 105 ++--
 lang/rust/README.tpl                               |  15 +-
 lang/rust/benches/serde.rs                         |   2 +-
 lang/rust/benches/single.rs                        |   2 +-
 lang/rust/build.sh                                 |  35 +-
 lang/rust/examples/benchmark.rs                    |   2 +-
 lang/rust/examples/generate_interop_data.rs        |  99 ++++
 lang/rust/examples/test_interop_data.rs            |  59 ++
 lang/rust/examples/to_value.rs                     |   2 +-
 lang/rust/src/codec.rs                             | 127 ++++-
 lang/rust/src/de.rs                                | 203 ++++++-
 lang/rust/src/decode.rs                            | 336 ++++++-----
 lang/rust/src/encode.rs                            | 198 ++++---
 lang/rust/src/error.rs                             |  15 +-
 lang/rust/src/lib.rs                               |  87 +--
 lang/rust/src/rabin.rs                             |  37 +-
 lang/rust/src/reader.rs                            |  73 ++-
 lang/rust/src/schema.rs                            | 628 +++++++++++++++++++--
 lang/rust/src/schema_compatibility.rs              |   3 +-
 lang/rust/src/ser.rs                               |  45 +-
 lang/rust/src/types.rs                             | 209 ++++---
 lang/rust/src/util.rs                              |  43 +-
 lang/rust/src/writer.rs                            |  10 +-
 lang/rust/tests/io.rs                              |  12 +-
 lang/rust/tests/schema.rs                          | 224 +++++++-
 29 files changed, 2151 insertions(+), 568 deletions(-)
 copy .github/workflows/{test-lang-rust-clippy.yml => test-lang-rust-miri.yml} (60%)
 create mode 100644 lang/rust/examples/generate_interop_data.rs
 create mode 100644 lang/rust/examples/test_interop_data.rs

[avro] 26/30: AVRO-3302: Add support for recursive types using Fixed schema

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit f256aed6747b7feaa7fa32d5fd722d18cf64a7f8
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Thu Jan 20 09:46:03 2022 +0200

    AVRO-3302: Add support for recursive types using Fixed schema
    
    Similar to
    https://github.com/apache/avro/commit/064cc6b4bd1c6dcf59da989e712a27f0a955f70d
    for Enum
    Reported at https://github.com/flavray/avro-rs/pull/99#issuecomment-1016948451
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit 29197da2a94c889faa9101c92b7c60fed6a6db0b)
---
 lang/rust/src/schema.rs | 65 +++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 58 insertions(+), 7 deletions(-)
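
Note: the change below makes parse_fixed record the finished fixed schema in parsed_schemas, so that a later field can refer to it by name alone. A minimal, std-only sketch of that idea follows. The Schema and Parser types here are hypothetical miniatures written for illustration, not the crate's real definitions, and the resolve helper is an assumed name.

```rust
use std::collections::HashMap;

/// Toy stand-in for the crate's `Schema`; only the variant this sketch needs.
#[derive(Clone, Debug, PartialEq)]
enum Schema {
    Fixed { name: String, size: usize },
}

/// Hypothetical miniature parser that remembers every named schema it finished.
#[derive(Default)]
struct Parser {
    parsed_schemas: HashMap<String, Schema>,
}

impl Parser {
    /// Build a fixed schema and register it under its name, mirroring the
    /// insert that the patch adds at the end of `parse_fixed`.
    fn parse_fixed(&mut self, name: &str, size: usize) -> Schema {
        let schema = Schema::Fixed {
            name: name.to_owned(),
            size,
        };
        self.parsed_schemas.insert(name.to_owned(), schema.clone());
        schema
    }

    /// Resolve a by-name reference (assumed helper, for illustration only).
    fn resolve(&self, name: &str) -> Option<Schema> {
        self.parsed_schemas.get(name).cloned()
    }
}
```

In this sketch, calling parse_fixed("EmployeeId", 16) and then resolve("EmployeeId") returns the same schema, which is the situation the new test exercises when "Manager" refers to "EmployeeId" by name.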

diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index 5415d35..a60e047 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -789,7 +789,7 @@ impl Parser {
                 "enum" => self.parse_enum(complex),
                 "array" => self.parse_array(complex),
                 "map" => self.parse_map(complex),
-                "fixed" => Self::parse_fixed(complex),
+                "fixed" => self.parse_fixed(complex),
                 other => self.parse_known_schema(other),
             },
             Some(&Value::Object(ref data)) => self.parse_complex(data),
@@ -914,7 +914,7 @@ impl Parser {
 
     /// Parse a `serde_json::Value` representing a Avro fixed type into a
     /// `Schema`.
-    fn parse_fixed(complex: &Map<String, Value>) -> AvroResult<Schema> {
+    fn parse_fixed(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
         let name = Name::parse(complex)?;
 
         let doc = complex.get("doc").and_then(|v| match &v {
@@ -927,11 +927,14 @@ impl Parser {
             .and_then(|v| v.as_i64())
             .ok_or(Error::GetFixedSizeField)?;
 
-        Ok(Schema::Fixed {
-            name,
+        let schema = Schema::Fixed {
+            name: name.clone(),
             doc,
             size: size as usize,
-        })
+        };
+        self.parsed_schemas
+            .insert(name.fullname(None), schema.clone());
+        Ok(schema)
     }
 }
 
@@ -1560,8 +1563,8 @@ mod tests {
               ]
             }
           ]
-}
-"#;
+        }
+        "#;
 
         let schema = Schema::parse_str(schema).unwrap();
         let schema_str = schema.canonical_form();
@@ -1570,6 +1573,54 @@ mod tests {
     }
 
     #[test]
+    fn test_parsing_of_recursive_type_fixed() {
+        let schema = r#"
+    {
+        "type": "record",
+        "name": "User",
+        "namespace": "office",
+        "fields": [
+            {
+              "name": "details",
+              "type": [
+                {
+                  "type": "record",
+                  "name": "Employee",
+                  "fields": [
+                    {
+                      "name": "id",
+                      "type": {
+                        "type": "fixed",
+                        "name": "EmployeeId",
+                        "size": 16
+                      },
+                      "default": "female"
+                    }
+                  ]
+                },
+                {
+                  "type": "record",
+                  "name": "Manager",
+                  "fields": [
+                    {
+                      "name": "id",
+                      "type": "EmployeeId"
+                    }
+                  ]
+                }
+              ]
+            }
+          ]
+        }
+        "#;
+
+        let schema = Schema::parse_str(schema).unwrap();
+        let schema_str = schema.canonical_form();
+        let expected = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"Employee","type":"record","fields":[{"name":"id","type":{"name":"EmployeeId","type":"fixed","size":16}}]},{"name":"Manager","type":"record","fields":[{"name":"id","type":{"name":"EmployeeId","type":"fixed","size":16}}]}]}]}"#;
+        assert_eq!(schema_str, expected);
+    }
+
+    #[test]
     fn test_enum_schema() {
         let schema = Schema::parse_str(
             r#"{"type": "enum", "name": "Suit", "symbols": ["diamonds", "spades", "clubs", "hearts"]}"#,

[avro] 28/30: AVRO-3315: Rust: Add support to back/cycle reference an alias (#1466)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 34ba17f1c486f1fd7e801e6a204ad62b78d80ee3
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Fri Jan 21 08:37:30 2022 +0200

    AVRO-3315: Rust: Add support to back/cycle reference an alias (#1466)
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit a35a2a5305560da8ccb8eb299498e73c8bafa3bd)
---
 lang/rust/src/schema.rs | 125 ++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 115 insertions(+), 10 deletions(-)
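
Note: both new helpers in the diff below register a schema under its own full name and under each alias, qualifying the alias with the schema's namespace when one is set. The std-only sketch below shows only that key computation. The Name struct is a simplified stand-in, lookup_keys and register are hypothetical names, and the sketch assumes the full name is simply "namespace.name".

```rust
use std::collections::HashMap;

/// Simplified stand-in for the crate's `Name`.
struct Name {
    name: String,
    namespace: Option<String>,
    aliases: Option<Vec<String>>,
}

/// Every key under which a named schema should be reachable: its own full
/// name first, then each alias qualified with the same namespace.
fn lookup_keys(name: &Name) -> Vec<String> {
    let qualify = |short: &str| match &name.namespace {
        Some(ns) => format!("{}.{}", ns, short),
        None => short.to_owned(),
    };
    let mut keys = vec![qualify(name.name.as_str())];
    if let Some(aliases) = &name.aliases {
        keys.extend(aliases.iter().map(|alias| qualify(alias.as_str())));
    }
    keys
}

/// Register `schema` (a plain string in this sketch) under all of its keys.
fn register(table: &mut HashMap<String, String>, name: &Name, schema: &str) {
    for key in lookup_keys(name) {
        table.insert(key, schema.to_owned());
    }
}
```

With such a table, a field of type "LinkedLongs" inside the record "LongList" finds an entry through the alias key, which is the case covered by the test added in this commit.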

diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index a60e047..ab5fd04 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -799,6 +799,45 @@ impl Parser {
         }
     }
 
+    fn register_resolving_schema(&mut self, name: &Name) {
+        let resolving_schema = Schema::Ref { name: name.clone() };
+        self.resolving_schemas
+            .insert(name.fullname(None), resolving_schema.clone());
+
+        let namespace = &name.namespace;
+
+        if let Some(ref aliases) = name.aliases {
+            aliases.iter().for_each(|alias| {
+                let alias_fullname = match namespace {
+                    Some(ref ns) => format!("{}.{}", ns, alias),
+                    None => alias.clone(),
+                };
+                self.resolving_schemas
+                    .insert(alias_fullname, resolving_schema.clone());
+            });
+        }
+    }
+
+    fn register_parsed_schema(&mut self, name: &Name, schema: &Schema) {
+        self.parsed_schemas
+            .insert(name.fullname(None), schema.clone());
+        self.resolving_schemas.remove(name.fullname(None).as_str());
+
+        let namespace = &name.namespace;
+
+        if let Some(ref aliases) = name.aliases {
+            aliases.iter().for_each(|alias| {
+                let alias_fullname = match namespace {
+                    Some(ref ns) => format!("{}.{}", ns, alias),
+                    None => alias.clone(),
+                };
+                self.parsed_schemas
+                    .insert(alias_fullname.clone(), schema.clone());
+                self.resolving_schemas.remove(alias_fullname.as_str());
+            });
+        }
+    }
+
     /// Parse a `serde_json::Value` representing a Avro record type into a
     /// `Schema`.
     fn parse_record(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
@@ -806,9 +845,7 @@ impl Parser {
 
         let mut lookup = HashMap::new();
 
-        let resolving_schema = Schema::Ref { name: name.clone() };
-        self.resolving_schemas
-            .insert(name.name.clone(), resolving_schema);
+        self.register_resolving_schema(&name);
 
         let fields: Vec<RecordField> = complex
             .get("fields")
@@ -834,9 +871,7 @@ impl Parser {
             lookup,
         };
 
-        self.parsed_schemas
-            .insert(name.fullname(None), schema.clone());
-        self.resolving_schemas.remove(name.name.as_str());
+        self.register_parsed_schema(&name, &schema);
         Ok(schema)
     }
 
@@ -877,8 +912,9 @@ impl Parser {
             doc: complex.doc(),
             symbols,
         };
-        self.parsed_schemas
-            .insert(name.fullname(None), schema.clone());
+
+        self.register_parsed_schema(&name, &schema);
+
         Ok(schema)
     }
 
@@ -932,8 +968,9 @@ impl Parser {
             doc,
             size: size as usize,
         };
-        self.parsed_schemas
-            .insert(name.fullname(None), schema.clone());
+
+        self.register_parsed_schema(&name, &schema);
+
         Ok(schema)
     }
 }
@@ -1620,6 +1657,74 @@ mod tests {
         assert_eq!(schema_str, expected);
     }
 
+    // AVRO-3302
+    #[test]
+    fn test_record_schema_with_currently_parsing_schema_aliases() {
+        let schema = Schema::parse_str(
+            r#"
+            {
+              "type": "record",
+              "name": "LongList",
+              "aliases": ["LinkedLongs"],
+              "fields" : [
+                {"name": "value", "type": "long"},
+                {"name": "next", "type": ["null", "LinkedLongs"]}
+              ]
+            }
+        "#,
+        )
+        .unwrap();
+
+        let mut lookup = HashMap::new();
+        lookup.insert("value".to_owned(), 0);
+        lookup.insert("next".to_owned(), 1);
+
+        let expected = Schema::Record {
+            name: Name {
+                name: "LongList".to_owned(),
+                namespace: None,
+                aliases: Some(vec!["LinkedLongs".to_owned()]),
+            },
+            doc: None,
+            fields: vec![
+                RecordField {
+                    name: "value".to_string(),
+                    doc: None,
+                    default: None,
+                    schema: Schema::Long,
+                    order: RecordFieldOrder::Ascending,
+                    position: 0,
+                },
+                RecordField {
+                    name: "next".to_string(),
+                    doc: None,
+                    default: None,
+                    schema: Schema::Union(
+                        UnionSchema::new(vec![
+                            Schema::Null,
+                            Schema::Ref {
+                                name: Name {
+                                    name: "LongList".to_owned(),
+                                    namespace: None,
+                                    aliases: Some(vec!["LinkedLongs".to_owned()]),
+                                },
+                            },
+                        ])
+                        .unwrap(),
+                    ),
+                    order: RecordFieldOrder::Ascending,
+                    position: 1,
+                },
+            ],
+            lookup,
+        };
+        assert_eq!(schema, expected);
+
+        let canonical_form = &schema.canonical_form();
+        let expected = r#"{"name":"LongList","type":"record","fields":[{"name":"value","type":"long"},{"name":"next","type":["null","LongList"]}]}"#;
+        assert_eq!(canonical_form, &expected);
+    }
+
     #[test]
     fn test_enum_schema() {
         let schema = Schema::parse_str(

[avro] 15/30: AVRO-3232: add matches to deserialize_any union+string/map (#1368)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 522c60409aef60778ab43cb704de1636de137bee
Author: Ultrabug <ul...@users.noreply.github.com>
AuthorDate: Fri Jan 7 14:51:55 2022 +0100

    AVRO-3232: add matches to deserialize_any union+string/map (#1368)
    
    The deserialize_any method is used when from_value::<T>
    deserializes a record that has fields which are not present
    in <T>.
    
    deserialize_any was missing matches on certain Value variants,
    which caused errors like:
    
    Error: DeserializeValue("Unsupported union")
    Error: DeserializeValue("incorrect value of type: String")
    
    This patch adds the missing matches and extends the error
    message so that it is easier to see which type is still
    unhandled in the future.
    
    The added test case reproduces the problem and also validates
    that union deserialization works.
    
    Signed-off-by: Ultrabug <ul...@ultrabug.net>
    (cherry picked from commit e8200cd2d40d9cf4a702905e6d2eeb9d589ec57a)
---
 lang/rust/src/de.rs | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 79 insertions(+), 2 deletions(-)
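
Note: the diff below widens the match in deserialize_any so that the long-based logical types share one arm with Long, and so that String and Map values are handled both directly and inside a union. The sketch below shows the same matching shape without serde. Value is a reduced copy of the crate's enum, while Visited and visit_any are hypothetical names standing in for the visitor calls.

```rust
use std::collections::HashMap;

/// Reduced stand-in for the crate's `Value` enum.
#[derive(Clone, Debug, PartialEq)]
enum Value {
    Null,
    Long(i64),
    TimeMicros(i64),
    TimestampMillis(i64),
    TimestampMicros(i64),
    String(String),
    Map(HashMap<String, Value>),
    Union(Box<Value>),
    Bytes(Vec<u8>),
}

/// What a visitor would be handed (hypothetical simplification).
#[derive(Debug, PartialEq)]
enum Visited {
    Unit,
    I64(i64),
    Str(String),
    /// Number of entries in the visited map.
    Map(usize),
}

fn visit_any(value: &Value) -> Result<Visited, String> {
    match value {
        Value::Null => Ok(Visited::Unit),
        // All i64-backed variants are visited the same way.
        Value::Long(i)
        | Value::TimeMicros(i)
        | Value::TimestampMillis(i)
        | Value::TimestampMicros(i) => Ok(Visited::I64(*i)),
        Value::String(s) => Ok(Visited::Str(s.clone())),
        Value::Map(items) => Ok(Visited::Map(items.len())),
        // Unwrap the union; on failure, report the whole input for context.
        Value::Union(inner) => {
            visit_any(inner).map_err(|_| format!("unsupported union: {:?}", value))
        }
        other => Err(format!("incorrect value: {:?}", other)),
    }
}
```

The error for an unhandled union includes the debug form of the input, which reflects the commit's intent of making a missing variant easy to identify.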

diff --git a/lang/rust/src/de.rs b/lang/rust/src/de.rs
index cd275bc..4593a5d 100644
--- a/lang/rust/src/de.rs
+++ b/lang/rust/src/de.rs
@@ -254,13 +254,22 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
                 Value::Null => visitor.visit_unit(),
                 Value::Boolean(b) => visitor.visit_bool(b),
                 Value::Int(i) => visitor.visit_i32(i),
-                Value::Long(i) => visitor.visit_i64(i),
+                Value::Long(i)
+                | Value::TimeMicros(i)
+                | Value::TimestampMillis(i)
+                | Value::TimestampMicros(i) => visitor.visit_i64(i),
                 Value::Float(f) => visitor.visit_f32(f),
                 Value::Double(d) => visitor.visit_f64(d),
-                _ => Err(de::Error::custom("Unsupported union")),
+                Value::Record(ref fields) => visitor.visit_map(StructDeserializer::new(fields)),
+                Value::Array(ref fields) => visitor.visit_seq(SeqDeserializer::new(fields)),
+                Value::String(ref s) => visitor.visit_str(s),
+                Value::Map(ref items) => visitor.visit_map(MapDeserializer::new(items)),
+                _ => Err(de::Error::custom(format!("unsupported union: {:?}", self.input))),
             },
             Value::Record(ref fields) => visitor.visit_map(StructDeserializer::new(fields)),
             Value::Array(ref fields) => visitor.visit_seq(SeqDeserializer::new(fields)),
+            Value::String(ref s) => visitor.visit_str(s),
+            Value::Map(ref items) => visitor.visit_map(MapDeserializer::new(items)),
             value => Err(de::Error::custom(format!(
                 "incorrect value of type: {:?}",
                 crate::schema::SchemaKind::from(value)
@@ -896,4 +905,72 @@ mod tests {
         assert_eq!(result.as_bytes(), raw_value);
         Ok(())
     }
+
+    #[test]
+    fn test_from_value_with_union() -> TestResult<()> {
+        // AVRO-3232 test for deserialize_any on missing fields on the destination struct:
+        // Error: DeserializeValue("Unsupported union")
+        // Error: DeserializeValue("incorrect value of type: String")
+        #[derive(Debug, Deserialize, PartialEq)]
+        struct RecordInUnion {
+            record_in_union: i32,
+        }
+
+        #[derive(Debug, Deserialize, PartialEq)]
+        struct StructWithMissingFields {
+            a_string: String,
+            a_record: Option<RecordInUnion>,
+            an_array: Option<[bool; 2]>,
+            a_union_map: Option<HashMap<String, i64>>,
+        }
+
+        let raw_map: HashMap<String, i64> = [
+            ("long_one".to_string(), 1),
+            ("long_two".to_string(), 2),
+            ("long_three".to_string(), 3),
+            ("time_micros_a".to_string(), 123),
+            ("timestamp_millis_b".to_string(), 234),
+            ("timestamp_micros_c".to_string(), 345),
+        ].iter().cloned().collect();
+
+        let value_map = raw_map.iter()
+            .map(|(k, v)| match k {
+                key  if key.starts_with("long_") => {(k.clone(), Value::Long(*v))}
+                key  if key.starts_with("time_micros_") => {(k.clone(), Value::TimeMicros(*v))}
+                key  if key.starts_with("timestamp_millis_") => {(k.clone(), Value::TimestampMillis(*v))}
+                key  if key.starts_with("timestamp_micros_") => {(k.clone(), Value::TimestampMicros(*v))}
+                _ => unreachable!(""),
+            })
+            .collect();
+
+        let record = Value::Record(vec![
+            ("a_string".to_string(), Value::String("a valid message field".to_string())),
+            ("a_non_existing_string".to_string(), Value::String("a string".to_string())),
+            ("a_union_string".to_string(), Value::Union(Box::new(Value::String("a union string".to_string())))),
+            ("a_union_long".to_string(), Value::Union(Box::new(Value::Long(412)))),
+            ("a_union_long".to_string(), Value::Union(Box::new(Value::Long(412)))),
+            ("a_time_micros".to_string(), Value::Union(Box::new(Value::TimeMicros(123)))),
+            ("a_non_existing_time_micros".to_string(), Value::Union(Box::new(Value::TimeMicros(-123)))),
+            ("a_timestamp_millis".to_string(), Value::Union(Box::new(Value::TimestampMillis(234)))),
+            ("a_non_existing_timestamp_millis".to_string(), Value::Union(Box::new(Value::TimestampMillis(-234)))),
+            ("a_timestamp_micros".to_string(), Value::Union(Box::new(Value::TimestampMicros(345)))),
+            ("a_non_existing_timestamp_micros".to_string(), Value::Union(Box::new(Value::TimestampMicros(-345)))),
+            ("a_record".to_string(), Value::Union(Box::new(Value::Record(vec!(("record_in_union".to_string(), Value::Int(-2))))))),
+            ("a_non_existing_record".to_string(), Value::Union(Box::new(Value::Record(vec!(("blah".to_string(), Value::Int(-22))))))),
+            ("an_array".to_string(), Value::Union(Box::new(Value::Array(vec!(Value::Boolean(true), Value::Boolean(false)))))),
+            ("a_non_existing_array".to_string(), Value::Union(Box::new(Value::Array(vec!(Value::Boolean(false), Value::Boolean(true)))))),
+            ("a_union_map".to_string(), Value::Union(Box::new(Value::Map(value_map)))),
+            ("a_non_existing_union_map".to_string(), Value::Union(Box::new(Value::Map(HashMap::new())))),
+        ]);
+
+        let deserialized: StructWithMissingFields = crate::from_value(&record)?;
+        let reference = StructWithMissingFields{
+            a_string: "a valid message field".to_string(),
+            a_record: Some(RecordInUnion { record_in_union: -2 }),
+            an_array: Some([true, false]),
+            a_union_map: Some(raw_map)
+        };
+        assert_eq!(deserialized, reference);
+        Ok(())
+    }
 }

[avro] 14/30: AVRO-3240: Move the test data in the test method

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit bad06d0b14de58a7419688493254c16477677373
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Fri Jan 7 15:50:14 2022 +0200

    AVRO-3240: Move the test data in the test method
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit 36f34be4ae5a4c8b7a79867d6e082c5f608d5760)
---
 lang/rust/src/reader.rs | 57 +++++++++++++++++++++++++------------------------
 1 file changed, 29 insertions(+), 28 deletions(-)

diff --git a/lang/rust/src/reader.rs b/lang/rust/src/reader.rs
index ce6a629..5757044 100644
--- a/lang/rust/src/reader.rs
+++ b/lang/rust/src/reader.rs
@@ -343,7 +343,26 @@ mod tests {
         6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
         207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
     ];
-    const TEST_RECORD_SCHEMA_3240: &str = r#"
+
+    #[test]
+    fn test_from_avro_datum() {
+        let schema = Schema::parse_str(SCHEMA).unwrap();
+        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
+
+        let mut record = Record::new(&schema).unwrap();
+        record.put("a", 27i64);
+        record.put("b", "foo");
+        let expected = record.into();
+
+        assert_eq!(
+            from_avro_datum(&schema, &mut encoded, None).unwrap(),
+            expected
+        );
+    }
+
+    #[test]
+    fn test_from_avro_datum_with_union_to_struct() {
+        const TEST_RECORD_SCHEMA_3240: &str = r#"
     {
       "type": "record",
       "name": "test",
@@ -375,34 +394,16 @@ mod tests {
       ]
     }
     "#;
-    #[derive(Default, Debug, Deserialize, PartialEq)]
-    struct TestRecord3240 {
-        a: i64,
-        b: String,
-        a_nullable_array: Option<Vec<String>>,
-        // we are missing the 'a_nullable_boolean' field to simulate missing keys
-        // a_nullable_boolean: Option<bool>,
-        a_nullable_string: Option<String>,
-    }
-
-    #[test]
-    fn test_from_avro_datum() {
-        let schema = Schema::parse_str(SCHEMA).unwrap();
-        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
-
-        let mut record = Record::new(&schema).unwrap();
-        record.put("a", 27i64);
-        record.put("b", "foo");
-        let expected = record.into();
-
-        assert_eq!(
-            from_avro_datum(&schema, &mut encoded, None).unwrap(),
-            expected
-        );
-    }
+        #[derive(Default, Debug, Deserialize, PartialEq)]
+        struct TestRecord3240 {
+            a: i64,
+            b: String,
+            a_nullable_array: Option<Vec<String>>,
+            // we are missing the 'a_nullable_boolean' field to simulate missing keys
+            // a_nullable_boolean: Option<bool>,
+            a_nullable_string: Option<String>,
+        }
 
-    #[test]
-    fn test_from_avro_datum_with_union_to_struct() {
         let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240).unwrap();
         let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
 

[avro] 21/30: AVRO-3302: Add interop tests for rust (#1456)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit e8ce233387929144ae2a8ca78865956b443624b1
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Wed Jan 19 09:44:47 2022 +0200

    AVRO-3302: Add interop tests for rust (#1456)
    
    * AVRO-3302: Add interop tests for Rust module
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302: Implement test_interop_data for Rust
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302: Run JS interop tests to read .avro files created by Rust
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Fix the path to lang/js
    
    Add debug for failing Maven build
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302: git checkout & install Rust stable
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302: Fix formatting
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Remove debug statement
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Revert removed debug statement
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302: Improve documentation
    
    First check 'parsed_schemas', then 'resolving_schemas' and finally
    'input_schemas'.
    Enable a test case that is working now.
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Seems to work ?!
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Code formatting
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Properly encode/decode Schema::Ref
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Remove debug statements
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Print messages for successful reads
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Rename Codec::Zstd to Codec::Zstandard for consistency
    
    This is the name used by the other Avro modules (e.g. Java & Perl)
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Fix formatting
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Collect all errors during interop test and panic at the end
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Format imports for +nightly
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Use Perl to read the interop .avro files created by Rust
    
    Perl support most of the optional codecs
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Code cleanup
    
    Revert changes which are not really needed.
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3302 Remove a FIXME that cannot be addressed
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit 98f4f4a87bb61cacf91f50253f8154c8b36de690)
---
 .github/workflows/test-lang-rust-ci.yml     |  71 ++++++
 lang/rust/build.sh                          |  35 ++-
 lang/rust/examples/generate_interop_data.rs |  99 ++++++++
 lang/rust/examples/test_interop_data.rs     |  59 +++++
 lang/rust/src/codec.rs                      |  16 +-
 lang/rust/src/decode.rs                     | 338 +++++++++++++++-------------
 lang/rust/src/encode.rs                     | 199 +++++++++-------
 lang/rust/src/error.rs                      |   4 +
 lang/rust/src/schema.rs                     | 164 +++++++++++++-
 lang/rust/src/schema_compatibility.rs       |   1 -
 lang/rust/src/types.rs                      | 116 ++++++----
 lang/rust/src/util.rs                       |  43 +++-
 lang/rust/tests/schema.rs                   |  86 ++++++-
 13 files changed, 916 insertions(+), 315 deletions(-)
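
Note: the commit message states the lookup order used when a schema is fetched by name: first 'parsed_schemas', then 'resolving_schemas' and finally 'input_schemas'. The std-only sketch below illustrates only that precedence. The Source enum and the locate method are hypothetical, the tables are reduced to sets of names, and the meaning attached to each table in the comments is an assumption rather than something the message spells out.

```rust
use std::collections::HashSet;

/// Which table answered the lookup (hypothetical, for illustration).
#[derive(Debug, PartialEq)]
enum Source {
    /// Assumed: the schema has been fully parsed.
    Parsed,
    /// Assumed: the schema is still being parsed, so only a reference exists.
    Resolving,
    /// Assumed: the schema was supplied as input but not parsed yet.
    Input,
}

/// Miniature parser state; the real tables map names to schemas or JSON.
#[derive(Default)]
struct Parser {
    parsed_schemas: HashSet<String>,
    resolving_schemas: HashSet<String>,
    input_schemas: HashSet<String>,
}

impl Parser {
    /// Check the three tables in the order given in the commit message.
    fn locate(&self, name: &str) -> Option<Source> {
        if self.parsed_schemas.contains(name) {
            Some(Source::Parsed)
        } else if self.resolving_schemas.contains(name) {
            Some(Source::Resolving)
        } else if self.input_schemas.contains(name) {
            Some(Source::Input)
        } else {
            None
        }
    }
}
```

A name present in more than one table is answered by the earliest table in that order, so a finished schema always wins over a placeholder.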

diff --git a/.github/workflows/test-lang-rust-ci.yml b/.github/workflows/test-lang-rust-ci.yml
index dacf461..91364f9 100644
--- a/.github/workflows/test-lang-rust-ci.yml
+++ b/.github/workflows/test-lang-rust-ci.yml
@@ -77,3 +77,74 @@ jobs:
         with:
           command: test
           args: --manifest-path lang/rust/Cargo.toml --doc
+
+  interop:
+    runs-on: ubuntu-latest
+
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v2
+
+      - name: Rust Toolchain
+        uses: actions-rs/toolchain@v1
+        with:
+          profile: minimal
+          toolchain: stable
+          override: true
+
+      - name: Cache Local Maven Repository
+        uses: actions/cache@v2
+        with:
+          path: ~/.m2/repository
+          key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
+          restore-keys: |
+            ${{ runner.os }}-maven-
+
+      - name: Install Java Avro for Interop Test
+        working-directory: .
+        run: mvn -B install -DskipTests
+
+      - name: Create Interop Data Directory
+        working-directory: .
+        run: mkdir -p build/interop/data
+
+      - name: Generate Interop Resources
+        working-directory: lang/java/avro
+        run: mvn -B -P interop-data-generate generate-resources
+
+      - name: Generate interop data
+        run: ./build.sh interop-data-generate
+
+      - name: Rust reads interop files created by Java and Rust
+        run: ./build.sh interop-data-test
+
+      - uses: shogo82148/actions-setup-perl@v1
+        with:
+          perl-version: 5.32
+
+      - name: Install Dependencies
+        run: |
+          sudo apt-get -qqy install --no-install-recommends libcompress-raw-zlib-perl \
+                                                            libcpan-uploader-perl \
+                                                            libencode-perl \
+                                                            libio-string-perl \
+                                                            libjansson-dev \
+                                                            libjson-xs-perl \
+                                                            libmodule-install-perl \
+                                                            libmodule-install-readmefrompod-perl \
+                                                            libobject-tiny-perl \
+                                                            libsnappy-dev \
+                                                            libtest-exception-perl \
+                                                            libtest-pod-perl
+          cpanm --mirror https://www.cpan.org/ install Compress::Zstd \
+                                                       Error::Simple \
+                                                       Module::Install::Repository \
+                                                       Object::Tiny \
+                                                       Regexp::Common \
+                                                       Try::Tiny \
+                                                       inc::Module::Install
+
+
+      - name: Perl reads interop files created by Java and Rust
+        working-directory: lang/perl
+        run: ./build.sh interop-data-test
diff --git a/lang/rust/build.sh b/lang/rust/build.sh
index d9a2484..2f0a824 100755
--- a/lang/rust/build.sh
+++ b/lang/rust/build.sh
@@ -15,7 +15,25 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-set -e
+set -e  # exit on error
+
+root_dir=$(pwd)
+build_dir="../../build/rust"
+dist_dir="../../dist/rust"
+
+
+function clean {
+  if [ -d "$build_dir" ]; then
+    find "$build_dir" -exec chmod 755 {} +
+    rm -rf "$build_dir"
+  fi
+}
+
+
+function prepare_build {
+  clean
+  mkdir -p "$build_dir"
+}
 
 cd `dirname "$0"`
 
@@ -35,10 +53,21 @@ do
       cargo build --release --lib --all-features
       cargo package
       mkdir -p  ../../dist/rust
-      cp target/package/avro-rs-*.crate ../../dist/rust
+      cp target/package/avro-rs-*.crate $dist_dir
+      ;;
+    interop-data-generate)
+      prepare_build
+      export RUST_LOG=avro_rs=debug
+      export RUST_BACKTRACE=1
+      cargo run --all-features --example generate_interop_data
+      ;;
+
+    interop-data-test)
+      prepare_build
+      cargo run --all-features --example test_interop_data
       ;;
     *)
-      echo "Usage: $0 {lint|test|dist|clean}" >&2
+      echo "Usage: $0 {lint|test|dist|clean|interop-data-generate|interop-data-test}" >&2
       exit 1
   esac
 done
diff --git a/lang/rust/examples/generate_interop_data.rs b/lang/rust/examples/generate_interop_data.rs
new file mode 100644
index 0000000..cb8efda
--- /dev/null
+++ b/lang/rust/examples/generate_interop_data.rs
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use avro_rs::{
+    schema::Schema,
+    types::{Record, Value},
+    Codec, Writer,
+};
+use std::collections::HashMap;
+use strum::IntoEnumIterator;
+
+fn create_datum(schema: &Schema) -> Record {
+    let mut datum = Record::new(schema).unwrap();
+    datum.put("intField", 12_i32);
+    datum.put("longField", 15234324_i64);
+    datum.put("stringField", "hey");
+    datum.put("boolField", true);
+    datum.put("floatField", 1234.0_f32);
+    datum.put("doubleField", -1234.0_f64);
+    datum.put("bytesField", b"12312adf".to_vec());
+    datum.put("nullField", Value::Null);
+    datum.put(
+        "arrayField",
+        Value::Array(vec![
+            Value::Double(5.0),
+            Value::Double(0.0),
+            Value::Double(12.0),
+        ]),
+    );
+    let mut map = HashMap::new();
+    map.insert(
+        "a".into(),
+        Value::Record(vec![("label".into(), Value::String("a".into()))]),
+    );
+    map.insert(
+        "bee".into(),
+        Value::Record(vec![("label".into(), Value::String("cee".into()))]),
+    );
+    datum.put("mapField", Value::Map(map));
+    datum.put("unionField", Value::Union(Box::new(Value::Double(12.0))));
+    datum.put("enumField", Value::Enum(2, "C".to_owned()));
+    datum.put("fixedField", Value::Fixed(16, b"1019181716151413".to_vec()));
+    datum.put(
+        "recordField",
+        Value::Record(vec![
+            ("label".into(), Value::String("outer".into())),
+            (
+                "children".into(),
+                Value::Array(vec![Value::Record(vec![
+                    ("label".into(), Value::String("inner".into())),
+                    ("children".into(), Value::Array(vec![])),
+                ])]),
+            ),
+        ]),
+    );
+
+    datum
+}
+
+fn main() -> anyhow::Result<()> {
+    let schema_str = std::fs::read_to_string("../../share/test/schemas/interop.avsc")
+        .expect("Unable to read the interop Avro schema");
+    let schema = Schema::parse_str(schema_str.as_str())?;
+
+    for codec in Codec::iter() {
+        let mut writer = Writer::with_codec(&schema, Vec::new(), codec);
+        let datum = create_datum(&schema);
+        writer.append(datum)?;
+        let bytes = writer.into_inner()?;
+
+        let codec_name = <&str>::from(codec);
+        let suffix = if codec_name == "null" {
+            "".to_owned()
+        } else {
+            format!("_{}", codec_name)
+        };
+
+        std::fs::write(
+            format!("../../build/interop/data/rust{}.avro", suffix),
+            bytes,
+        )?;
+    }
+
+    Ok(())
+}
diff --git a/lang/rust/examples/test_interop_data.rs b/lang/rust/examples/test_interop_data.rs
new file mode 100644
index 0000000..f86c6c4
--- /dev/null
+++ b/lang/rust/examples/test_interop_data.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use avro_rs::Reader;
+use std::ffi::OsStr;
+
+fn main() -> anyhow::Result<()> {
+    let data_dir = std::fs::read_dir("../../build/interop/data/")
+        .expect("Unable to list the interop data directory");
+
+    let mut errors = Vec::new();
+
+    for entry in data_dir {
+        let path = entry
+            .expect("Unable to read the interop data directory's files")
+            .path();
+
+        if path.is_file() {
+            let ext = path.extension().and_then(OsStr::to_str).unwrap_or("");
+
+            if ext == "avro" {
+                println!("Checking {:?}", &path);
+                let content = std::fs::File::open(&path)?;
+                let reader = Reader::new(&content)?;
+                for value in reader {
+                    if let Err(e) = value {
+                        errors.push(format!(
+                            "There was a problem reading {:?}:\n {:?}\n",
+                            &path, e
+                        ));
+                    }
+                }
+            }
+        }
+    }
+
+    if errors.is_empty() {
+        Ok(())
+    } else {
+        panic!(
+            "There were errors reading some .avro files:\n{}",
+            errors.join(", ")
+        );
+    }
+}
diff --git a/lang/rust/src/codec.rs b/lang/rust/src/codec.rs
index 0ba8abe..c9a584e 100644
--- a/lang/rust/src/codec.rs
+++ b/lang/rust/src/codec.rs
@@ -19,7 +19,7 @@
 use crate::{types::Value, AvroResult, Error};
 use libflate::deflate::{Decoder, Encoder};
 use std::io::{Read, Write};
-use strum_macros::{EnumString, IntoStaticStr};
+use strum_macros::{EnumIter, EnumString, IntoStaticStr};
 
 #[cfg(feature = "bzip")]
 use bzip2::{
@@ -34,7 +34,7 @@ use crc32fast::Hasher;
 use xz2::read::{XzDecoder, XzEncoder};
 
 /// The compression codec used to compress blocks.
-#[derive(Clone, Copy, Debug, PartialEq, EnumString, IntoStaticStr)]
+#[derive(Clone, Copy, Debug, PartialEq, EnumIter, EnumString, IntoStaticStr)]
 #[strum(serialize_all = "kebab_case")]
 pub enum Codec {
     /// The `Null` codec simply passes through data uncompressed.
@@ -49,7 +49,7 @@ pub enum Codec {
     /// CRC32 checksum of the uncompressed data in the block.
     Snappy,
     #[cfg(feature = "zstandard")]
-    Zstd,
+    Zstandard,
     #[cfg(feature = "bzip")]
     /// The `BZip2` codec uses [BZip2](https://sourceware.org/bzip2/)
     /// compression library.
@@ -98,7 +98,7 @@ impl Codec {
                 *stream = encoded;
             }
             #[cfg(feature = "zstandard")]
-            Codec::Zstd => {
+            Codec::Zstandard => {
                 let mut encoder = zstd::Encoder::new(Vec::new(), 0).unwrap();
                 encoder.write_all(stream).map_err(Error::ZstdCompress)?;
                 *stream = encoder.finish().unwrap();
@@ -157,7 +157,7 @@ impl Codec {
                 decoded
             }
             #[cfg(feature = "zstandard")]
-            Codec::Zstd => {
+            Codec::Zstandard => {
                 let mut decoded = Vec::new();
                 let mut decoder = zstd::Decoder::new(&stream[..]).unwrap();
                 std::io::copy(&mut decoder, &mut decoded).map_err(Error::ZstdDecompress)?;
@@ -212,7 +212,7 @@ mod tests {
     #[cfg(feature = "zstandard")]
     #[test]
     fn zstd_compress_and_decompress() {
-        compress_and_decompress(Codec::Zstd);
+        compress_and_decompress(Codec::Zstandard);
     }
 
     #[cfg(feature = "bzip")]
@@ -245,7 +245,7 @@ mod tests {
         assert_eq!(<&str>::from(Codec::Snappy), "snappy");
 
         #[cfg(feature = "zstandard")]
-        assert_eq!(<&str>::from(Codec::Zstd), "zstd");
+        assert_eq!(<&str>::from(Codec::Zstandard), "zstandard");
 
         #[cfg(feature = "bzip")]
         assert_eq!(<&str>::from(Codec::Bzip2), "bzip2");
@@ -265,7 +265,7 @@ mod tests {
         assert_eq!(Codec::from_str("snappy").unwrap(), Codec::Snappy);
 
         #[cfg(feature = "zstandard")]
-        assert_eq!(Codec::from_str("zstd").unwrap(), Codec::Zstd);
+        assert_eq!(Codec::from_str("zstandard").unwrap(), Codec::Zstandard);
 
         #[cfg(feature = "bzip")]
         assert_eq!(Codec::from_str("bzip2").unwrap(), Codec::Bzip2);
diff --git a/lang/rust/src/decode.rs b/lang/rust/src/decode.rs
index 5639d28..26678bf 100644
--- a/lang/rust/src/decode.rs
+++ b/lang/rust/src/decode.rs
@@ -68,183 +68,217 @@ fn decode_seq_len<R: Read>(reader: &mut R) -> AvroResult<usize> {
 
 /// Decode a `Value` from avro format given its `Schema`.
 pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
-    match *schema {
-        Schema::Null => Ok(Value::Null),
-        Schema::Boolean => {
-            let mut buf = [0u8; 1];
-            match reader.read_exact(&mut buf[..]) {
-                Ok(_) => match buf[0] {
-                    0u8 => Ok(Value::Boolean(false)),
-                    1u8 => Ok(Value::Boolean(true)),
-                    _ => Err(Error::BoolValue(buf[0])),
-                },
-                Err(io_err) => {
-                    if let ErrorKind::UnexpectedEof = io_err.kind() {
-                        Ok(Value::Null)
-                    } else {
-                        Err(Error::ReadBoolean(io_err))
+    fn decode0<R: Read>(
+        schema: &Schema,
+        reader: &mut R,
+        schemas_by_name: &mut HashMap<String, Schema>,
+    ) -> AvroResult<Value> {
+        match *schema {
+            Schema::Null => Ok(Value::Null),
+            Schema::Boolean => {
+                let mut buf = [0u8; 1];
+                match reader.read_exact(&mut buf[..]) {
+                    Ok(_) => match buf[0] {
+                        0u8 => Ok(Value::Boolean(false)),
+                        1u8 => Ok(Value::Boolean(true)),
+                        _ => Err(Error::BoolValue(buf[0])),
+                    },
+                    Err(io_err) => {
+                        if let ErrorKind::UnexpectedEof = io_err.kind() {
+                            Ok(Value::Null)
+                        } else {
+                            Err(Error::ReadBoolean(io_err))
+                        }
                     }
                 }
             }
-        }
-        Schema::Decimal { ref inner, .. } => match &**inner {
-            Schema::Fixed { .. } => match decode(inner, reader)? {
-                Value::Fixed(_, bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
-                value => Err(Error::FixedValue(value.into())),
-            },
-            Schema::Bytes => match decode(inner, reader)? {
-                Value::Bytes(bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
-                value => Err(Error::BytesValue(value.into())),
+            Schema::Decimal { ref inner, .. } => match &**inner {
+                Schema::Fixed { .. } => match decode0(inner, reader, schemas_by_name)? {
+                    Value::Fixed(_, bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
+                    value => Err(Error::FixedValue(value.into())),
+                },
+                Schema::Bytes => match decode0(inner, reader, schemas_by_name)? {
+                    Value::Bytes(bytes) => Ok(Value::Decimal(Decimal::from(bytes))),
+                    value => Err(Error::BytesValue(value.into())),
+                },
+                schema => Err(Error::ResolveDecimalSchema(schema.into())),
             },
-            schema => Err(Error::ResolveDecimalSchema(schema.into())),
-        },
-        Schema::Uuid => Ok(Value::Uuid(
-            Uuid::from_str(match decode(&Schema::String, reader)? {
-                Value::String(ref s) => s,
-                value => return Err(Error::GetUuidFromStringValue(value.into())),
-            })
-            .map_err(Error::ConvertStrToUuid)?,
-        )),
-        Schema::Int => decode_int(reader),
-        Schema::Date => zag_i32(reader).map(Value::Date),
-        Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis),
-        Schema::Long => decode_long(reader),
-        Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
-        Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis),
-        Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros),
-        Schema::Duration => {
-            let mut buf = [0u8; 12];
-            reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
-            Ok(Value::Duration(Duration::from(buf)))
-        }
-        Schema::Float => {
-            let mut buf = [0u8; std::mem::size_of::<f32>()];
-            reader.read_exact(&mut buf[..]).map_err(Error::ReadFloat)?;
-            Ok(Value::Float(f32::from_le_bytes(buf)))
-        }
-        Schema::Double => {
-            let mut buf = [0u8; std::mem::size_of::<f64>()];
-            reader.read_exact(&mut buf[..]).map_err(Error::ReadDouble)?;
-            Ok(Value::Double(f64::from_le_bytes(buf)))
-        }
-        Schema::Bytes => {
-            let len = decode_len(reader)?;
-            let mut buf = vec![0u8; len];
-            reader.read_exact(&mut buf).map_err(Error::ReadBytes)?;
-            Ok(Value::Bytes(buf))
-        }
-        Schema::String => {
-            let len = decode_len(reader)?;
-            let mut buf = vec![0u8; len];
-            match reader.read_exact(&mut buf) {
-                Ok(_) => Ok(Value::String(
-                    String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
-                )),
-                Err(io_err) => {
-                    if let ErrorKind::UnexpectedEof = io_err.kind() {
-                        Ok(Value::Null)
-                    } else {
-                        Err(Error::ReadString(io_err))
+            Schema::Uuid => Ok(Value::Uuid(
+                Uuid::from_str(match decode0(&Schema::String, reader, schemas_by_name)? {
+                    Value::String(ref s) => s,
+                    value => return Err(Error::GetUuidFromStringValue(value.into())),
+                })
+                .map_err(Error::ConvertStrToUuid)?,
+            )),
+            Schema::Int => decode_int(reader),
+            Schema::Date => zag_i32(reader).map(Value::Date),
+            Schema::TimeMillis => zag_i32(reader).map(Value::TimeMillis),
+            Schema::Long => decode_long(reader),
+            Schema::TimeMicros => zag_i64(reader).map(Value::TimeMicros),
+            Schema::TimestampMillis => zag_i64(reader).map(Value::TimestampMillis),
+            Schema::TimestampMicros => zag_i64(reader).map(Value::TimestampMicros),
+            Schema::Duration => {
+                let mut buf = [0u8; 12];
+                reader.read_exact(&mut buf).map_err(Error::ReadDuration)?;
+                Ok(Value::Duration(Duration::from(buf)))
+            }
+            Schema::Float => {
+                let mut buf = [0u8; std::mem::size_of::<f32>()];
+                reader.read_exact(&mut buf[..]).map_err(Error::ReadFloat)?;
+                Ok(Value::Float(f32::from_le_bytes(buf)))
+            }
+            Schema::Double => {
+                let mut buf = [0u8; std::mem::size_of::<f64>()];
+                reader.read_exact(&mut buf[..]).map_err(Error::ReadDouble)?;
+                Ok(Value::Double(f64::from_le_bytes(buf)))
+            }
+            Schema::Bytes => {
+                let len = decode_len(reader)?;
+                let mut buf = vec![0u8; len];
+                reader.read_exact(&mut buf).map_err(Error::ReadBytes)?;
+                Ok(Value::Bytes(buf))
+            }
+            Schema::String => {
+                let len = decode_len(reader)?;
+                let mut buf = vec![0u8; len];
+                match reader.read_exact(&mut buf) {
+                    Ok(_) => Ok(Value::String(
+                        String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
+                    )),
+                    Err(io_err) => {
+                        if let ErrorKind::UnexpectedEof = io_err.kind() {
+                            Ok(Value::Null)
+                        } else {
+                            Err(Error::ReadString(io_err))
+                        }
                     }
                 }
             }
-        }
-        Schema::Fixed { size, .. } => {
-            let mut buf = vec![0u8; size];
-            reader
-                .read_exact(&mut buf)
-                .map_err(|e| Error::ReadFixed(e, size))?;
-            Ok(Value::Fixed(size, buf))
-        }
-        Schema::Array(ref inner) => {
-            let mut items = Vec::new();
+            Schema::Fixed { ref name, size, .. } => {
+                schemas_by_name.insert(name.name.clone(), schema.clone());
+                let mut buf = vec![0u8; size];
+                reader
+                    .read_exact(&mut buf)
+                    .map_err(|e| Error::ReadFixed(e, size))?;
+                Ok(Value::Fixed(size, buf))
+            }
+            Schema::Array(ref inner) => {
+                let mut items = Vec::new();
 
-            loop {
-                let len = decode_seq_len(reader)?;
-                if len == 0 {
-                    break;
-                }
+                loop {
+                    let len = decode_seq_len(reader)?;
+                    if len == 0 {
+                        break;
+                    }
 
-                items.reserve(len);
-                for _ in 0..len {
-                    items.push(decode(inner, reader)?);
+                    items.reserve(len);
+                    for _ in 0..len {
+                        items.push(decode0(inner, reader, schemas_by_name)?);
+                    }
                 }
-            }
 
-            Ok(Value::Array(items))
-        }
-        Schema::Map(ref inner) => {
-            let mut items = HashMap::new();
+                Ok(Value::Array(items))
+            }
+            Schema::Map(ref inner) => {
+                let mut items = HashMap::new();
 
-            loop {
-                let len = decode_seq_len(reader)?;
-                if len == 0 {
-                    break;
-                }
+                loop {
+                    let len = decode_seq_len(reader)?;
+                    if len == 0 {
+                        break;
+                    }
 
-                items.reserve(len);
-                for _ in 0..len {
-                    match decode(&Schema::String, reader)? {
-                        Value::String(key) => {
-                            let value = decode(inner, reader)?;
-                            items.insert(key, value);
+                    items.reserve(len);
+                    for _ in 0..len {
+                        match decode0(&Schema::String, reader, schemas_by_name)? {
+                            Value::String(key) => {
+                                let value = decode0(inner, reader, schemas_by_name)?;
+                                items.insert(key, value);
+                            }
+                            value => return Err(Error::MapKeyType(value.into())),
                         }
-                        value => return Err(Error::MapKeyType(value.into())),
                     }
                 }
-            }
 
-            Ok(Value::Map(items))
-        }
-        Schema::Union(ref inner) => match zag_i64(reader) {
-            Ok(index) => {
-                let variants = inner.variants();
-                let variant = variants
-                    .get(usize::try_from(index).map_err(|e| Error::ConvertI64ToUsize(e, index))?)
-                    .ok_or_else(|| Error::GetUnionVariant {
-                        index,
-                        num_variants: variants.len(),
-                    })?;
-                let value = decode(variant, reader)?;
-                Ok(Value::Union(Box::new(value)))
+                Ok(Value::Map(items))
             }
-            Err(Error::ReadVariableIntegerBytes(io_err)) => {
-                if let ErrorKind::UnexpectedEof = io_err.kind() {
-                    Ok(Value::Union(Box::new(Value::Null)))
-                } else {
-                    Err(Error::ReadVariableIntegerBytes(io_err))
+            Schema::Union(ref inner) => match zag_i64(reader) {
+                Ok(index) => {
+                    let variants = inner.variants();
+                    let variant = variants
+                        .get(
+                            usize::try_from(index)
+                                .map_err(|e| Error::ConvertI64ToUsize(e, index))?,
+                        )
+                        .ok_or_else(|| Error::GetUnionVariant {
+                            index,
+                            num_variants: variants.len(),
+                        })?;
+                    let value = decode0(variant, reader, schemas_by_name)?;
+                    Ok(Value::Union(Box::new(value)))
                 }
-            }
-            Err(io_err) => Err(io_err),
-        },
+                Err(Error::ReadVariableIntegerBytes(io_err)) => {
+                    if let ErrorKind::UnexpectedEof = io_err.kind() {
+                        Ok(Value::Union(Box::new(Value::Null)))
+                    } else {
+                        Err(Error::ReadVariableIntegerBytes(io_err))
+                    }
+                }
+                Err(io_err) => Err(io_err),
+            },
 
-        Schema::Record { ref fields, .. } => {
-            // Benchmarks indicate ~10% improvement using this method.
-            let mut items = Vec::with_capacity(fields.len());
-            for field in fields {
-                // TODO: This clone is also expensive. See if we can do away with it...
-                items.push((field.name.clone(), decode(&field.schema, reader)?));
+            Schema::Record {
+                ref name,
+                ref fields,
+                ..
+            } => {
+                schemas_by_name.insert(name.name.clone(), schema.clone());
+                // Benchmarks indicate ~10% improvement using this method.
+                let mut items = Vec::with_capacity(fields.len());
+                for field in fields {
+                    // TODO: This clone is also expensive. See if we can do away with it...
+                    items.push((
+                        field.name.clone(),
+                        decode0(&field.schema, reader, schemas_by_name)?,
+                    ));
+                }
+                Ok(Value::Record(items))
             }
-            Ok(Value::Record(items))
-        }
-        Schema::Enum { ref symbols, .. } => {
-            Ok(if let Value::Int(raw_index) = decode_int(reader)? {
-                let index = usize::try_from(raw_index)
-                    .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?;
-                if (0..=symbols.len()).contains(&index) {
-                    let symbol = symbols[index].clone();
-                    Value::Enum(raw_index, symbol)
+            Schema::Enum {
+                ref name,
+                ref symbols,
+                ..
+            } => {
+                schemas_by_name.insert(name.name.clone(), schema.clone());
+                Ok(if let Value::Int(raw_index) = decode_int(reader)? {
+                    let index = usize::try_from(raw_index)
+                        .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?;
+                    if (0..symbols.len()).contains(&index) {
+                        let symbol = symbols[index].clone();
+                        Value::Enum(raw_index, symbol)
+                    } else {
+                        return Err(Error::GetEnumValue {
+                            index,
+                            nsymbols: symbols.len(),
+                        });
+                    }
+                } else {
+                    return Err(Error::GetEnumSymbol);
+                })
+            }
+            Schema::Ref { ref name } => {
+                let name = &name.name;
+                if let Some(resolved) = schemas_by_name.get(name.as_str()) {
+                    decode0(resolved, reader, &mut schemas_by_name.clone())
                 } else {
-                    return Err(Error::GetEnumValue {
-                        index,
-                        nsymbols: symbols.len(),
-                    });
+                    Err(Error::SchemaResolutionError(name.clone()))
                 }
-            } else {
-                return Err(Error::GetEnumSymbol);
-            })
+            }
         }
     }
+
+    let mut schemas_by_name: HashMap<String, Schema> = HashMap::new();
+    decode0(schema, reader, &mut schemas_by_name)
 }
 
 #[cfg(test)]
diff --git a/lang/rust/src/encode.rs b/lang/rust/src/encode.rs
index 5c68051..088def9 100644
--- a/lang/rust/src/encode.rs
+++ b/lang/rust/src/encode.rs
@@ -20,7 +20,7 @@ use crate::{
     types::Value,
     util::{zig_i32, zig_i64},
 };
-use std::convert::TryInto;
+use std::{collections::HashMap, convert::TryInto};
 
 /// Encode a `Value` into avro format.
 ///
@@ -51,104 +51,135 @@ fn encode_int(i: i32, buffer: &mut Vec<u8>) {
 /// be valid with regards to the schema. Schema are needed only to guide the
 /// encoding for complex type values.
 pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
-    match value {
-        Value::Null => (),
-        Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
-        // Pattern | Pattern here to signify that these _must_ have the same encoding.
-        Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) => encode_int(*i, buffer),
-        Value::Long(i)
-        | Value::TimestampMillis(i)
-        | Value::TimestampMicros(i)
-        | Value::TimeMicros(i) => encode_long(*i, buffer),
-        Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
-        Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
-        Value::Decimal(decimal) => match schema {
-            Schema::Decimal { inner, .. } => match *inner.clone() {
-                Schema::Fixed { size, .. } => {
-                    let bytes = decimal.to_sign_extended_bytes_with_len(size).unwrap();
-                    let num_bytes = bytes.len();
-                    if num_bytes != size {
-                        panic!(
-                            "signed decimal bytes length {} not equal to fixed schema size {}",
-                            num_bytes, size
-                        );
+    fn encode_ref0(
+        value: &Value,
+        schema: &Schema,
+        buffer: &mut Vec<u8>,
+        schemas_by_name: &mut HashMap<String, Schema>,
+    ) {
+        match &schema {
+            Schema::Ref { ref name } => {
+                let resolved = schemas_by_name.get(name.name.as_str()).unwrap();
+                return encode_ref0(value, resolved, buffer, &mut schemas_by_name.clone());
+            }
+            Schema::Record { ref name, .. }
+            | Schema::Enum { ref name, .. }
+            | Schema::Fixed { ref name, .. } => {
+                schemas_by_name.insert(name.name.clone(), schema.clone());
+            }
+            _ => (),
+        }
+
+        match value {
+            Value::Null => (),
+            Value::Boolean(b) => buffer.push(if *b { 1u8 } else { 0u8 }),
+            // Pattern | Pattern here to signify that these _must_ have the same encoding.
+            Value::Int(i) | Value::Date(i) | Value::TimeMillis(i) => encode_int(*i, buffer),
+            Value::Long(i)
+            | Value::TimestampMillis(i)
+            | Value::TimestampMicros(i)
+            | Value::TimeMicros(i) => encode_long(*i, buffer),
+            Value::Float(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+            Value::Double(x) => buffer.extend_from_slice(&x.to_le_bytes()),
+            Value::Decimal(decimal) => match schema {
+                Schema::Decimal { inner, .. } => match *inner.clone() {
+                    Schema::Fixed { size, .. } => {
+                        let bytes = decimal.to_sign_extended_bytes_with_len(size).unwrap();
+                        let num_bytes = bytes.len();
+                        if num_bytes != size {
+                            panic!(
+                                "signed decimal bytes length {} not equal to fixed schema size {}",
+                                num_bytes, size
+                            );
+                        }
+                        encode(&Value::Fixed(size, bytes), inner, buffer)
                     }
-                    encode(&Value::Fixed(size, bytes), inner, buffer)
-                }
-                Schema::Bytes => encode(&Value::Bytes(decimal.try_into().unwrap()), inner, buffer),
-                _ => panic!("invalid inner type for decimal: {:?}", inner),
+                    Schema::Bytes => {
+                        encode(&Value::Bytes(decimal.try_into().unwrap()), inner, buffer)
+                    }
+                    _ => panic!("invalid inner type for decimal: {:?}", inner),
+                },
+                _ => panic!("invalid schema type for decimal: {:?}", schema),
             },
-            _ => panic!("invalid type for decimal: {:?}", schema),
-        },
-        &Value::Duration(duration) => {
-            let slice: [u8; 12] = duration.into();
-            buffer.extend_from_slice(&slice);
-        }
-        Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
-        Value::Bytes(bytes) => match *schema {
-            Schema::Bytes => encode_bytes(bytes, buffer),
-            Schema::Fixed { .. } => buffer.extend(bytes),
-            _ => (),
-        },
-        Value::String(s) => match *schema {
-            Schema::String => {
-                encode_bytes(s, buffer);
+            &Value::Duration(duration) => {
+                let slice: [u8; 12] = duration.into();
+                buffer.extend_from_slice(&slice);
             }
-            Schema::Enum { ref symbols, .. } => {
-                if let Some(index) = symbols.iter().position(|item| item == s) {
-                    encode_int(index as i32, buffer);
+            Value::Uuid(uuid) => encode_bytes(&uuid.to_string(), buffer),
+            Value::Bytes(bytes) => match *schema {
+                Schema::Bytes => encode_bytes(bytes, buffer),
+                Schema::Fixed { .. } => buffer.extend(bytes),
+                _ => error!("invalid schema type for bytes: {:?}", schema),
+            },
+            Value::String(s) => match *schema {
+                Schema::String => {
+                    encode_bytes(s, buffer);
+                }
+                Schema::Enum { ref symbols, .. } => {
+                    if let Some(index) = symbols.iter().position(|item| item == s) {
+                        encode_int(index as i32, buffer);
+                    }
+                }
+                _ => error!("invalid schema type for String: {:?}", schema),
+            },
+            Value::Fixed(_, bytes) => buffer.extend(bytes),
+            Value::Enum(i, _) => encode_int(*i, buffer),
+            Value::Union(item) => {
+                if let Schema::Union(ref inner) = *schema {
+                    // Find the schema that is matched here. Due to validation, this should always
+                    // return a value.
+                    let (idx, inner_schema) = inner
+                        .find_schema(item)
+                        .expect("Invalid Union validation occurred");
+                    encode_long(idx as i64, buffer);
+                    encode_ref0(&*item, inner_schema, buffer, schemas_by_name);
+                } else {
+                    error!("invalid schema type for Union: {:?}", schema);
                 }
             }
-            _ => (),
-        },
-        Value::Fixed(_, bytes) => buffer.extend(bytes),
-        Value::Enum(i, _) => encode_int(*i, buffer),
-        Value::Union(item) => {
-            if let Schema::Union(ref inner) = *schema {
-                // Find the schema that is matched here. Due to validation, this should always
-                // return a value.
-                let (idx, inner_schema) = inner
-                    .find_schema(item)
-                    .expect("Invalid Union validation occurred");
-                encode_long(idx as i64, buffer);
-                encode_ref(&*item, inner_schema, buffer);
-            }
-        }
-        Value::Array(items) => {
-            if let Schema::Array(ref inner) = *schema {
-                if !items.is_empty() {
-                    encode_long(items.len() as i64, buffer);
-                    for item in items.iter() {
-                        encode_ref(item, inner, buffer);
+            Value::Array(items) => {
+                if let Schema::Array(ref inner) = *schema {
+                    if !items.is_empty() {
+                        encode_long(items.len() as i64, buffer);
+                        for item in items.iter() {
+                            encode_ref0(item, inner, buffer, schemas_by_name);
+                        }
                     }
+                    buffer.push(0u8);
+                } else {
+                    error!("invalid schema type for Array: {:?}", schema);
                 }
-                buffer.push(0u8);
             }
-        }
-        Value::Map(items) => {
-            if let Schema::Map(ref inner) = *schema {
-                if !items.is_empty() {
-                    encode_long(items.len() as i64, buffer);
-                    for (key, value) in items {
-                        encode_bytes(key, buffer);
-                        encode_ref(value, inner, buffer);
+            Value::Map(items) => {
+                if let Schema::Map(ref inner) = *schema {
+                    if !items.is_empty() {
+                        encode_long(items.len() as i64, buffer);
+                        for (key, value) in items {
+                            encode_bytes(key, buffer);
+                            encode_ref0(value, inner, buffer, schemas_by_name);
+                        }
                     }
+                    buffer.push(0u8);
+                } else {
+                    error!("invalid schema type for Map: {:?}", schema);
                 }
-                buffer.push(0u8);
             }
-        }
-        Value::Record(fields) => {
-            if let Schema::Record {
-                fields: ref schema_fields,
-                ..
-            } = *schema
-            {
-                for (i, &(_, ref value)) in fields.iter().enumerate() {
-                    encode_ref(value, &schema_fields[i].schema, buffer);
+            Value::Record(fields) => {
+                if let Schema::Record {
+                    fields: ref schema_fields,
+                    ..
+                } = *schema
+                {
+                    for (i, &(_, ref value)) in fields.iter().enumerate() {
+                        encode_ref0(value, &schema_fields[i].schema, buffer, schemas_by_name);
+                    }
                 }
             }
         }
     }
+
+    let mut schemas_by_name = HashMap::new();
+    encode_ref0(value, schema, buffer, &mut schemas_by_name)
 }
 
 pub fn encode_to_vec(value: &Value, schema: &Schema) -> Vec<u8> {
diff --git a/lang/rust/src/error.rs b/lang/rust/src/error.rs
index f4b1fb5..f65ec72 100644
--- a/lang/rust/src/error.rs
+++ b/lang/rust/src/error.rs
@@ -369,6 +369,10 @@ pub enum Error {
     /// Error while converting float to json value
     #[error("failed to convert avro float to json: {0}")]
     ConvertF64ToJson(f64),
+
+    /// Error while resolving Schema::Ref
+    #[error("Unresolved schema reference: {0}")]
+    SchemaResolutionError(String),
 }
 
 impl serde::ser::Error for Error {
diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index 3617bb1..2697328 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -21,6 +21,7 @@ use digest::Digest;
 use lazy_static::lazy_static;
 use regex::Regex;
 use serde::{
+    ser,
     ser::{SerializeMap, SerializeSeq},
     Deserialize, Serialize, Serializer,
 };
@@ -31,6 +32,7 @@ use std::{
     convert::TryInto,
     fmt,
     str::FromStr,
+    sync::{Arc, Mutex},
 };
 use strum_macros::{EnumDiscriminants, EnumString};
 
@@ -141,6 +143,10 @@ pub enum Schema {
     TimestampMicros,
     /// An amount of time defined by a number of months, days and milliseconds.
     Duration,
+    // A reference to another schema.
+    Ref {
+        name: Name,
+    },
 }
 
 impl PartialEq for Schema {
@@ -234,6 +240,11 @@ impl Name {
     fn parse(complex: &Map<String, Value>) -> AvroResult<Self> {
         let name = complex.name().ok_or(Error::GetNameField)?;
 
+        let type_name = match complex.get("type") {
+            Some(Value::Object(complex_type)) => complex_type.name().or(None),
+            _ => None,
+        };
+
         let namespace = complex.string("namespace");
 
         let aliases: Option<Vec<String>> = complex
@@ -248,7 +259,7 @@ impl Name {
             });
 
         Ok(Name {
-            name,
+            name: type_name.unwrap_or(name),
             namespace,
             aliases,
         })
@@ -420,11 +431,24 @@ fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalM
 #[derive(Default)]
 struct Parser {
     input_schemas: HashMap<String, Value>,
+    // A map of name -> Schema::Ref
+    // Used to resolve cyclic references, i.e. when a
+    // field's type is a reference to its record's type
+    resolving_schemas: HashMap<String, Schema>,
     input_order: Vec<String>,
+    // A map of name -> fully parsed Schema
+    // Used to avoid parsing the same schema twice
     parsed_schemas: HashMap<String, Schema>,
 }
 
 impl Schema {
+    // Used to help resolve cyclic references while serializing Schema to JSON.
+    // Needed because serde[_json] does not support using contexts.
+    // TODO: See whether alternatives like
+    // https://users.rust-lang.org/t/serde-question-access-to-a-shared-context-data-within-serialize-and-deserialize/39546
+    // can be used
+    thread_local!(static SCHEMAS_BY_NAME: Arc<Mutex<HashMap<String, Schema>>> = Arc::new(Mutex::new(HashMap::new())));
+
     /// Converts `self` into its [Parsing Canonical Form].
     ///
     /// [Parsing Canonical Form]:
@@ -480,6 +504,7 @@ impl Schema {
         }
         let mut parser = Parser {
             input_schemas,
+            resolving_schemas: HashMap::default(),
             input_order,
             parsed_schemas: HashMap::with_capacity(input.len()),
         };
@@ -515,7 +540,8 @@ impl Parser {
                 .remove_entry(&next_name)
                 .expect("Key unexpectedly missing");
             let parsed = self.parse(&value)?;
-            self.parsed_schemas.insert(name, parsed);
+            self.parsed_schemas
+                .insert(get_schema_type_name(name, value), parsed);
         }
 
         let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
@@ -558,9 +584,11 @@ impl Parser {
     }
 
     /// Given a name, tries to retrieve the parsed schema from `parsed_schemas`.
-    /// If a parsed schema is not found, it checks if a json  with that name exists
-    /// in `input_schemas` and then parses it  (removing it from `input_schemas`)
-    /// and adds the parsed schema to `parsed_schemas`
+    /// If a parsed schema is not found, it checks if a currently resolving
+    /// schema with that name exists.
+    /// If a resolving schema is not found, it checks if a json with that name exists
+    /// in `input_schemas` and then parses it (removing it from `input_schemas`)
+    /// and adds the parsed schema to `parsed_schemas`.
     ///
     /// This method allows schemas definitions that depend on other types to
     /// parse their dependencies (or look them up if already parsed).
@@ -568,12 +596,20 @@ impl Parser {
         if let Some(parsed) = self.parsed_schemas.get(name) {
             return Ok(parsed.clone());
         }
+        if let Some(resolving_schema) = self.resolving_schemas.get(name) {
+            return Ok(resolving_schema.clone());
+        }
+
         let value = self
             .input_schemas
             .remove(name)
             .ok_or_else(|| Error::ParsePrimitive(name.into()))?;
+
         let parsed = self.parse(&value)?;
-        self.parsed_schemas.insert(name.to_string(), parsed.clone());
+        self.parsed_schemas.insert(
+            get_schema_type_name(name.to_string(), value),
+            parsed.clone(),
+        );
         Ok(parsed)
     }
 
@@ -772,6 +808,10 @@ impl Parser {
 
         let mut lookup = HashMap::new();
 
+        let resolving_schema = Schema::Ref { name: name.clone() };
+        self.resolving_schemas
+            .insert(name.name.clone(), resolving_schema);
+
         let fields: Vec<RecordField> = complex
             .get("fields")
             .and_then(|fields| fields.as_array())
@@ -798,6 +838,7 @@ impl Parser {
 
         self.parsed_schemas
             .insert(name.fullname(None), schema.clone());
+        self.resolving_schemas.remove(name.name.as_str());
         Ok(schema)
     }
 
@@ -893,12 +934,45 @@ impl Parser {
     }
 }
 
+fn get_schema_type_name(name: String, value: Value) -> String {
+    match value.get("type") {
+        Some(Value::Object(complex_type)) => match complex_type.name() {
+            Some(name) => name,
+            _ => name,
+        },
+        _ => name,
+    }
+}
+
 impl Serialize for Schema {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where
         S: Serializer,
     {
+        fn remember_schema(name: &Name, schema: &Schema) {
+            Schema::SCHEMAS_BY_NAME.with(|schemas_by_name| match schemas_by_name.try_lock() {
+                Ok(mut schemas) => {
+                    schemas.insert((&name.name).clone(), schema.clone());
+                }
+                Err(poisoned) => {
+                    error!("Wasn't able to lock schemas_by_name {:?}", poisoned);
+                }
+            });
+        }
+
         match *self {
+            Schema::Ref { ref name } => {
+                let name = &name.name;
+                Schema::SCHEMAS_BY_NAME.with(|schemas_by_name| {
+                    let schemas = schemas_by_name.lock().unwrap();
+                    if schemas.contains_key(name.as_str()) {
+                        serializer.serialize_str(name)
+                    } else {
+                        Err(ser::Error::custom(format!("Could not serialize Schema::Ref('{}') because it cannot be found in ({})",
+                                                       name, schemas.keys().cloned().collect::<Vec<_>>().join(", "))))
+                    }
+                })
+            }
             Schema::Null => serializer.serialize_str("null"),
             Schema::Boolean => serializer.serialize_str("boolean"),
             Schema::Int => serializer.serialize_str("int"),
@@ -933,6 +1007,7 @@ impl Serialize for Schema {
                 ref fields,
                 ..
             } => {
+                remember_schema(name, self);
                 let mut map = serializer.serialize_map(None)?;
                 map.serialize_entry("type", "record")?;
                 if let Some(ref n) = name.namespace {
@@ -953,6 +1028,7 @@ impl Serialize for Schema {
                 ref symbols,
                 ..
             } => {
+                remember_schema(name, self);
                 let mut map = serializer.serialize_map(None)?;
                 map.serialize_entry("type", "enum")?;
                 map.serialize_entry("name", &name.name)?;
@@ -964,6 +1040,7 @@ impl Serialize for Schema {
                 ref doc,
                 ref size,
             } => {
+                remember_schema(name, self);
                 let mut map = serializer.serialize_map(None)?;
                 map.serialize_entry("type", "fixed")?;
                 map.serialize_entry("name", &name.name)?;
@@ -1240,7 +1317,7 @@ mod tests {
 
     #[test]
     fn test_record_schema() {
-        let schema = Schema::parse_str(
+        let parsed = Schema::parse_str(
             r#"
             {
                 "type": "record",
@@ -1282,7 +1359,78 @@ mod tests {
             lookup,
         };
 
-        assert_eq!(expected, schema);
+        assert_eq!(parsed, expected);
+    }
+
+    // AVRO-3302
+    #[test]
+    fn test_record_schema_with_currently_parsing_schema() {
+        let schema = Schema::parse_str(
+            r#"
+            {
+                "type": "record",
+                "name": "test",
+                "fields": [{
+                    "name": "recordField",
+                    "type": {
+                        "type": "record",
+                        "name": "Node",
+                        "fields": [
+                            {"name": "label", "type": "string"},
+                            {"name": "children", "type": {"type": "array", "items": "Node"}}
+                        ]
+                    }
+                }]
+            }
+        "#,
+        )
+        .unwrap();
+
+        let mut lookup = HashMap::new();
+        lookup.insert("recordField".to_owned(), 0);
+
+        let mut node_lookup = HashMap::new();
+        node_lookup.insert("children".to_owned(), 1);
+        node_lookup.insert("label".to_owned(), 0);
+
+        let expected = Schema::Record {
+            name: Name::new("test"),
+            doc: None,
+            fields: vec![RecordField {
+                name: "recordField".to_string(),
+                doc: None,
+                default: None,
+                schema: Schema::Record {
+                    name: Name::new("Node"),
+                    doc: None,
+                    fields: vec![
+                        RecordField {
+                            name: "label".to_string(),
+                            doc: None,
+                            default: None,
+                            schema: Schema::String,
+                            order: RecordFieldOrder::Ascending,
+                            position: 0,
+                        },
+                        RecordField {
+                            name: "children".to_string(),
+                            doc: None,
+                            default: None,
+                            schema: Schema::Array(Box::new(Schema::Ref {
+                                name: Name::new("Node"),
+                            })),
+                            order: RecordFieldOrder::Ascending,
+                            position: 1,
+                        },
+                    ],
+                    lookup: node_lookup,
+                },
+                order: RecordFieldOrder::Ascending,
+                position: 0,
+            }],
+            lookup,
+        };
+        assert_eq!(schema, expected);
     }
 
     #[test]
diff --git a/lang/rust/src/schema_compatibility.rs b/lang/rust/src/schema_compatibility.rs
index 5ba46e1..7c815e0 100644
--- a/lang/rust/src/schema_compatibility.rs
+++ b/lang/rust/src/schema_compatibility.rs
@@ -433,7 +433,6 @@ mod tests {
             .map(|s| s.canonical_form())
             .collect::<Vec<String>>()
             .join(",");
-        dbg!(&schema_string);
         Schema::parse_str(&format!("[{}]", schema_string)).unwrap()
     }
 
diff --git a/lang/rust/src/types.rs b/lang/rust/src/types.rs
index 75f0509..48a1813 100644
--- a/lang/rust/src/types.rs
+++ b/lang/rust/src/types.rs
@@ -323,6 +323,7 @@ impl Value {
     /// for the full set of rules of schema validation.
     pub fn validate(&self, schema: &Schema) -> bool {
         match (self, schema) {
+            (_, &Schema::Ref { name: _ }) => true,
             (&Value::Null, &Schema::Null) => true,
             (&Value::Boolean(_), &Schema::Boolean) => true,
             (&Value::Int(_), &Schema::Int) => true,
@@ -383,7 +384,10 @@ impl Value {
                     }
                 })
             }
-            _ => false,
+            (v, s) => {
+                error!("Unsupported value-schema combination:\n{:?}\n{:?}", v, s);
+                false
+            }
         }
     }
 
@@ -394,45 +398,79 @@ impl Value {
     /// in the Avro specification for the full set of rules of schema
     /// resolution.
     pub fn resolve(mut self, schema: &Schema) -> AvroResult<Self> {
-        // Check if this schema is a union, and if the reader schema is not.
-        if SchemaKind::from(&self) == SchemaKind::Union
-            && SchemaKind::from(schema) != SchemaKind::Union
-        {
-            // Pull out the Union, and attempt to resolve against it.
-            let v = match self {
-                Value::Union(b) => *b,
-                _ => unreachable!(),
-            };
-            self = v;
-        }
-        match *schema {
-            Schema::Null => self.resolve_null(),
-            Schema::Boolean => self.resolve_boolean(),
-            Schema::Int => self.resolve_int(),
-            Schema::Long => self.resolve_long(),
-            Schema::Float => self.resolve_float(),
-            Schema::Double => self.resolve_double(),
-            Schema::Bytes => self.resolve_bytes(),
-            Schema::String => self.resolve_string(),
-            Schema::Fixed { size, .. } => self.resolve_fixed(size),
-            Schema::Union(ref inner) => self.resolve_union(inner),
-            Schema::Enum { ref symbols, .. } => self.resolve_enum(symbols),
-            Schema::Array(ref inner) => self.resolve_array(inner),
-            Schema::Map(ref inner) => self.resolve_map(inner),
-            Schema::Record { ref fields, .. } => self.resolve_record(fields),
-            Schema::Decimal {
-                scale,
-                precision,
-                ref inner,
-            } => self.resolve_decimal(precision, scale, inner),
-            Schema::Date => self.resolve_date(),
-            Schema::TimeMillis => self.resolve_time_millis(),
-            Schema::TimeMicros => self.resolve_time_micros(),
-            Schema::TimestampMillis => self.resolve_timestamp_millis(),
-            Schema::TimestampMicros => self.resolve_timestamp_micros(),
-            Schema::Duration => self.resolve_duration(),
-            Schema::Uuid => self.resolve_uuid(),
+        pub fn resolve0(
+            value: &mut Value,
+            schema: &Schema,
+            schemas_by_name: &mut HashMap<String, Schema>,
+        ) -> AvroResult<Value> {
+            // Check if this schema is a union, and if the reader schema is not.
+            if SchemaKind::from(&value.clone()) == SchemaKind::Union
+                && SchemaKind::from(schema) != SchemaKind::Union
+            {
+                // Pull out the Union, and attempt to resolve against it.
+                let v = match value {
+                    Value::Union(b) => &**b,
+                    _ => unreachable!(),
+                };
+                *value = v.clone();
+            }
+            let val: Value = value.clone();
+            match *schema {
+                Schema::Ref { ref name } => {
+                    if let Some(resolved) = schemas_by_name.get(name.name.as_str()) {
+                        resolve0(value, resolved, &mut schemas_by_name.clone())
+                    } else {
+                        Err(Error::SchemaResolutionError(name.name.clone()))
+                    }
+                }
+                Schema::Null => val.resolve_null(),
+                Schema::Boolean => val.resolve_boolean(),
+                Schema::Int => val.resolve_int(),
+                Schema::Long => val.resolve_long(),
+                Schema::Float => val.resolve_float(),
+                Schema::Double => val.resolve_double(),
+                Schema::Bytes => val.resolve_bytes(),
+                Schema::String => val.resolve_string(),
+                Schema::Fixed { ref name, size, .. } => {
+                    schemas_by_name.insert(name.name.clone(), schema.clone());
+                    val.resolve_fixed(size)
+                }
+                Schema::Union(ref inner) => val.resolve_union(inner),
+                Schema::Enum {
+                    ref name,
+                    ref symbols,
+                    ..
+                } => {
+                    schemas_by_name.insert(name.name.clone(), schema.clone());
+                    val.resolve_enum(symbols)
+                }
+                Schema::Array(ref inner) => val.resolve_array(inner),
+                Schema::Map(ref inner) => val.resolve_map(inner),
+                Schema::Record {
+                    ref name,
+                    ref fields,
+                    ..
+                } => {
+                    schemas_by_name.insert(name.name.clone(), schema.clone());
+                    val.resolve_record(fields)
+                }
+                Schema::Decimal {
+                    scale,
+                    precision,
+                    ref inner,
+                } => val.resolve_decimal(precision, scale, inner),
+                Schema::Date => val.resolve_date(),
+                Schema::TimeMillis => val.resolve_time_millis(),
+                Schema::TimeMicros => val.resolve_time_micros(),
+                Schema::TimestampMillis => val.resolve_timestamp_millis(),
+                Schema::TimestampMicros => val.resolve_timestamp_micros(),
+                Schema::Duration => val.resolve_duration(),
+                Schema::Uuid => val.resolve_uuid(),
+            }
         }
+
+        let mut schemas_by_name: HashMap<String, Schema> = HashMap::new();
+        resolve0(&mut self, schema, &mut schemas_by_name)
     }
 
     fn resolve_uuid(self) -> Result<Self, Error> {
diff --git a/lang/rust/src/util.rs b/lang/rust/src/util.rs
index f9daf28..e2b353b 100644
--- a/lang/rust/src/util.rs
+++ b/lang/rust/src/util.rs
@@ -21,7 +21,7 @@ use std::{convert::TryFrom, i64, io::Read, sync::Once};
 
 /// Maximum number of bytes that can be allocated when decoding
 /// Avro-encoded values. This is a protection against ill-formed
-/// data, whose length field might be interpreted as enourmous.
+/// data, whose length field might be interpreted as enormous.
 /// See max_allocation_bytes to change this limit.
 pub static mut MAX_ALLOCATION_BYTES: usize = 512 * 1024 * 1024;
 static MAX_ALLOCATION_BYTES_ONCE: Once = Once::new();
@@ -153,19 +153,40 @@ mod tests {
     #[test]
     fn test_zig_i64() {
         let mut s = Vec::new();
-        zig_i64(std::i32::MAX as i64, &mut s);
+
+        zig_i64(0, &mut s);
+        assert_eq!(s, [0]);
+
+        s.clear();
+        zig_i64(-1, &mut s);
+        assert_eq!(s, [1]);
+
+        s.clear();
+        zig_i64(1, &mut s);
+        assert_eq!(s, [2]);
+
+        s.clear();
+        zig_i64(-64, &mut s);
+        assert_eq!(s, [127]);
+
+        s.clear();
+        zig_i64(64, &mut s);
+        assert_eq!(s, [128, 1]);
+
+        s.clear();
+        zig_i64(i32::MAX as i64, &mut s);
         assert_eq!(s, [254, 255, 255, 255, 15]);
 
         s.clear();
-        zig_i64(std::i32::MAX as i64 + 1, &mut s);
+        zig_i64(i32::MAX as i64 + 1, &mut s);
         assert_eq!(s, [128, 128, 128, 128, 16]);
 
         s.clear();
-        zig_i64(std::i32::MIN as i64, &mut s);
+        zig_i64(i32::MIN as i64, &mut s);
         assert_eq!(s, [255, 255, 255, 255, 15]);
 
         s.clear();
-        zig_i64(std::i32::MIN as i64 - 1, &mut s);
+        zig_i64(i32::MIN as i64 - 1, &mut s);
         assert_eq!(s, [129, 128, 128, 128, 16]);
 
         s.clear();
@@ -180,27 +201,27 @@ mod tests {
     #[test]
     fn test_zig_i32() {
         let mut s = Vec::new();
-        zig_i32(std::i32::MAX / 2, &mut s);
+        zig_i32(i32::MAX / 2, &mut s);
         assert_eq!(s, [254, 255, 255, 255, 7]);
 
         s.clear();
-        zig_i32(std::i32::MIN / 2, &mut s);
+        zig_i32(i32::MIN / 2, &mut s);
         assert_eq!(s, [255, 255, 255, 255, 7]);
 
         s.clear();
-        zig_i32(-(std::i32::MIN / 2), &mut s);
+        zig_i32(-(i32::MIN / 2), &mut s);
         assert_eq!(s, [128, 128, 128, 128, 8]);
 
         s.clear();
-        zig_i32(std::i32::MIN / 2 - 1, &mut s);
+        zig_i32(i32::MIN / 2 - 1, &mut s);
         assert_eq!(s, [129, 128, 128, 128, 8]);
 
         s.clear();
-        zig_i32(std::i32::MAX, &mut s);
+        zig_i32(i32::MAX, &mut s);
         assert_eq!(s, [254, 255, 255, 255, 15]);
 
         s.clear();
-        zig_i32(std::i32::MIN, &mut s);
+        zig_i32(i32::MIN, &mut s);
         assert_eq!(s, [255, 255, 255, 255, 15]);
     }
 
diff --git a/lang/rust/tests/schema.rs b/lang/rust/tests/schema.rs
index cc96429..77b6569 100644
--- a/lang/rust/tests/schema.rs
+++ b/lang/rust/tests/schema.rs
@@ -17,9 +17,11 @@
 
 use avro_rs::{
     schema::{Name, RecordField},
-    Error, Schema,
+    types::{Record, Value},
+    Codec, Error, Reader, Schema, Writer,
 };
 use lazy_static::lazy_static;
+use log::debug;
 
 fn init() {
     let _ = env_logger::builder()
@@ -739,29 +741,28 @@ fn test_parse_list_with_cross_deps_basic() {
 }
 
 #[test]
-/// Test that if a cycle of dependencies occurs in the input schema jsons, the algorithm terminates
-/// and returns an error. N.B. In the future, when recursive types are supported, this should be
-/// revisited.
-fn test_parse_list_recursive_type_error() {
+fn test_parse_list_recursive_type() {
     init();
     let schema_str_1 = r#"{
         "name": "A",
+        "doc": "A's schema",
         "type": "record",
         "fields": [
-            {"name": "field_one", "type": "B"}
+            {"name": "a_field_one", "type": "B"}
         ]
     }"#;
     let schema_str_2 = r#"{
         "name": "B",
+        "doc": "B's schema",
         "type": "record",
         "fields": [
-            {"name": "field_one", "type": "A"}
+            {"name": "b_field_one", "type": "A"}
         ]
     }"#;
     let schema_strs_first = [schema_str_1, schema_str_2];
     let schema_strs_second = [schema_str_2, schema_str_1];
-    let _ = Schema::parse_list(&schema_strs_first).expect_err("Test failed");
-    let _ = Schema::parse_list(&schema_strs_second).expect_err("Test failed");
+    let _ = Schema::parse_list(&schema_strs_first).expect("Test failed");
+    let _ = Schema::parse_list(&schema_strs_second).expect("Test failed");
 }
 
 #[test]
@@ -1306,6 +1307,73 @@ fn test_root_error_is_not_swallowed_on_parse_error() -> Result<(), String> {
     }
 }
 
+// AVRO-3302
+#[test]
+fn test_record_schema_with_cyclic_references() {
+    let schema = Schema::parse_str(
+        r#"
+            {
+                "type": "record",
+                "name": "test",
+                "fields": [{
+                    "name": "recordField",
+                    "type": {
+                        "type": "record",
+                        "name": "Node",
+                        "fields": [
+                            {"name": "label", "type": "string"},
+                            {"name": "children", "type": {"type": "array", "items": "Node"}}
+                        ]
+                    }
+                }]
+            }
+        "#,
+    )
+    .unwrap();
+
+    let mut datum = Record::new(&schema).unwrap();
+    datum.put(
+        "recordField",
+        Value::Record(vec![
+            ("label".into(), Value::String("level_1".into())),
+            (
+                "children".into(),
+                Value::Array(vec![Value::Record(vec![
+                    ("label".into(), Value::String("level_2".into())),
+                    (
+                        "children".into(),
+                        Value::Array(vec![Value::Record(vec![
+                            ("label".into(), Value::String("level_3".into())),
+                            (
+                                "children".into(),
+                                Value::Array(vec![Value::Record(vec![
+                                    ("label".into(), Value::String("level_4".into())),
+                                    ("children".into(), Value::Array(vec![])),
+                                ])]),
+                            ),
+                        ])]),
+                    ),
+                ])]),
+            ),
+        ]),
+    );
+
+    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);
+    if let Err(err) = writer.append(datum) {
+        panic!("An error occurred while writing datum: {:?}", err)
+    }
+    let bytes = writer.into_inner().unwrap();
+    assert_eq!(316, bytes.len());
+
+    match Reader::new(&mut bytes.as_slice()) {
+        Ok(mut reader) => match reader.next() {
+            Some(value) => debug!("{:?}", value.unwrap()),
+            None => panic!("No value was read!"),
+        },
+        Err(err) => panic!("An error occurred while reading datum: {:?}", err),
+    }
+}
+
 /*
 // TODO: (#93) add support for logical type and attributes and uncomment (may need some tweaks to compile)
 #[test]

[avro] 25/30: AVRO-3302: Add support for parsing recursive schemas for Enum

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit f666c568ccd11493292acdafcc2ebf2a5818eed0
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Thu Jan 20 09:22:57 2022 +0200

    AVRO-3302: Add support for parsing recursive schemas for Enum
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit 064cc6b4bd1c6dcf59da989e712a27f0a955f70d)
---
 lang/rust/src/schema.rs | 65 +++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 60 insertions(+), 5 deletions(-)

diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index 73b64c7..5415d35 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -786,7 +786,7 @@ impl Parser {
         match complex.get("type") {
             Some(&Value::String(ref t)) => match t.as_str() {
                 "record" => self.parse_record(complex),
-                "enum" => Self::parse_enum(complex),
+                "enum" => self.parse_enum(complex),
                 "array" => self.parse_array(complex),
                 "map" => self.parse_map(complex),
                 "fixed" => Self::parse_fixed(complex),
@@ -842,7 +842,7 @@ impl Parser {
 
     /// Parse a `serde_json::Value` representing an Avro enum type into a
     /// `Schema`.
-    fn parse_enum(complex: &Map<String, Value>) -> AvroResult<Schema> {
+    fn parse_enum(&mut self, complex: &Map<String, Value>) -> AvroResult<Schema> {
         let name = Name::parse(complex)?;
 
         let symbols: Vec<String> = complex
@@ -872,11 +872,14 @@ impl Parser {
             existing_symbols.insert(symbol);
         }
 
-        Ok(Schema::Enum {
-            name,
+        let schema = Schema::Enum {
+            name: name.clone(),
             doc: complex.doc(),
             symbols,
-        })
+        };
+        self.parsed_schemas
+            .insert(name.fullname(None), schema.clone());
+        Ok(schema)
     }
 
     /// Parse a `serde_json::Value` representing an Avro array type into a
@@ -1514,6 +1517,58 @@ mod tests {
         assert_eq!(canonical_form, &expected);
     }
 
+    // https://github.com/flavray/avro-rs/pull/99#issuecomment-1016948451
+    #[test]
+    fn test_parsing_of_recursive_type_enum() {
+        let schema = r#"
+    {
+        "type": "record",
+        "name": "User",
+        "namespace": "office",
+        "fields": [
+            {
+              "name": "details",
+              "type": [
+                {
+                  "type": "record",
+                  "name": "Employee",
+                  "fields": [
+                    {
+                      "name": "gender",
+                      "type": {
+                        "type": "enum",
+                        "name": "Gender",
+                        "symbols": [
+                          "male",
+                          "female"
+                        ]
+                      },
+                      "default": "female"
+                    }
+                  ]
+                },
+                {
+                  "type": "record",
+                  "name": "Manager",
+                  "fields": [
+                    {
+                      "name": "gender",
+                      "type": "Gender"
+                    }
+                  ]
+                }
+              ]
+            }
+          ]
+}
+"#;
+
+        let schema = Schema::parse_str(schema).unwrap();
+        let schema_str = schema.canonical_form();
+        let expected = r#"{"name":"office.User","type":"record","fields":[{"name":"details","type":[{"name":"Employee","type":"record","fields":[{"name":"gender","type":{"name":"Gender","type":"enum","symbols":["male","female"]}}]},{"name":"Manager","type":"record","fields":[{"name":"gender","type":{"name":"Gender","type":"enum","symbols":["male","female"]}}]}]}]}"#;
+        assert_eq!(schema_str, expected);
+    }
+
     #[test]
     fn test_enum_schema() {
         let schema = Schema::parse_str(
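
The change above has `parse_enum` store each parsed enum in `parsed_schemas` under its full name, so that a later bare reference such as `"type": "Gender"` can be resolved. The idea can be sketched with the standard library alone. `MiniSchema`, `MiniParser`, `define_enum` and `resolve` are hypothetical names chosen for illustration; they are not the avro-rs API, and the real parser also handles namespaces, which this sketch ignores.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a parsed schema; not the avro-rs `Schema` type.
#[derive(Clone, Debug, PartialEq)]
enum MiniSchema {
    Enum { name: String, symbols: Vec<String> },
}

/// Hypothetical parser that remembers every named type it has seen.
#[derive(Default)]
struct MiniParser {
    parsed_schemas: HashMap<String, MiniSchema>,
}

impl MiniParser {
    /// Build an enum schema and register it under its name.
    fn define_enum(&mut self, name: &str, symbols: &[&str]) -> MiniSchema {
        let schema = MiniSchema::Enum {
            name: name.to_string(),
            symbols: symbols.iter().map(|s| s.to_string()).collect(),
        };
        // Registering a clone is what lets a later reference by name succeed.
        self.parsed_schemas.insert(name.to_string(), schema.clone());
        schema
    }

    /// Resolve a bare type name; `None` means the name was never defined.
    fn resolve(&self, name: &str) -> Option<MiniSchema> {
        self.parsed_schemas.get(name).cloned()
    }
}
```

Without the `insert` call, the second use of the name would have nothing to look up, which is the failure the commit addresses for enums.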

[avro] 12/30: AVRO-3240: fix deserializer schema backward compatibility (#1379)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 578c81bc0a2ae3ffee1b4b58d6947a81c23054e2
Author: Ultrabug <ul...@users.noreply.github.com>
AuthorDate: Fri Jan 7 14:30:56 2022 +0100

    AVRO-3240: fix deserializer schema backward compatibility (#1379)
    
    * AVRO-3240: fix decoding schema when reaching the end of the reader
    
    * AVRO-3240: add from_avro_datum test for reader EOF
    
    * AVRO-3240: add reader eof handling to Schema::String
    
    (cherry picked from commit 7584320142768612bc7a58242231a476404bbceb)
---
 lang/rust/src/decode.rs | 78 +++++++++++++++++++++++++++++++++----------------
 lang/rust/src/reader.rs | 64 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 117 insertions(+), 25 deletions(-)

diff --git a/lang/rust/src/decode.rs b/lang/rust/src/decode.rs
index 9108d37..71ee4c1 100644
--- a/lang/rust/src/decode.rs
+++ b/lang/rust/src/decode.rs
@@ -23,7 +23,7 @@ use crate::{
     util::{safe_len, zag_i32, zag_i64},
     AvroResult, Error,
 };
-use std::{collections::HashMap, convert::TryFrom, io::Read, str::FromStr};
+use std::{collections::HashMap, convert::TryFrom, io::{ErrorKind, Read}, str::FromStr};
 use uuid::Uuid;
 
 #[inline]
@@ -67,15 +67,23 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
         Schema::Null => Ok(Value::Null),
         Schema::Boolean => {
             let mut buf = [0u8; 1];
-            reader
-                .read_exact(&mut buf[..])
-                .map_err(Error::ReadBoolean)?;
-
-            match buf[0] {
-                0u8 => Ok(Value::Boolean(false)),
-                1u8 => Ok(Value::Boolean(true)),
-                _ => Err(Error::BoolValue(buf[0])),
-            }
+            match reader
+                .read_exact(&mut buf[..]) {
+                    Ok(_) => {
+                        match buf[0] {
+                            0u8 => Ok(Value::Boolean(false)),
+                            1u8 => Ok(Value::Boolean(true)),
+                            _ => Err(Error::BoolValue(buf[0])),
+                        }
+                    },
+                    Err(io_err) => {
+                        if let ErrorKind::UnexpectedEof = io_err.kind() {
+                            Ok(Value::Null)
+                        } else {
+                            Err(Error::ReadBoolean(io_err))
+                        }
+                    },
+                }
         }
         Schema::Decimal { ref inner, .. } => match &**inner {
             Schema::Fixed { .. } => match decode(inner, reader)? {
@@ -126,11 +134,20 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
         Schema::String => {
             let len = decode_len(reader)?;
             let mut buf = vec![0u8; len];
-            reader.read_exact(&mut buf).map_err(Error::ReadString)?;
-
-            Ok(Value::String(
-                String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
-            ))
+            match reader.read_exact(&mut buf) {
+                Ok(_) => {
+                    Ok(Value::String(
+                        String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
+                    ))
+                },
+                Err(io_err) => {
+                    if let ErrorKind::UnexpectedEof = io_err.kind() {
+                        Ok(Value::Null)
+                    } else {
+                        Err(Error::ReadString(io_err))
+                    }
+                },
+            }
         }
         Schema::Fixed { size, .. } => {
             let mut buf = vec![0u8; size];
@@ -180,16 +197,27 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
             Ok(Value::Map(items))
         }
         Schema::Union(ref inner) => {
-            let index = zag_i64(reader)?;
-            let variants = inner.variants();
-            let variant = variants
-                .get(usize::try_from(index).map_err(|e| Error::ConvertI64ToUsize(e, index))?)
-                .ok_or_else(|| Error::GetUnionVariant {
-                    index,
-                    num_variants: variants.len(),
-                })?;
-            let value = decode(variant, reader)?;
-            Ok(Value::Union(Box::new(value)))
+            match zag_i64(reader) {
+                Ok(index) => {
+                    let variants = inner.variants();
+                    let variant = variants
+                        .get(usize::try_from(index).map_err(|e| Error::ConvertI64ToUsize(e, index))?)
+                        .ok_or_else(|| Error::GetUnionVariant {
+                            index,
+                            num_variants: variants.len(),
+                        })?;
+                    let value = decode(variant, reader)?;
+                    Ok(Value::Union(Box::new(value)))
+                },
+                Err(Error::ReadVariableIntegerBytes(io_err)) => {
+                    if let ErrorKind::UnexpectedEof = io_err.kind() {
+                        Ok(Value::Union(Box::new(Value::Null)))
+                    } else {
+                        Err(Error::ReadVariableIntegerBytes(io_err))
+                    }
+                },
+                Err(io_err) => Err(io_err),
+            }
         }
         Schema::Record { ref fields, .. } => {
             // Benchmarks indicate ~10% improvement using this method.
diff --git a/lang/rust/src/reader.rs b/lang/rust/src/reader.rs
index 467c658..e036991 100644
--- a/lang/rust/src/reader.rs
+++ b/lang/rust/src/reader.rs
@@ -304,8 +304,10 @@ pub fn from_avro_datum<R: Read>(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::from_value;
     use crate::{types::Record, Reader};
     use std::io::Cursor;
+    use serde::Deserialize;
 
     const SCHEMA: &str = r#"
     {
@@ -341,6 +343,47 @@ mod tests {
         6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
         207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
     ];
+    const TEST_RECORD_SCHEMA_3240: &str = r#"
+    {
+      "type": "record",
+      "name": "test",
+      "fields": [
+        {
+          "name": "a",
+          "type": "long",
+          "default": 42
+        },
+        {
+          "name": "b",
+          "type": "string"
+        },
+        {
+            "name": "a_nullable_array",
+            "type": ["null", {"type": "array", "items": {"type": "string"}}],
+            "default": null
+        },
+        {
+            "name": "a_nullable_boolean",
+            "type": ["null", {"type": "boolean"}],
+            "default": null
+        },
+        {
+            "name": "a_nullable_string",
+            "type": ["null", {"type": "string"}],
+            "default": null
+        }
+      ]
+    }
+    "#;
+    #[derive(Default, Debug, Deserialize, PartialEq)]
+    struct TestRecord {
+        a: i64,
+        b: String,
+        a_nullable_array: Option<Vec<String>>,
+        // we are missing the 'a_nullable_boolean' field to simulate missing keys
+        // a_nullable_boolean: Option<bool>,
+        a_nullable_string: Option<String>,
+    }
 
     #[test]
     fn test_from_avro_datum() {
@@ -359,6 +402,27 @@ mod tests {
     }
 
     #[test]
+    fn test_from_avro_datum_with_union_to_struct() {
+        let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240).unwrap();
+        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
+
+        let expected_record: TestRecord = TestRecord {
+            a: 27i64,
+            b: String::from("foo"),
+            a_nullable_array: None,
+            a_nullable_string: None,
+        };
+
+        let avro_datum = from_avro_datum(&schema, &mut encoded, None).unwrap();
+        let parsed_record: TestRecord = match &avro_datum {
+            Value::Record(_) => from_value::<TestRecord>(&avro_datum).unwrap(),
+            unexpected => panic!("could not map avro data to struct, found unexpected: {:?}", unexpected),
+        };
+
+        assert_eq!(parsed_record, expected_record);
+    }
+
+    #[test]
     fn test_null_union() {
         let schema = Schema::parse_str(UNION_SCHEMA).unwrap();
         let mut encoded: &'static [u8] = &[2, 0];
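
The decoder change above treats `ErrorKind::UnexpectedEof` from `read_exact` as "no value present" and passes every other I/O error through. Below is a minimal sketch of that pattern using only the standard library. `read_optional_bool` is a hypothetical helper, not part of avro-rs, and it returns `Option<bool>` where the commit returns `Value::Null`.

```rust
use std::io::{self, ErrorKind, Read};

/// Read one boolean byte, treating a clean end of input as "absent".
/// Hypothetical helper for illustration; not the avro-rs `decode` function.
fn read_optional_bool<R: Read>(reader: &mut R) -> io::Result<Option<bool>> {
    let mut buf = [0u8; 1];
    match reader.read_exact(&mut buf) {
        Ok(()) => match buf[0] {
            0 => Ok(Some(false)),
            1 => Ok(Some(true)),
            other => Err(io::Error::new(
                ErrorKind::InvalidData,
                format!("invalid boolean byte: {}", other),
            )),
        },
        // The reader ran out of bytes: report "no value" instead of failing.
        Err(err) if err.kind() == ErrorKind::UnexpectedEof => Ok(None),
        // Any other I/O failure is still an error.
        Err(err) => Err(err),
    }
}
```

One consequence of this design is that a truncated input and a field that was never written look the same to the caller, so it suits trailing fields that have defaults, as in the test schema above.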

[avro] 03/30: AVRO-3234: add new codec to lang/rust: zstandard (#1370)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 8aa071cd4135525e3ca550f0fc16623a1cf2750a
Author: sycured <60...@users.noreply.github.com>
AuthorDate: Wed Jan 5 02:01:28 2022 -0500

    AVRO-3234: add new codec to lang/rust: zstandard (#1370)
    
    * AVRO-3234: add new codec zstandard
    
    * update MSRV to 1.51.0
    
    (cherry picked from commit 04e41fc2c8aa21e829f690e791e8aa2b0a049700)
---
 .github/workflows/test-lang-rust-ci.yml |  2 +-
 lang/rust/Cargo.toml                    |  2 ++
 lang/rust/README.md                     | 10 ++++++++++
 lang/rust/src/codec.rs                  | 33 +++++++++++++++++++++++++++++++++
 lang/rust/src/error.rs                  |  6 ++++++
 5 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/test-lang-rust-ci.yml b/.github/workflows/test-lang-rust-ci.yml
index 977ea11..dacf461 100644
--- a/.github/workflows/test-lang-rust-ci.yml
+++ b/.github/workflows/test-lang-rust-ci.yml
@@ -39,7 +39,7 @@ jobs:
           - stable
           - beta
           - nightly
-          - 1.48.0  # MSRV
+          - 1.51.0  # MSRV
 
     steps:
       - name: Checkout
diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index d82e5ff..23ddc2a 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -30,6 +30,7 @@ documentation = "https://docs.rs/avro-rs"
 
 [features]
 snappy = ["crc", "snap"]
+zstandard = ["zstd"]
 
 [lib]
 path = "src/lib.rs"
@@ -66,6 +67,7 @@ typed-builder = "0.9.1"
 uuid = { version = "0.8.2", features = ["serde", "v4"] }
 zerocopy = "0.3.0"
 lazy_static = "1.1.1"
+zstd = { version = "0.9.0+zstd.1.5.0" , optional = true }
 
 [dev-dependencies]
 md-5 = "0.9.1"
diff --git a/lang/rust/README.md b/lang/rust/README.md
index e934cc3..9d0ef24 100644
--- a/lang/rust/README.md
+++ b/lang/rust/README.md
@@ -71,6 +71,14 @@ version = "x.y"
 features = ["snappy"]
 ```
 
+Or in case you want to leverage the **Zstandard** codec:
+
+```toml
+[dependencies.avro-rs]
+version = "x.y"
+features = ["zstandard"]
+```
+
 ## Upgrading to a newer minor version
 
 The library is still in beta, so there might be backward-incompatible changes between minor
@@ -244,6 +252,8 @@ RFC 1950) does not have a checksum.
 * **Snappy**: uses Google's [Snappy](http://google.github.io/snappy/) compression library. Each
 compressed block is followed by the 4-byte, big-endian CRC32 checksum of the uncompressed data in
 the block. You must enable the `snappy` feature to use this codec.
+* **Zstandard**: uses Facebook's [Zstandard](https://facebook.github.io/zstd/) compression library.
+You must enable the `zstandard` feature to use this codec.
 
 To specify a codec to use to compress data, just specify it while creating a `Writer`:
 ```rust
diff --git a/lang/rust/src/codec.rs b/lang/rust/src/codec.rs
index 25d395a..a27f058 100644
--- a/lang/rust/src/codec.rs
+++ b/lang/rust/src/codec.rs
@@ -36,6 +36,8 @@ pub enum Codec {
     /// compression library. Each compressed block is followed by the 4-byte, big-endian
     /// CRC32 checksum of the uncompressed data in the block.
     Snappy,
+    #[cfg(feature = "zstandard")]
+    Zstd,
 }
 
 impl From<Codec> for Value {
@@ -73,6 +75,12 @@ impl Codec {
 
                 *stream = encoded;
             }
+            #[cfg(feature = "zstandard")]
+            Codec::Zstd => {
+                let mut encoder = zstd::Encoder::new(Vec::new(), 0).unwrap();
+                encoder.write_all(stream).map_err(Error::ZstdCompress)?;
+                *stream = encoder.finish().unwrap();
+            }
         };
 
         Ok(())
@@ -109,6 +117,13 @@ impl Codec {
                 }
                 decoded
             }
+            #[cfg(feature = "zstandard")]
+            Codec::Zstd => {
+                let mut decoded = Vec::new();
+                let mut decoder = zstd::Decoder::new(&stream[..]).unwrap();
+                std::io::copy(&mut decoder, &mut decoded).map_err(Error::ZstdDecompress)?;
+                decoded
+            }
         };
         Ok(())
     }
@@ -153,6 +168,18 @@ mod tests {
         assert_eq!(INPUT, stream.as_slice());
     }
 
+    #[cfg(feature = "zstandard")]
+    #[test]
+    fn zstd_compress_and_decompress() {
+        let codec = Codec::Zstd;
+        let mut stream = INPUT.to_vec();
+        codec.compress(&mut stream).unwrap();
+        assert_ne!(INPUT, stream.as_slice());
+        assert!(INPUT.len() > stream.len());
+        codec.decompress(&mut stream).unwrap();
+        assert_eq!(INPUT, stream.as_slice());
+    }
+
     #[test]
     fn codec_to_str() {
         assert_eq!(<&str>::from(Codec::Null), "null");
@@ -160,6 +187,9 @@ mod tests {
 
         #[cfg(feature = "snappy")]
         assert_eq!(<&str>::from(Codec::Snappy), "snappy");
+
+        #[cfg(feature = "zstandard")]
+        assert_eq!(<&str>::from(Codec::Zstd), "zstd");
     }
 
     #[test]
@@ -172,6 +202,9 @@ mod tests {
         #[cfg(feature = "snappy")]
         assert_eq!(Codec::from_str("snappy").unwrap(), Codec::Snappy);
 
+        #[cfg(feature = "zstandard")]
+        assert_eq!(Codec::from_str("zstd").unwrap(), Codec::Zstd);
+
         assert!(Codec::from_str("not a codec").is_err());
     }
 }
diff --git a/lang/rust/src/error.rs b/lang/rust/src/error.rs
index 8f38f76..b690bfb 100644
--- a/lang/rust/src/error.rs
+++ b/lang/rust/src/error.rs
@@ -309,6 +309,12 @@ pub enum Error {
     #[error("Failed to decompress with snappy")]
     SnappyDecompress(#[source] snap::Error),
 
+    #[error("Failed to compress with zstd")]
+    ZstdCompress(#[source] std::io::Error),
+
+    #[error("Failed to decompress with zstd")]
+    ZstdDecompress(#[source] std::io::Error),
+
     #[error("Failed to read header")]
     ReadHeader(#[source] std::io::Error),
 

[avro] 06/30: Update strum requirement from 0.21.0 to 0.23.0 in /lang/rust (#1408)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 27d4ea6abeea335fa525768a31d814681d70c1a3
Author: dependabot[bot] <49...@users.noreply.github.com>
AuthorDate: Wed Jan 5 10:09:21 2022 +0200

    Update strum requirement from 0.21.0 to 0.23.0 in /lang/rust (#1408)
    
    Updates the requirements on [strum](https://github.com/Peternator7/strum) to permit the latest version.
    - [Release notes](https://github.com/Peternator7/strum/releases)
    - [Changelog](https://github.com/Peternator7/strum/blob/master/CHANGELOG.md)
    - [Commits](https://github.com/Peternator7/strum/commits)
    
    ---
    updated-dependencies:
    - dependency-name: strum
      dependency-type: direct:production
    ...
    
    Signed-off-by: dependabot[bot] <su...@github.com>
    
    Co-authored-by: dependabot[bot] <49...@users.noreply.github.com>
    (cherry picked from commit 6a67d7161cf7c0f2e229c780b90afea8f2818886)
---
 lang/rust/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index 328e858..8faffb3 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -62,7 +62,7 @@ regex = "1.5.4"
 serde_json = "1.0.67"
 serde = { version = "1.0.130", features = ["derive"] }
 snap = { version = "1.0.5", optional = true }
-strum = "0.21.0"
+strum = "0.23.0"
 strum_macros = "0.21.1"
 thiserror = "1.0.29"
 typed-builder = "0.9.1"

[avro] 20/30: AVRO-3303: Rust: Add support for Xz codec (#1455)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit f66f7470a982f706e812dedbc66abba530ff82e5
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Thu Jan 13 10:22:15 2022 +0200

    AVRO-3303: Rust: Add support for Xz codec (#1455)
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit ee47d4cd64eddcbced7b7c77102480e767b210a6)
---
 lang/rust/Cargo.toml   |  2 ++
 lang/rust/README.md    | 12 ++++++++++
 lang/rust/src/codec.rs | 62 ++++++++++++++++++++++++++++++++------------------
 3 files changed, 54 insertions(+), 22 deletions(-)

diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index 5fb8593..0f1f4ed 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -32,6 +32,7 @@ documentation = "https://docs.rs/avro-rs"
 snappy = ["crc32fast", "snap"]
 zstandard = ["zstd"]
 bzip = ["bzip2"]
+xz = ["xz2"]
 
 [lib]
 path = "src/lib.rs"
@@ -56,6 +57,7 @@ bzip2 = { version = "0.4.3", optional = true }
 crc32fast = { version = "1.2.1", optional = true }
 digest = "0.10.1"
 libflate = "1.1.1"
+xz2 = { version = "0.1.6", optional = true }
 num-bigint = "0.4.2"
 rand = "0.8.4"
 regex = "1.5.4"
diff --git a/lang/rust/README.md b/lang/rust/README.md
index 93950ca..0282d51 100644
--- a/lang/rust/README.md
+++ b/lang/rust/README.md
@@ -87,6 +87,15 @@ version = "x.y"
 features = ["bzip"]
 ```
 
+Or in case you want to leverage the **Xz** codec:
+
+```toml
+[dependencies.avro-rs]
+version = "x.y"
+features = ["xz"]
+```
+
+
 ## Upgrading to a newer minor version
 
 The library is still in beta, so there might be backward-incompatible changes between minor
@@ -264,6 +273,9 @@ the block. You must enable the `snappy` feature to use this codec.
 You must enable the `zstandard` feature to use this codec.
 * **Bzip2**: uses [BZip2](https://sourceware.org/bzip2/) compression library.
 You must enable the `bzip` feature to use this codec.
+* **Xz**: uses [xz2](https://github.com/alexcrichton/xz2-rs) compression library.
+  You must enable the `xz` feature to use this codec.
+
 
 To specify a codec to use to compress data, just specify it while creating a `Writer`:
 ```rust
diff --git a/lang/rust/src/codec.rs b/lang/rust/src/codec.rs
index 15992c1..0ba8abe 100644
--- a/lang/rust/src/codec.rs
+++ b/lang/rust/src/codec.rs
@@ -30,6 +30,8 @@ use bzip2::{
 extern crate crc32fast;
 #[cfg(feature = "snappy")]
 use crc32fast::Hasher;
+#[cfg(feature = "xz")]
+use xz2::read::{XzDecoder, XzEncoder};
 
 /// The compression codec used to compress blocks.
 #[derive(Clone, Copy, Debug, PartialEq, EnumString, IntoStaticStr)]
@@ -52,6 +54,10 @@ pub enum Codec {
     /// The `BZip2` codec uses [BZip2](https://sourceware.org/bzip2/)
     /// compression library.
     Bzip2,
+    #[cfg(feature = "xz")]
+    /// The `Xz` codec uses [Xz utils](https://tukaani.org/xz/)
+    /// compression library.
+    Xz,
 }
 
 impl From<Codec> for Value {
@@ -104,6 +110,14 @@ impl Codec {
                 encoder.read_to_end(&mut buffer).unwrap();
                 *stream = buffer;
             }
+            #[cfg(feature = "xz")]
+            Codec::Xz => {
+                let compression_level = 9;
+                let mut encoder = XzEncoder::new(&stream[..], compression_level);
+                let mut buffer = Vec::new();
+                encoder.read_to_end(&mut buffer).unwrap();
+                *stream = buffer;
+            }
         };
 
         Ok(())
@@ -156,6 +170,13 @@ impl Codec {
                 decoder.read_to_end(&mut decoded).unwrap();
                 decoded
             }
+            #[cfg(feature = "xz")]
+            Codec::Xz => {
+                let mut decoder = XzDecoder::new(&stream[..]);
+                let mut decoded: Vec<u8> = Vec::new();
+                decoder.read_to_end(&mut decoded).unwrap();
+                decoded
+            }
         };
         Ok(())
     }
@@ -179,43 +200,34 @@ mod tests {
 
     #[test]
     fn deflate_compress_and_decompress() {
-        let codec = Codec::Deflate;
-        let mut stream = INPUT.to_vec();
-        codec.compress(&mut stream).unwrap();
-        assert_ne!(INPUT, stream.as_slice());
-        assert!(INPUT.len() > stream.len());
-        codec.decompress(&mut stream).unwrap();
-        assert_eq!(INPUT, stream.as_slice());
+        compress_and_decompress(Codec::Deflate);
     }
 
     #[cfg(feature = "snappy")]
     #[test]
     fn snappy_compress_and_decompress() {
-        let codec = Codec::Snappy;
-        let mut stream = INPUT.to_vec();
-        codec.compress(&mut stream).unwrap();
-        assert_ne!(INPUT, stream.as_slice());
-        assert!(INPUT.len() > stream.len());
-        codec.decompress(&mut stream).unwrap();
-        assert_eq!(INPUT, stream.as_slice());
+        compress_and_decompress(Codec::Snappy);
     }
 
     #[cfg(feature = "zstandard")]
     #[test]
     fn zstd_compress_and_decompress() {
-        let codec = Codec::Zstd;
-        let mut stream = INPUT.to_vec();
-        codec.compress(&mut stream).unwrap();
-        assert_ne!(INPUT, stream.as_slice());
-        assert!(INPUT.len() > stream.len());
-        codec.decompress(&mut stream).unwrap();
-        assert_eq!(INPUT, stream.as_slice());
+        compress_and_decompress(Codec::Zstd);
     }
 
     #[cfg(feature = "bzip")]
     #[test]
     fn bzip_compress_and_decompress() {
-        let codec = Codec::Bzip2;
+        compress_and_decompress(Codec::Bzip2);
+    }
+
+    #[cfg(feature = "xz")]
+    #[test]
+    fn xz_compress_and_decompress() {
+        compress_and_decompress(Codec::Xz);
+    }
+
+    fn compress_and_decompress(codec: Codec) {
         let mut stream = INPUT.to_vec();
         codec.compress(&mut stream).unwrap();
         assert_ne!(INPUT, stream.as_slice());
@@ -237,6 +249,9 @@ mod tests {
 
         #[cfg(feature = "bzip")]
         assert_eq!(<&str>::from(Codec::Bzip2), "bzip2");
+
+        #[cfg(feature = "xz")]
+        assert_eq!(<&str>::from(Codec::Xz), "xz");
     }
 
     #[test]
@@ -255,6 +270,9 @@ mod tests {
         #[cfg(feature = "bzip")]
         assert_eq!(Codec::from_str("bzip2").unwrap(), Codec::Bzip2);
 
+        #[cfg(feature = "xz")]
+        assert_eq!(Codec::from_str("xz").unwrap(), Codec::Xz);
+
         assert!(Codec::from_str("not a codec").is_err());
     }
 }
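
The test refactoring above replaces four copies of the same round-trip check with one shared `compress_and_decompress(codec)` helper. The same shape can be sketched without any compression crate. `BlockCodec`, `RunLength` and `round_trip` are hypothetical names, and the run-length scheme is a toy stand-in for a real codec, used here only so the example is self-contained.

```rust
/// Hypothetical in-place codec interface, loosely modelled on the
/// `compress`/`decompress` pair in the diff above.
trait BlockCodec {
    fn compress(&self, stream: &mut Vec<u8>);
    fn decompress(&self, stream: &mut Vec<u8>);
}

/// Toy run-length codec: each run of equal bytes becomes a (count, byte) pair.
struct RunLength;

impl BlockCodec for RunLength {
    fn compress(&self, stream: &mut Vec<u8>) {
        let mut out = Vec::new();
        let mut iter = stream.iter().copied().peekable();
        while let Some(byte) = iter.next() {
            let mut count = 1u8;
            // Extend the run while the next byte matches and the counter fits.
            while count < u8::MAX && iter.peek() == Some(&byte) {
                iter.next();
                count += 1;
            }
            out.push(count);
            out.push(byte);
        }
        *stream = out;
    }

    fn decompress(&self, stream: &mut Vec<u8>) {
        let mut out = Vec::new();
        for pair in stream.chunks_exact(2) {
            out.extend(std::iter::repeat(pair[1]).take(pair[0] as usize));
        }
        *stream = out;
    }
}

/// Shared round-trip check, so that each per-codec test is a one-liner.
/// Returns true when compression shrank the input and decompression restored it.
fn round_trip<C: BlockCodec>(codec: &C, input: &[u8]) -> bool {
    let mut stream = input.to_vec();
    codec.compress(&mut stream);
    let shrank = stream.len() < input.len();
    codec.decompress(&mut stream);
    shrank && stream == input
}
```

As in the commit's helper, the size check only holds for compressible input, so a shared helper like this should be fed repetitive test data.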

[avro] 08/30: AVRO-3247 Rust: Run MIRI checks (#1390)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 355cf468ce37af8e1e97975d6a413633dc1c6837
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Wed Jan 5 10:43:55 2022 +0200

    AVRO-3247 Rust: Run MIRI checks (#1390)
    
    (cherry picked from commit 87d04ef9065e3ef21f1fb4135c56d3c6500d2530)
---
 .github/workflows/test-lang-rust-miri.yml | 53 +++++++++++++++++++++++++++++++
 lang/rust/src/schema.rs                   |  1 +
 2 files changed, 54 insertions(+)

diff --git a/.github/workflows/test-lang-rust-miri.yml b/.github/workflows/test-lang-rust-miri.yml
new file mode 100644
index 0000000..a29f192
--- /dev/null
+++ b/.github/workflows/test-lang-rust-miri.yml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Rust MIRI Check
+on:
+  workflow_dispatch:
+  push:
+    branches: [ master ]
+  pull_request:
+    branches: [ master ]
+    paths:
+      - .github/workflows/test-lang-rust-miri.yml
+      - lang/rust/**
+
+defaults:
+  run:
+    working-directory: lang/rust
+
+jobs:
+  miri_check:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+      - name: Setup Rust toolchain
+        run: |
+          MIRI_NIGHTLY=nightly-$(curl -s https://rust-lang.github.io/rustup-components-history/x86_64-unknown-linux-gnu/miri)
+          echo "Installing latest nightly with Miri: $MIRI_NIGHTLY"
+          rustup toolchain install $MIRI_NIGHTLY
+          rustup default $MIRI_NIGHTLY
+          rustup component add miri
+      - name: Run MIRI Checks
+        env:
+          RUST_BACKTRACE: full
+          RUST_LOG: trace
+          MIRIFLAGS: "-Zmiri-disable-isolation"
+        run: |
+          cargo miri setup
+          cargo clean
+          cargo miri test --lib
diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index efafed5..8e15cd4 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -1291,6 +1291,7 @@ mod tests {
     }
 
     #[test]
+    #[cfg_attr(miri, ignore)] // Sha256 uses inline assembly instructions, which are not supported by Miri
     fn test_schema_fingerprint() {
         use crate::rabin::Rabin;
         use md5::Md5;

[avro] 11/30: Update zerocopy requirement from 0.3.0 to 0.6.1 in /lang/rust (#1402)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit e6849d13d4ead05333f491ba37756974dae1f5de
Author: dependabot[bot] <49...@users.noreply.github.com>
AuthorDate: Wed Jan 5 15:10:09 2022 +0200

    Update zerocopy requirement from 0.3.0 to 0.6.1 in /lang/rust (#1402)
    
    Updates the requirements on zerocopy to permit the latest version.
    
    ---
    updated-dependencies:
    - dependency-name: zerocopy
      dependency-type: direct:production
    ...
    
    Signed-off-by: dependabot[bot] <su...@github.com>
    
    Co-authored-by: dependabot[bot] <49...@users.noreply.github.com>
    (cherry picked from commit 7bda57acf5f19749bb3930f8816b06e4b3dee15a)
---
 lang/rust/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index 3da92dd..7709156 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -67,7 +67,7 @@ strum_macros = "0.23.1"
 thiserror = "1.0.29"
 typed-builder = "0.9.1"
 uuid = { version = "0.8.2", features = ["serde", "v4"] }
-zerocopy = "0.3.0"
+zerocopy = "0.6.1"
 lazy_static = "1.1.1"
 log = "0.4.14"
 zstd = { version = "0.9.0+zstd.1.5.0" , optional = true }

[avro] 23/30: AVRO-3312: Use u32 instead of i32 for the Enum/Union's index field (#1465)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 5af11f3c70ed93a19ed77cf6a48351bacdae215d
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Wed Jan 19 21:20:52 2022 +0200

    AVRO-3312: Use u32 instead of i32 for the Enum/Union's index field (#1465)
    
    (cherry picked from commit e2eb392b5f080b7703d8bac35f6a478403d6f9af)
---
 lang/rust/src/decode.rs |  4 ++--
 lang/rust/src/encode.rs |  2 +-
 lang/rust/src/error.rs  |  3 +++
 lang/rust/src/ser.rs    | 21 +++++++++------------
 lang/rust/src/types.rs  | 12 ++++++------
 5 files changed, 21 insertions(+), 21 deletions(-)
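
The idea behind this change can be sketched in a few standalone lines. This is an illustration under stated assumptions, not the crate's code: `symbol_at` and `IndexError` are hypothetical names. Keeping the index as `u32` rules out negative positions by construction, and `usize::try_from` performs the conversion needed for the lookup. The sketch uses `slice::get`, so an index equal to the number of symbols is reported as out of range.

```rust
use std::convert::TryFrom;

/// Hypothetical error type for this sketch (not part of the crate).
#[derive(Debug, PartialEq)]
enum IndexError {
    /// The u32 does not fit into usize on this platform.
    Conversion(u32),
    /// The index is past the end of the symbol list.
    OutOfRange { index: usize, num_symbols: usize },
}

/// Looks up an enum symbol by its unsigned position.
fn symbol_at(symbols: &[String], raw_index: u32) -> Result<&str, IndexError> {
    let index = usize::try_from(raw_index).map_err(|_| IndexError::Conversion(raw_index))?;
    symbols
        .get(index)
        .map(String::as_str)
        .ok_or(IndexError::OutOfRange {
            index,
            num_symbols: symbols.len(),
        })
}

fn main() {
    let symbols = vec!["A".to_string(), "B".to_string()];
    println!("{:?}", symbol_at(&symbols, 1));
    println!("{:?}", symbol_at(&symbols, 2));
}
```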

diff --git a/lang/rust/src/decode.rs b/lang/rust/src/decode.rs
index eb9c018..70c1ba5 100644
--- a/lang/rust/src/decode.rs
+++ b/lang/rust/src/decode.rs
@@ -215,7 +215,7 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
                             num_variants: variants.len(),
                         })?;
                     let value = decode0(variant, reader, schemas_by_name)?;
-                    Ok(Value::Union(index as i32, Box::new(value)))
+                    Ok(Value::Union(index as u32, Box::new(value)))
                 }
                 Err(Error::ReadVariableIntegerBytes(io_err)) => {
                     if let ErrorKind::UnexpectedEof = io_err.kind() {
@@ -254,7 +254,7 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
                         .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?;
                     if (0..=symbols.len()).contains(&index) {
                         let symbol = symbols[index].clone();
-                        Value::Enum(raw_index, symbol)
+                        Value::Enum(raw_index as u32, symbol)
                     } else {
                         return Err(Error::GetEnumValue {
                             index,
diff --git a/lang/rust/src/encode.rs b/lang/rust/src/encode.rs
index 9a1fbef..6fc969a 100644
--- a/lang/rust/src/encode.rs
+++ b/lang/rust/src/encode.rs
@@ -123,7 +123,7 @@ pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
                 _ => error!("invalid schema type for String: {:?}", schema),
             },
             Value::Fixed(_, bytes) => buffer.extend(bytes),
-            Value::Enum(i, _) => encode_int(*i, buffer),
+            Value::Enum(i, _) => encode_int(*i as i32, buffer),
             Value::Union(idx, item) => {
                 if let Schema::Union(ref inner) = *schema {
                     let inner_schema = inner
diff --git a/lang/rust/src/error.rs b/lang/rust/src/error.rs
index f65ec72..d687eea 100644
--- a/lang/rust/src/error.rs
+++ b/lang/rust/src/error.rs
@@ -225,6 +225,9 @@ pub enum Error {
     #[error("Cannot convert u64 to usize: {1}")]
     ConvertU64ToUsize(#[source] std::num::TryFromIntError, u64),
 
+    #[error("Cannot convert u32 to usize: {1}")]
+    ConvertU32ToUsize(#[source] std::num::TryFromIntError, u32),
+
     #[error("Cannot convert i64 to usize: {1}")]
     ConvertI64ToUsize(#[source] std::num::TryFromIntError, i64),
 
diff --git a/lang/rust/src/ser.rs b/lang/rust/src/ser.rs
index 444ee20..5cff13e 100644
--- a/lang/rust/src/ser.rs
+++ b/lang/rust/src/ser.rs
@@ -142,7 +142,7 @@ impl<'b> ser::Serializer for &'b mut Serializer {
     }
 
     fn serialize_u32(self, v: u32) -> Result<Self::Ok, Self::Error> {
-        if v <= i32::max_value() as u32 {
+        if v <= i32::MAX as u32 {
             self.serialize_i32(v as i32)
         } else {
             self.serialize_i64(i64::from(v))
@@ -150,7 +150,7 @@ impl<'b> ser::Serializer for &'b mut Serializer {
     }
 
     fn serialize_u64(self, v: u64) -> Result<Self::Ok, Self::Error> {
-        if v <= i64::max_value() as u64 {
+        if v <= i64::MAX as u64 {
             self.serialize_i64(v as i64)
         } else {
             Err(ser::Error::custom("u64 is too large"))
@@ -203,7 +203,7 @@ impl<'b> ser::Serializer for &'b mut Serializer {
         index: u32,
         variant: &'static str,
     ) -> Result<Self::Ok, Self::Error> {
-        Ok(Value::Enum(index as i32, variant.to_string()))
+        Ok(Value::Enum(index, variant.to_string()))
     }
 
     fn serialize_newtype_struct<T: ?Sized>(
@@ -228,13 +228,10 @@ impl<'b> ser::Serializer for &'b mut Serializer {
         T: Serialize,
     {
         Ok(Value::Record(vec![
-            (
-                "type".to_owned(),
-                Value::Enum(index as i32, variant.to_owned()),
-            ),
+            ("type".to_owned(), Value::Enum(index, variant.to_owned())),
             (
                 "value".to_owned(),
-                Value::Union(index as i32, Box::new(value.serialize(self)?)),
+                Value::Union(index, Box::new(value.serialize(self)?)),
             ),
         ]))
     }
@@ -347,7 +344,7 @@ impl<'a> ser::SerializeSeq for SeqVariantSerializer<'a> {
         T: Serialize,
     {
         self.items.push(Value::Union(
-            self.index as i32,
+            self.index,
             Box::new(value.serialize(&mut Serializer::default())?),
         ));
         Ok(())
@@ -357,7 +354,7 @@ impl<'a> ser::SerializeSeq for SeqVariantSerializer<'a> {
         Ok(Value::Record(vec![
             (
                 "type".to_owned(),
-                Value::Enum(self.index as i32, self.variant.to_owned()),
+                Value::Enum(self.index, self.variant.to_owned()),
             ),
             ("value".to_owned(), Value::Array(self.items)),
         ]))
@@ -466,11 +463,11 @@ impl<'a> ser::SerializeStructVariant for StructVariantSerializer<'a> {
         Ok(Value::Record(vec![
             (
                 "type".to_owned(),
-                Value::Enum(self.index as i32, self.variant.to_owned()),
+                Value::Enum(self.index, self.variant.to_owned()),
             ),
             (
                 "value".to_owned(),
-                Value::Union(self.index as i32, Box::new(Value::Record(self.fields))),
+                Value::Union(self.index, Box::new(Value::Record(self.fields))),
             ),
         ]))
     }
diff --git a/lang/rust/src/types.rs b/lang/rust/src/types.rs
index 77472b0..85e7341 100644
--- a/lang/rust/src/types.rs
+++ b/lang/rust/src/types.rs
@@ -64,14 +64,14 @@ pub enum Value {
     /// of its corresponding schema.
     /// This allows schema-less encoding, as well as schema resolution while
     /// reading values.
-    Enum(i32, String),
+    Enum(u32, String),
     /// An `union` Avro value.
     ///
     /// A Union is represented by the value it holds and its position in the type list
     /// of its corresponding schema
     /// This allows schema-less encoding, as well as schema resolution while
     /// reading values.
-    Union(i32, Box<Value>),
+    Union(u32, Box<Value>),
     /// An `array` Avro value.
     Array(Vec<Value>),
     /// A `map` Avro value.
@@ -175,7 +175,7 @@ where
     fn from(value: Option<T>) -> Self {
         // FIXME: this is incorrect in case first type in union is not "none"
         Self::Union(
-            value.is_some() as i32,
+            value.is_some() as u32,
             Box::new(value.map_or_else(|| Self::Null, Into::into)),
         )
     }
@@ -684,7 +684,7 @@ impl Value {
     fn resolve_enum(self, symbols: &[String]) -> Result<Self, Error> {
         let validate_symbol = |symbol: String, symbols: &[String]| {
             if let Some(index) = symbols.iter().position(|item| item == &symbol) {
-                Ok(Value::Enum(index as i32, symbol))
+                Ok(Value::Enum(index as u32, symbol))
             } else {
                 Err(Error::GetEnumDefault {
                     symbol,
@@ -696,7 +696,7 @@ impl Value {
         match self {
             Value::Enum(raw_index, s) => {
                 let index = usize::try_from(raw_index)
-                    .map_err(|e| Error::ConvertI32ToUsize(e, raw_index))?;
+                    .map_err(|e| Error::ConvertU32ToUsize(e, raw_index))?;
                 if (0..=symbols.len()).contains(&index) {
                     validate_symbol(s, symbols)
                 } else {
@@ -721,7 +721,7 @@ impl Value {
         // Find the first match in the reader schema.
         // FIXME: this might be wrong when the union consists of multiple same records that have different names
         let (i, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
-        Ok(Value::Union(i as i32, Box::new(v.resolve(inner)?)))
+        Ok(Value::Union(i as u32, Box::new(v.resolve(inner)?)))
     }
 
     fn resolve_array(self, schema: &Schema) -> Result<Self, Error> {

[avro] 16/30: AVRO-3232: Fix formatting

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 5d1e525ef00f84d28929633e4400b356a74f1000
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Fri Jan 7 16:11:09 2022 +0200

    AVRO-3232: Fix formatting
    
    Issue #1368
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit c0bfb88abebff7ec754c20b7797cbbe6a548495b)
---
 lang/rust/src/de.rs | 132 +++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 104 insertions(+), 28 deletions(-)

diff --git a/lang/rust/src/de.rs b/lang/rust/src/de.rs
index 4593a5d..6d89686 100644
--- a/lang/rust/src/de.rs
+++ b/lang/rust/src/de.rs
@@ -264,7 +264,10 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
                 Value::Array(ref fields) => visitor.visit_seq(SeqDeserializer::new(fields)),
                 Value::String(ref s) => visitor.visit_str(s),
                 Value::Map(ref items) => visitor.visit_map(MapDeserializer::new(items)),
-                _ => Err(de::Error::custom(format!("unsupported union: {:?}", self.input))),
+                _ => Err(de::Error::custom(format!(
+                    "unsupported union: {:?}",
+                    self.input
+                ))),
             },
             Value::Record(ref fields) => visitor.visit_map(StructDeserializer::new(fields)),
             Value::Array(ref fields) => visitor.visit_seq(SeqDeserializer::new(fields)),
@@ -931,44 +934,117 @@ mod tests {
             ("time_micros_a".to_string(), 123),
             ("timestamp_millis_b".to_string(), 234),
             ("timestamp_micros_c".to_string(), 345),
-        ].iter().cloned().collect();
+        ]
+        .iter()
+        .cloned()
+        .collect();
 
-        let value_map = raw_map.iter()
+        let value_map = raw_map
+            .iter()
             .map(|(k, v)| match k {
-                key  if key.starts_with("long_") => {(k.clone(), Value::Long(*v))}
-                key  if key.starts_with("time_micros_") => {(k.clone(), Value::TimeMicros(*v))}
-                key  if key.starts_with("timestamp_millis_") => {(k.clone(), Value::TimestampMillis(*v))}
-                key  if key.starts_with("timestamp_micros_") => {(k.clone(), Value::TimestampMicros(*v))}
-                _ => unreachable!(""),
+                key if key.starts_with("long_") => (k.clone(), Value::Long(*v)),
+                key if key.starts_with("time_micros_") => (k.clone(), Value::TimeMicros(*v)),
+                key if key.starts_with("timestamp_millis_") => {
+                    (k.clone(), Value::TimestampMillis(*v))
+                }
+                key if key.starts_with("timestamp_micros_") => {
+                    (k.clone(), Value::TimestampMicros(*v))
+                }
+                _ => unreachable!("unexpected key: {:?}", k),
             })
             .collect();
 
         let record = Value::Record(vec![
-            ("a_string".to_string(), Value::String("a valid message field".to_string())),
-            ("a_non_existing_string".to_string(), Value::String("a string".to_string())),
-            ("a_union_string".to_string(), Value::Union(Box::new(Value::String("a union string".to_string())))),
-            ("a_union_long".to_string(), Value::Union(Box::new(Value::Long(412)))),
-            ("a_union_long".to_string(), Value::Union(Box::new(Value::Long(412)))),
-            ("a_time_micros".to_string(), Value::Union(Box::new(Value::TimeMicros(123)))),
-            ("a_non_existing_time_micros".to_string(), Value::Union(Box::new(Value::TimeMicros(-123)))),
-            ("a_timestamp_millis".to_string(), Value::Union(Box::new(Value::TimestampMillis(234)))),
-            ("a_non_existing_timestamp_millis".to_string(), Value::Union(Box::new(Value::TimestampMillis(-234)))),
-            ("a_timestamp_micros".to_string(), Value::Union(Box::new(Value::TimestampMicros(345)))),
-            ("a_non_existing_timestamp_micros".to_string(), Value::Union(Box::new(Value::TimestampMicros(-345)))),
-            ("a_record".to_string(), Value::Union(Box::new(Value::Record(vec!(("record_in_union".to_string(), Value::Int(-2))))))),
-            ("a_non_existing_record".to_string(), Value::Union(Box::new(Value::Record(vec!(("blah".to_string(), Value::Int(-22))))))),
-            ("an_array".to_string(), Value::Union(Box::new(Value::Array(vec!(Value::Boolean(true), Value::Boolean(false)))))),
-            ("a_non_existing_array".to_string(), Value::Union(Box::new(Value::Array(vec!(Value::Boolean(false), Value::Boolean(true)))))),
-            ("a_union_map".to_string(), Value::Union(Box::new(Value::Map(value_map)))),
-            ("a_non_existing_union_map".to_string(), Value::Union(Box::new(Value::Map(HashMap::new())))),
+            (
+                "a_string".to_string(),
+                Value::String("a valid message field".to_string()),
+            ),
+            (
+                "a_non_existing_string".to_string(),
+                Value::String("a string".to_string()),
+            ),
+            (
+                "a_union_string".to_string(),
+                Value::Union(Box::new(Value::String("a union string".to_string()))),
+            ),
+            (
+                "a_union_long".to_string(),
+                Value::Union(Box::new(Value::Long(412))),
+            ),
+            (
+                "a_union_long".to_string(),
+                Value::Union(Box::new(Value::Long(412))),
+            ),
+            (
+                "a_time_micros".to_string(),
+                Value::Union(Box::new(Value::TimeMicros(123))),
+            ),
+            (
+                "a_non_existing_time_micros".to_string(),
+                Value::Union(Box::new(Value::TimeMicros(-123))),
+            ),
+            (
+                "a_timestamp_millis".to_string(),
+                Value::Union(Box::new(Value::TimestampMillis(234))),
+            ),
+            (
+                "a_non_existing_timestamp_millis".to_string(),
+                Value::Union(Box::new(Value::TimestampMillis(-234))),
+            ),
+            (
+                "a_timestamp_micros".to_string(),
+                Value::Union(Box::new(Value::TimestampMicros(345))),
+            ),
+            (
+                "a_non_existing_timestamp_micros".to_string(),
+                Value::Union(Box::new(Value::TimestampMicros(-345))),
+            ),
+            (
+                "a_record".to_string(),
+                Value::Union(Box::new(Value::Record(vec![(
+                    "record_in_union".to_string(),
+                    Value::Int(-2),
+                )]))),
+            ),
+            (
+                "a_non_existing_record".to_string(),
+                Value::Union(Box::new(Value::Record(vec![(
+                    "blah".to_string(),
+                    Value::Int(-22),
+                )]))),
+            ),
+            (
+                "an_array".to_string(),
+                Value::Union(Box::new(Value::Array(vec![
+                    Value::Boolean(true),
+                    Value::Boolean(false),
+                ]))),
+            ),
+            (
+                "a_non_existing_array".to_string(),
+                Value::Union(Box::new(Value::Array(vec![
+                    Value::Boolean(false),
+                    Value::Boolean(true),
+                ]))),
+            ),
+            (
+                "a_union_map".to_string(),
+                Value::Union(Box::new(Value::Map(value_map))),
+            ),
+            (
+                "a_non_existing_union_map".to_string(),
+                Value::Union(Box::new(Value::Map(HashMap::new()))),
+            ),
         ]);
 
         let deserialized: StructWithMissingFields = crate::from_value(&record)?;
-        let reference = StructWithMissingFields{
+        let reference = StructWithMissingFields {
             a_string: "a valid message field".to_string(),
-            a_record: Some(RecordInUnion { record_in_union: -2 }),
+            a_record: Some(RecordInUnion {
+                record_in_union: -2,
+            }),
             an_array: Some([true, false]),
-            a_union_map: Some(raw_map)
+            a_union_map: Some(raw_map),
         };
         assert_eq!(deserialized, reference);
         Ok(())

[avro] 19/30: AVRO-3214 Rust: Support 'doc' attribute for FixedSchema (#1343)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 66f2b47edc2b550616f007fc6e41d5f66771f569
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Mon Jan 10 14:02:30 2022 +0200

    AVRO-3214 Rust: Support 'doc' attribute for FixedSchema (#1343)
    
    (cherry picked from commit 0811b827a233c415a5b48612817148b01db102ac)
---
 lang/rust/src/decode.rs               |  2 ++
 lang/rust/src/schema.rs               | 39 +++++++++++++++++++++++++++++++++--
 lang/rust/src/schema_compatibility.rs |  2 ++
 lang/rust/src/types.rs                |  4 +++-
 lang/rust/src/writer.rs               |  2 ++
 lang/rust/tests/schema.rs             |  8 ++++++-
 6 files changed, 53 insertions(+), 4 deletions(-)
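
A rough sketch of the behaviour this commit adds: the `doc` attribute of a fixed schema is optional and is written out only when present. The `Fixed` struct and `to_json` function are hypothetical, simplified stand-ins that use only the standard library; the crate itself goes through serde, as the diff shows.

```rust
/// Hypothetical, simplified stand-in for the crate's `Schema::Fixed` variant.
struct Fixed {
    name: String,
    doc: Option<String>,
    size: usize,
}

/// Renders the schema as JSON text, writing "doc" only when it is set.
/// Assumes the strings need no JSON escaping, to keep the sketch short.
fn to_json(fixed: &Fixed) -> String {
    let mut out = format!(r#"{{"type":"fixed","name":"{}""#, fixed.name);
    if let Some(doc) = &fixed.doc {
        out.push_str(&format!(r#","doc":"{}""#, doc));
    }
    out.push_str(&format!(r#","size":{}}}"#, fixed.size));
    out
}

fn main() {
    let documented = Fixed {
        name: "test".to_string(),
        doc: Some("FixedSchema documentation".to_string()),
        size: 16,
    };
    println!("{}", to_json(&documented));
}
```

Modelling `doc` as an `Option` keeps schemas without documentation serializing exactly as before.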

diff --git a/lang/rust/src/decode.rs b/lang/rust/src/decode.rs
index 0714dcd..5639d28 100644
--- a/lang/rust/src/decode.rs
+++ b/lang/rust/src/decode.rs
@@ -298,6 +298,7 @@ mod tests {
         use num_bigint::ToBigInt;
         let inner = Box::new(Schema::Fixed {
             size: 2,
+            doc: None,
             name: Name::new("decimal"),
         });
         let schema = Schema::Decimal {
@@ -323,6 +324,7 @@ mod tests {
         let inner = Box::new(Schema::Fixed {
             size: 13,
             name: Name::new("decimal"),
+            doc: None,
         });
         let schema = Schema::Decimal {
             inner,
diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index da8c0c3..3617bb1 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -109,7 +109,11 @@ pub enum Schema {
         symbols: Vec<String>,
     },
     /// A `fixed` Avro schema.
-    Fixed { name: Name, size: usize },
+    Fixed {
+        name: Name,
+        doc: Documentation,
+        size: usize,
+    },
     /// Logical type which represents `Decimal` values. The underlying type is serialized and
     /// deserialized as `Schema::Bytes` or `Schema::Fixed`.
     ///
@@ -871,6 +875,11 @@ impl Parser {
     fn parse_fixed(complex: &Map<String, Value>) -> AvroResult<Schema> {
         let name = Name::parse(complex)?;
 
+        let doc = complex.get("doc").and_then(|v| match &v {
+            Value::String(ref docstr) => Some(docstr.clone()),
+            _ => None,
+        });
+
         let size = complex
             .get("size")
             .and_then(|v| v.as_i64())
@@ -878,6 +887,7 @@ impl Parser {
 
         Ok(Schema::Fixed {
             name,
+            doc,
             size: size as usize,
         })
     }
@@ -949,10 +959,17 @@ impl Serialize for Schema {
                 map.serialize_entry("symbols", symbols)?;
                 map.end()
             }
-            Schema::Fixed { ref name, ref size } => {
+            Schema::Fixed {
+                ref name,
+                ref doc,
+                ref size,
+            } => {
                 let mut map = serializer.serialize_map(None)?;
                 map.serialize_entry("type", "fixed")?;
                 map.serialize_entry("name", &name.name)?;
+                if let Some(ref docstr) = doc {
+                    map.serialize_entry("doc", docstr)?;
+                }
                 map.serialize_entry("size", size)?;
                 map.end()
             }
@@ -1011,6 +1028,7 @@ impl Serialize for Schema {
                 // duration should be or typically is.
                 let inner = Schema::Fixed {
                     name: Name::new("duration"),
+                    doc: None,
                     size: 12,
                 };
                 map.serialize_entry("type", &inner)?;
@@ -1311,6 +1329,23 @@ mod tests {
 
         let expected = Schema::Fixed {
             name: Name::new("test"),
+            doc: None,
+            size: 16usize,
+        };
+
+        assert_eq!(expected, schema);
+    }
+
+    #[test]
+    fn test_fixed_schema_with_documentation() {
+        let schema = Schema::parse_str(
+            r#"{"type": "fixed", "name": "test", "size": 16, "doc": "FixedSchema documentation"}"#,
+        )
+        .unwrap();
+
+        let expected = Schema::Fixed {
+            name: Name::new("test"),
+            doc: Some(String::from("FixedSchema documentation")),
             size: 16usize,
         };
 
diff --git a/lang/rust/src/schema_compatibility.rs b/lang/rust/src/schema_compatibility.rs
index 7f2a5cf..5ba46e1 100644
--- a/lang/rust/src/schema_compatibility.rs
+++ b/lang/rust/src/schema_compatibility.rs
@@ -232,11 +232,13 @@ impl SchemaCompatibility {
                 SchemaKind::Fixed => {
                     if let Schema::Fixed {
                         name: w_name,
+                        doc: _w_doc,
                         size: w_size,
                     } = writers_schema
                     {
                         if let Schema::Fixed {
                             name: r_name,
+                            doc: _r_doc,
                             size: r_size,
                         } = readers_schema
                         {
diff --git a/lang/rust/src/types.rs b/lang/rust/src/types.rs
index 63876da..75f0509 100644
--- a/lang/rust/src/types.rs
+++ b/lang/rust/src/types.rs
@@ -840,6 +840,7 @@ mod tests {
         let schema = Schema::Fixed {
             size: 4,
             name: Name::new("some_fixed"),
+            doc: None,
         };
 
         assert!(Value::Fixed(4, vec![0, 0, 0, 0]).validate(&schema));
@@ -1038,7 +1039,8 @@ mod tests {
                 scale: 1,
                 inner: Box::new(Schema::Fixed {
                     name: Name::new("decimal"),
-                    size: 20
+                    size: 20,
+                    doc: None
                 })
             })
             .is_ok());
diff --git a/lang/rust/src/writer.rs b/lang/rust/src/writer.rs
index efdd992..41e77c9 100644
--- a/lang/rust/src/writer.rs
+++ b/lang/rust/src/writer.rs
@@ -503,6 +503,7 @@ mod tests {
         let size = 30;
         let inner = Schema::Fixed {
             name: Name::new("decimal"),
+            doc: None,
             size,
         };
         let value = vec![0u8; size];
@@ -540,6 +541,7 @@ mod tests {
     fn duration() -> TestResult<()> {
         let inner = Schema::Fixed {
             name: Name::new("duration"),
+            doc: None,
             size: 12,
         };
         let value = Value::Duration(Duration::new(
diff --git a/lang/rust/tests/schema.rs b/lang/rust/tests/schema.rs
index d9fdefc..cc96429 100644
--- a/lang/rust/tests/schema.rs
+++ b/lang/rust/tests/schema.rs
@@ -336,6 +336,10 @@ const DOC_EXAMPLES: &[(&str, bool)] = &[
         r#"{"type": "enum", "name": "Test", "symbols": ["A", "B"], "doc": "Doc String"}"#,
         true,
     ),
+    (
+        r#"{"type": "fixed", "name": "Test", "size": 1, "doc": "Fixed Doc String"}"#,
+        true,
+    ),
 ];
 
 const OTHER_ATTRIBUTES_EXAMPLES: &[(&str, bool)] = &[
@@ -1229,7 +1233,9 @@ fn test_doc_attributes() {
         match schema {
             Schema::Enum { doc, .. } => assert!(doc.is_some()),
             Schema::Record { doc, .. } => assert!(doc.is_some()),
-            _ => (),
+            Schema::Fixed { doc, .. } => assert!(doc.is_some()),
+            Schema::String => (),
+            _ => unreachable!("Unexpected schema type: {:?}", schema),
         }
     }
 

[avro] 09/30: AVRO-3197 Fallback to the 'type' when the logical type does not support the type (#1340)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 2cb26f6bfc638f741e918f4ad0a42d99b2c2e832
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Wed Jan 5 13:15:24 2022 +0200

    AVRO-3197 Fallback to the 'type' when the logical type does not support the type (#1340)
    
    * AVRO-3197 Fallback to the 'type' when the logical type does not support the type
    
    * AVRO-3197 Better formatting
    
    * AVRO-3197 Allow only when the "type" is "string"
    
    * AVRO-3197 Handle problematic complex type for date/time logical types
    
    Read the complex type recursively. It seems Avro Java may produce {"type": {"type": "string", "avro.java.string": "String"}, "logicalType": "timestamp-millis"}, i.e. logicalType is on the same level as the outer "type".
    
    * AVRO-3197 Make Clippy happy
    
    * AVRO-3197 Log a warning when a logical type is not supported by the Schema type
    
    This is how Avro Java behaves.
    
    (cherry picked from commit bce386623c79b204957f1c37c43079d1c51c2038)
---
 lang/rust/Cargo.toml      |  2 ++
 lang/rust/src/error.rs    |  2 +-
 lang/rust/src/lib.rs      |  3 ++
 lang/rust/src/schema.rs   | 92 +++++++++++++++++++++++++++++++++++++++++------
 lang/rust/tests/schema.rs | 46 +++++++++++++++++++++---
 5 files changed, 128 insertions(+), 17 deletions(-)
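
The fallback described in the commit message can be sketched as follows. This is a simplified, standalone model, not the parser from the diff: the `Schema` and `ParseError` enums and the `parse_primitive` and `resolve_logical` functions are hypothetical, only two logical types are covered, and the warning goes to stderr instead of the `log` crate. Treating unrecognised logical types as plain types is an assumption of this sketch.

```rust
/// Hypothetical, reduced schema model for this sketch.
#[derive(Debug, PartialEq)]
enum Schema {
    Int,
    Long,
    String,
    Date,
    TimestampMillis,
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum ParseError {
    UnknownPrimitive(String),
}

/// Parses the plain "type" attribute.
fn parse_primitive(ty: &str) -> Result<Schema, ParseError> {
    match ty {
        "int" => Ok(Schema::Int),
        "long" => Ok(Schema::Long),
        "string" => Ok(Schema::String),
        other => Err(ParseError::UnknownPrimitive(other.to_string())),
    }
}

/// Returns the logical schema when "type" supports it; otherwise warns and
/// falls back to the schema of "type" itself.
fn resolve_logical(logical_type: &str, ty: &str) -> Result<Schema, ParseError> {
    let (expected, logical) = match logical_type {
        "date" => (Schema::Int, Schema::Date),
        "timestamp-millis" => (Schema::Long, Schema::TimestampMillis),
        // Logical types this sketch does not model are treated as the plain type.
        _ => return parse_primitive(ty),
    };
    let base = parse_primitive(ty)?;
    if base == expected {
        Ok(logical)
    } else {
        eprintln!(
            "Ignoring invalid logical type '{}' for schema of type: {:?}!",
            logical_type, base
        );
        Ok(base)
    }
}

fn main() {
    println!("{:?}", resolve_logical("timestamp-millis", "string"));
}
```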

diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index 7d85220..3da92dd 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -69,6 +69,7 @@ typed-builder = "0.9.1"
 uuid = { version = "0.8.2", features = ["serde", "v4"] }
 zerocopy = "0.3.0"
 lazy_static = "1.1.1"
+log = "0.4.14"
 zstd = { version = "0.9.0+zstd.1.5.0" , optional = true }
 
 [dev-dependencies]
@@ -77,3 +78,4 @@ sha2 = "0.9.8"
 criterion = "0.3.5"
 anyhow = "1.0.44"
 hex-literal = "0.3.3"
+env_logger = "0.9.0"
diff --git a/lang/rust/src/error.rs b/lang/rust/src/error.rs
index b690bfb..f4b1fb5 100644
--- a/lang/rust/src/error.rs
+++ b/lang/rust/src/error.rs
@@ -240,7 +240,7 @@ pub enum Error {
     #[error("Must be a JSON string, object or array")]
     ParseSchemaFromValidJson,
 
-    #[error("Unknown primitiive type: {0}")]
+    #[error("Unknown primitive type: {0}")]
     ParsePrimitive(String),
 
     #[error("invalid JSON for {key:?}: {precision:?}")]
diff --git a/lang/rust/src/lib.rs b/lang/rust/src/lib.rs
index 18d9230..af6e3cf 100644
--- a/lang/rust/src/lib.rs
+++ b/lang/rust/src/lib.rs
@@ -751,6 +751,9 @@ pub use ser::to_value;
 pub use util::max_allocation_bytes;
 pub use writer::{to_avro_datum, Writer};
 
+#[macro_use]
+extern crate log;
+
 /// A convenience type alias for `Result`s with `Error`s.
 pub type AvroResult<T> = Result<T, Error>;
 
diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index 8e15cd4..bab99d4 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -608,18 +608,63 @@ impl Parser {
             match complex.get("type") {
                 Some(value) => {
                     let ty = parser.parse(value)?;
+
                     if kinds
                         .iter()
                         .any(|&kind| SchemaKind::from(ty.clone()) == kind)
                     {
                         Ok(ty)
                     } else {
-                        Err(Error::GetLogicalTypeVariant(value.clone()))
+                        match get_type_rec(value.clone()) {
+                            Ok(v) => Err(Error::GetLogicalTypeVariant(v)),
+                            Err(err) => Err(err),
+                        }
                     }
                 }
                 None => Err(Error::GetLogicalTypeField),
             }
         }
+
+        fn get_type_rec(json_value: Value) -> AvroResult<Value> {
+            match json_value {
+                typ @ Value::String(_) => Ok(typ),
+                Value::Object(ref complex) => match complex.get("type") {
+                    Some(v) => get_type_rec(v.clone()),
+                    None => Err(Error::GetComplexTypeField),
+                },
+                _ => Err(Error::GetComplexTypeField),
+            }
+        }
+
+        // checks whether the logicalType is supported by the type
+        fn try_logical_type(
+            logical_type: &str,
+            complex: &Map<String, Value>,
+            kinds: &[SchemaKind],
+            ok_schema: Schema,
+            parser: &mut Parser,
+        ) -> AvroResult<Schema> {
+            match logical_verify_type(complex, kinds, parser) {
+                // type and logicalType match!
+                Ok(_) => Ok(ok_schema),
+                // the logicalType is not expected for this type!
+                Err(Error::GetLogicalTypeVariant(json_value)) => match json_value {
+                    Value::String(_) => match parser.parse(&json_value) {
+                        Ok(schema) => {
+                            warn!(
+                                "Ignoring invalid logical type '{}' for schema of type: {:?}!",
+                                logical_type, schema
+                            );
+                            Ok(schema)
+                        }
+                        Err(parse_err) => Err(parse_err),
+                    },
+                    _ => Err(Error::GetLogicalTypeVariant(json_value)),
+                },
+                err => err,
+            }
+        }
+
         match complex.get("logicalType") {
             Some(&Value::String(ref t)) => match t.as_str() {
                 "decimal" => {
@@ -642,24 +687,49 @@ impl Parser {
                     return Ok(Schema::Uuid);
                 }
                 "date" => {
-                    logical_verify_type(complex, &[SchemaKind::Int], self)?;
-                    return Ok(Schema::Date);
+                    return try_logical_type(
+                        "date",
+                        complex,
+                        &[SchemaKind::Int],
+                        Schema::Date,
+                        self,
+                    );
                 }
                 "time-millis" => {
-                    logical_verify_type(complex, &[SchemaKind::Int], self)?;
-                    return Ok(Schema::TimeMillis);
+                    return try_logical_type(
+                        "time-millis",
+                        complex,
+                        &[SchemaKind::Int],
+                        Schema::TimeMillis,
+                        self,
+                    );
                 }
                 "time-micros" => {
-                    logical_verify_type(complex, &[SchemaKind::Long], self)?;
-                    return Ok(Schema::TimeMicros);
+                    return try_logical_type(
+                        "time-micros",
+                        complex,
+                        &[SchemaKind::Long],
+                        Schema::TimeMicros,
+                        self,
+                    );
                 }
                 "timestamp-millis" => {
-                    logical_verify_type(complex, &[SchemaKind::Long], self)?;
-                    return Ok(Schema::TimestampMillis);
+                    return try_logical_type(
+                        "timestamp-millis",
+                        complex,
+                        &[SchemaKind::Long],
+                        Schema::TimestampMillis,
+                        self,
+                    );
                 }
                 "timestamp-micros" => {
-                    logical_verify_type(complex, &[SchemaKind::Long], self)?;
-                    return Ok(Schema::TimestampMicros);
+                    return try_logical_type(
+                        "timestamp-micros",
+                        complex,
+                        &[SchemaKind::Long],
+                        Schema::TimestampMicros,
+                        self,
+                    );
                 }
                 "duration" => {
                     logical_verify_type(complex, &[SchemaKind::Fixed], self)?;
diff --git a/lang/rust/tests/schema.rs b/lang/rust/tests/schema.rs
index 14edc0f..efd9df2 100644
--- a/lang/rust/tests/schema.rs
+++ b/lang/rust/tests/schema.rs
@@ -19,6 +19,13 @@
 use avro_rs::{schema::Name, Error, Schema};
 use lazy_static::lazy_static;
 
+fn init() {
+    let _ = env_logger::builder()
+        .filter_level(log::LevelFilter::Trace)
+        .is_test(true)
+        .try_init();
+}
+
 const PRIMITIVE_EXAMPLES: &[(&str, bool)] = &[
     (r#""null""#, true),
     (r#"{"type": "null"}"#, true),
@@ -476,7 +483,8 @@ const DATE_LOGICAL_TYPE: &[(&str, bool)] = &[
     // this is valid even though its logical type is "date1", because unknown logical types are
     // ignored
     (r#"{"type": "int", "logicalType": "date1"}"#, true),
-    (r#"{"type": "long", "logicalType": "date"}"#, false),
+    // this is still valid because unknown logicalType should be ignored
+    (r#"{"type": "long", "logicalType": "date"}"#, true),
 ];
 
 const TIMEMILLIS_LOGICAL_TYPE: &[(&str, bool)] = &[
@@ -484,7 +492,8 @@ const TIMEMILLIS_LOGICAL_TYPE: &[(&str, bool)] = &[
     // this is valid even though its logical type is "time-milis" (missing the second "l"),
     // because unknown logical types are ignored
     (r#"{"type": "int", "logicalType": "time-milis"}"#, true),
-    (r#"{"type": "long", "logicalType": "time-millis"}"#, false),
+    // this is still valid because unknown logicalType should be ignored
+    (r#"{"type": "long", "logicalType": "time-millis"}"#, true),
 ];
 
 const TIMEMICROS_LOGICAL_TYPE: &[(&str, bool)] = &[
@@ -492,7 +501,8 @@ const TIMEMICROS_LOGICAL_TYPE: &[(&str, bool)] = &[
     // this is valid even though its logical type is "time-micro" (missing the last "s"), because
     // unknown logical types are ignored
     (r#"{"type": "long", "logicalType": "time-micro"}"#, true),
-    (r#"{"type": "int", "logicalType": "time-micros"}"#, false),
+    // this is still valid because unknown logicalType should be ignored
+    (r#"{"type": "int", "logicalType": "time-micros"}"#, true),
 ];
 
 const TIMESTAMPMILLIS_LOGICAL_TYPE: &[(&str, bool)] = &[
@@ -507,8 +517,9 @@ const TIMESTAMPMILLIS_LOGICAL_TYPE: &[(&str, bool)] = &[
         true,
     ),
     (
+        // this is still valid because unknown logicalType should be ignored
         r#"{"type": "int", "logicalType": "timestamp-millis"}"#,
-        false,
+        true,
     ),
 ];
 
@@ -524,8 +535,9 @@ const TIMESTAMPMICROS_LOGICAL_TYPE: &[(&str, bool)] = &[
         true,
     ),
     (
+        // this is still valid because unknown logicalType should be ignored
         r#"{"type": "int", "logicalType": "timestamp-micros"}"#,
-        false,
+        true,
     ),
 ];
 
@@ -562,6 +574,7 @@ that recursive types are not properly supported.
 
 #[test]
 fn test_correct_recursive_extraction() {
+    init();
     let raw_outer_schema = r#"{
         "type": "record",
         "name": "X",
@@ -600,6 +613,8 @@ fn test_correct_recursive_extraction() {
 
 #[test]
 fn test_parse() {
+    init();
+
     for (raw_schema, valid) in EXAMPLES.iter() {
         let schema = Schema::parse_str(raw_schema);
         if *valid {
@@ -622,6 +637,7 @@ fn test_parse() {
 #[test]
 /// Test that the string generated by an Avro Schema object is, in fact, a valid Avro schema.
 fn test_valid_cast_to_string_after_parse() {
+    init();
     for (raw_schema, _) in VALID_EXAMPLES.iter() {
         let schema = Schema::parse_str(raw_schema).unwrap();
         Schema::parse_str(schema.canonical_form().as_str()).unwrap();
@@ -633,6 +649,7 @@ fn test_valid_cast_to_string_after_parse() {
 /// 2. Serialize "original" to a string and parse that string to generate Avro schema "round trip".
 /// 3. Ensure "original" and "round trip" schemas are equivalent.
 fn test_equivalence_after_round_trip() {
+    init();
     for (raw_schema, _) in VALID_EXAMPLES.iter() {
         let original_schema = Schema::parse_str(raw_schema).unwrap();
         let round_trip_schema =
@@ -645,6 +662,7 @@ fn test_equivalence_after_round_trip() {
 /// Test that a list of schemas whose definitions do not depend on each other produces the same
 /// result as parsing each element of the list individually
 fn test_parse_list_without_cross_deps() {
+    init();
     let schema_str_1 = r#"{
         "name": "A",
         "type": "record",
@@ -672,6 +690,7 @@ fn test_parse_list_without_cross_deps() {
 /// the schemas are input.
 /// However, the output order is guaranteed to be the same as the input order.
 fn test_parse_list_with_cross_deps_basic() {
+    init();
     let schema_str_1 = r#"{
         "name": "A",
         "type": "record",
@@ -718,6 +737,7 @@ fn test_parse_list_with_cross_deps_basic() {
 /// and returns an error. N.B. In the future, when recursive types are supported, this should be
 /// revisited.
 fn test_parse_list_recursive_type_error() {
+    init();
     let schema_str_1 = r#"{
         "name": "A",
         "type": "record",
@@ -741,6 +761,7 @@ fn test_parse_list_recursive_type_error() {
 #[test]
 /// Test that schema composition resolves namespaces.
 fn test_parse_list_with_cross_deps_and_namespaces() {
+    init();
     let schema_str_1 = r#"{
         "name": "A",
         "type": "record",
@@ -786,6 +807,7 @@ fn test_parse_list_with_cross_deps_and_namespaces() {
 #[test]
 /// Test that schema composition fails on namespace errors.
 fn test_parse_list_with_cross_deps_and_namespaces_error() {
+    init();
     let schema_str_1 = r#"{
         "name": "A",
         "type": "record",
@@ -849,6 +871,7 @@ fn permutation_indices(indices: Vec<usize>) -> Vec<Vec<usize>> {
 /// Test that a type that depends on more than one other type is parsed correctly when all
 /// definitions are passed in as a list. This should work regardless of the ordering of the list.
 fn test_parse_list_multiple_dependencies() {
+    init();
     let schema_str_1 = r#"{
         "name": "A",
         "type": "record",
@@ -913,6 +936,7 @@ fn test_parse_list_multiple_dependencies() {
 /// Test that a type that is depended on by more than one other type is parsed correctly when all
 /// definitions are passed in as a list. This should work regardless of the ordering of the list.
 fn test_parse_list_shared_dependency() {
+    init();
     let schema_str_1 = r#"{
         "name": "A",
         "type": "record",
@@ -988,6 +1012,7 @@ fn test_parse_list_shared_dependency() {
 #[test]
 /// Test that trying to parse two schemas with the same fullname returns an Error
 fn test_name_collision_error() {
+    init();
     let schema_str_1 = r#"{
         "name": "foo.A",
         "type": "record",
@@ -1010,6 +1035,7 @@ fn test_name_collision_error() {
 #[test]
 /// Test that having the same name but different fullnames does not return an error
 fn test_namespace_prevents_collisions() {
+    init();
     let schema_str_1 = r#"{
         "name": "A",
         "type": "record",
@@ -1059,6 +1085,7 @@ fn test_namespace_prevents_collisions() {
 
 #[test]
 fn test_fullname_name_and_namespace_specified() {
+    init();
     let name: Name =
         serde_json::from_str(r#"{"name": "a", "namespace": "o.a.h", "aliases": null}"#).unwrap();
     let fullname = name.fullname(None);
@@ -1067,6 +1094,7 @@ fn test_fullname_name_and_namespace_specified() {
 
 #[test]
 fn test_fullname_fullname_and_namespace_specified() {
+    init();
     let name: Name =
         serde_json::from_str(r#"{"name": "a.b.c.d", "namespace": "o.a.h", "aliases": null}"#)
             .unwrap();
@@ -1076,6 +1104,7 @@ fn test_fullname_fullname_and_namespace_specified() {
 
 #[test]
 fn test_fullname_name_and_default_namespace_specified() {
+    init();
     let name: Name =
         serde_json::from_str(r#"{"name": "a", "namespace": null, "aliases": null}"#).unwrap();
     let fullname = name.fullname(Some("b.c.d"));
@@ -1084,6 +1113,7 @@ fn test_fullname_name_and_default_namespace_specified() {
 
 #[test]
 fn test_fullname_fullname_and_default_namespace_specified() {
+    init();
     let name: Name =
         serde_json::from_str(r#"{"name": "a.b.c.d", "namespace": null, "aliases": null}"#).unwrap();
     let fullname = name.fullname(Some("o.a.h"));
@@ -1092,6 +1122,7 @@ fn test_fullname_fullname_and_default_namespace_specified() {
 
 #[test]
 fn test_fullname_fullname_namespace_and_default_namespace_specified() {
+    init();
     let name: Name =
         serde_json::from_str(r#"{"name": "a.b.c.d", "namespace": "o.a.a", "aliases": null}"#)
             .unwrap();
@@ -1101,6 +1132,7 @@ fn test_fullname_fullname_namespace_and_default_namespace_specified() {
 
 #[test]
 fn test_fullname_name_namespace_and_default_namespace_specified() {
+    init();
     let name: Name =
         serde_json::from_str(r#"{"name": "a", "namespace": "o.a.a", "aliases": null}"#).unwrap();
     let fullname = name.fullname(Some("o.a.h"));
@@ -1109,6 +1141,8 @@ fn test_fullname_name_namespace_and_default_namespace_specified() {
 
 #[test]
 fn test_doc_attributes() {
+    init();
+
     fn assert_doc(schema: &Schema) {
         match schema {
             Schema::Enum { doc, .. } => assert!(doc.is_some()),
@@ -1164,6 +1198,8 @@ fn test_other_attributes() {
 
 #[test]
 fn test_root_error_is_not_swallowed_on_parse_error() -> Result<(), String> {
+    init();
+
     let raw_schema = r#"/not/a/real/file"#;
     let error = Schema::parse_str(raw_schema).unwrap_err();
 

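Note on the fallback above: the following is a minimal sketch of the behaviour the updated test tables expect, assuming a much simplified model. The `Schema` enum and the `resolve` function here are hypothetical and are not the crate's `Parser` API; only the rule itself (matching pair gives the logical schema, mismatched or unknown logical type falls back to the plain "type") is taken from the diff.

```rust
// Hypothetical, simplified model; these are NOT the crate's real types.
#[derive(Debug, Clone, PartialEq)]
enum Schema {
    Int,
    Long,
    String,
    Date,
    TimeMicros,
}

/// Resolve a `{"type": base, "logicalType": logical_type}` pair.
fn resolve(base: &str, logical_type: &str) -> Result<Schema, String> {
    let base_schema = match base {
        "int" => Schema::Int,
        "long" => Schema::Long,
        "string" => Schema::String,
        other => return Err(format!("unknown type: {}", other)),
    };

    // (base type required by the logical type, schema returned on a match)
    let rule = match logical_type {
        "date" => Some((Schema::Int, Schema::Date)),
        "time-micros" => Some((Schema::Long, Schema::TimeMicros)),
        // Unknown logical types are ignored.
        _ => None,
    };

    Ok(match rule {
        // "type" and "logicalType" agree.
        Some((required, logical)) if required == base_schema => logical,
        // Known logical type on an unsupported base type: warn and fall back.
        Some(_) => {
            eprintln!(
                "Ignoring invalid logical type '{}' for schema of type: {:?}!",
                logical_type, base_schema
            );
            base_schema
        }
        None => base_schema,
    })
}
```

In this model `resolve("long", "date")` yields the plain `Long` schema instead of an error, which mirrors the `false` to `true` flips in the test tables.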
[avro] 27/30: AVRO-3316 [Rust] build breaks in docker build

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit c81e1aa349bc6cf9114b69641a95f99e981b845a
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Thu Jan 20 20:50:55 2022 +0200

    AVRO-3316 [Rust] build breaks in docker build
    
    Make the test return unit; it only asserts, so there is no need to return Ok(()) wrapped in a Result.
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit 9917a55d858dcd74d53176fad5341eb3f8846ae8)
---
 lang/rust/tests/io.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/lang/rust/tests/io.rs b/lang/rust/tests/io.rs
index 2d4c7ee..4714493 100644
--- a/lang/rust/tests/io.rs
+++ b/lang/rust/tests/io.rs
@@ -218,7 +218,7 @@ fn test_default_value() {
 }
 
 #[test]
-fn test_no_default_value() -> Result<(), Error> {
+fn test_no_default_value() {
     let reader_schema = Schema::parse_str(
         r#"{
             "type": "record",
@@ -236,7 +236,6 @@ fn test_no_default_value() -> Result<(), Error> {
         Some(&reader_schema),
     );
     assert!(result.is_err());
-    Ok(())
 }
 
 #[test]

[avro] 17/30: Fix nightly build by optimizing the imports

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 33019d75eec9c9dd414b645a912a4fe5cd84f28a
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Mon Jan 10 09:59:26 2022 +0200

    Fix nightly build by optimizing the imports
    
    https://github.com/apache/avro/runs/4758414201?check_suite_focus=true
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit a29b8691c216f2954e063230ce957a27e30e1853)
---
 lang/rust/src/reader.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/lang/rust/src/reader.rs b/lang/rust/src/reader.rs
index 5757044..4f6d311 100644
--- a/lang/rust/src/reader.rs
+++ b/lang/rust/src/reader.rs
@@ -304,8 +304,7 @@ pub fn from_avro_datum<R: Read>(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::from_value;
-    use crate::{types::Record, Reader};
+    use crate::{from_value, types::Record, Reader};
     use serde::Deserialize;
     use std::io::Cursor;
 

[avro] 13/30: AVRO-3240: Fix code formatting

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 02aa8d4cbba2f5c6f8050f5e2cc92915b70c13e4
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Fri Jan 7 15:33:00 2022 +0200

    AVRO-3240: Fix code formatting
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit 00969b71fb23a15a4741b4c8e906b243db5386db)
---
 lang/rust/src/decode.rs | 89 ++++++++++++++++++++++++-------------------------
 lang/rust/src/reader.rs | 15 +++++----
 2 files changed, 53 insertions(+), 51 deletions(-)

diff --git a/lang/rust/src/decode.rs b/lang/rust/src/decode.rs
index 71ee4c1..0714dcd 100644
--- a/lang/rust/src/decode.rs
+++ b/lang/rust/src/decode.rs
@@ -23,7 +23,12 @@ use crate::{
     util::{safe_len, zag_i32, zag_i64},
     AvroResult, Error,
 };
-use std::{collections::HashMap, convert::TryFrom, io::{ErrorKind, Read}, str::FromStr};
+use std::{
+    collections::HashMap,
+    convert::TryFrom,
+    io::{ErrorKind, Read},
+    str::FromStr,
+};
 use uuid::Uuid;
 
 #[inline]
@@ -67,23 +72,20 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
         Schema::Null => Ok(Value::Null),
         Schema::Boolean => {
             let mut buf = [0u8; 1];
-            match reader
-                .read_exact(&mut buf[..]) {
-                    Ok(_) => {
-                        match buf[0] {
-                            0u8 => Ok(Value::Boolean(false)),
-                            1u8 => Ok(Value::Boolean(true)),
-                            _ => Err(Error::BoolValue(buf[0])),
-                        }
-                    },
-                    Err(io_err) => {
-                        if let ErrorKind::UnexpectedEof = io_err.kind() {
-                            Ok(Value::Null)
-                        } else {
-                            Err(Error::ReadBoolean(io_err))
-                        }
-                    },
+            match reader.read_exact(&mut buf[..]) {
+                Ok(_) => match buf[0] {
+                    0u8 => Ok(Value::Boolean(false)),
+                    1u8 => Ok(Value::Boolean(true)),
+                    _ => Err(Error::BoolValue(buf[0])),
+                },
+                Err(io_err) => {
+                    if let ErrorKind::UnexpectedEof = io_err.kind() {
+                        Ok(Value::Null)
+                    } else {
+                        Err(Error::ReadBoolean(io_err))
+                    }
                 }
+            }
         }
         Schema::Decimal { ref inner, .. } => match &**inner {
             Schema::Fixed { .. } => match decode(inner, reader)? {
@@ -135,18 +137,16 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
             let len = decode_len(reader)?;
             let mut buf = vec![0u8; len];
             match reader.read_exact(&mut buf) {
-                Ok(_) => {
-                    Ok(Value::String(
-                        String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
-                    ))
-                },
+                Ok(_) => Ok(Value::String(
+                    String::from_utf8(buf).map_err(Error::ConvertToUtf8)?,
+                )),
                 Err(io_err) => {
                     if let ErrorKind::UnexpectedEof = io_err.kind() {
                         Ok(Value::Null)
                     } else {
                         Err(Error::ReadString(io_err))
                     }
-                },
+                }
             }
         }
         Schema::Fixed { size, .. } => {
@@ -196,29 +196,28 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
 
             Ok(Value::Map(items))
         }
-        Schema::Union(ref inner) => {
-            match zag_i64(reader) {
-                Ok(index) => {
-                    let variants = inner.variants();
-                    let variant = variants
-                        .get(usize::try_from(index).map_err(|e| Error::ConvertI64ToUsize(e, index))?)
-                        .ok_or_else(|| Error::GetUnionVariant {
-                            index,
-                            num_variants: variants.len(),
-                        })?;
-                    let value = decode(variant, reader)?;
-                    Ok(Value::Union(Box::new(value)))
-                },
-                Err(Error::ReadVariableIntegerBytes(io_err)) => {
-                    if let ErrorKind::UnexpectedEof = io_err.kind() {
-                        Ok(Value::Union(Box::new(Value::Null)))
-                    } else {
-                        Err(Error::ReadVariableIntegerBytes(io_err))
-                    }
-                },
-                Err(io_err) => Err(io_err),
+        Schema::Union(ref inner) => match zag_i64(reader) {
+            Ok(index) => {
+                let variants = inner.variants();
+                let variant = variants
+                    .get(usize::try_from(index).map_err(|e| Error::ConvertI64ToUsize(e, index))?)
+                    .ok_or_else(|| Error::GetUnionVariant {
+                        index,
+                        num_variants: variants.len(),
+                    })?;
+                let value = decode(variant, reader)?;
+                Ok(Value::Union(Box::new(value)))
             }
-        }
+            Err(Error::ReadVariableIntegerBytes(io_err)) => {
+                if let ErrorKind::UnexpectedEof = io_err.kind() {
+                    Ok(Value::Union(Box::new(Value::Null)))
+                } else {
+                    Err(Error::ReadVariableIntegerBytes(io_err))
+                }
+            }
+            Err(io_err) => Err(io_err),
+        },
+
         Schema::Record { ref fields, .. } => {
             // Benchmarks indicate ~10% improvement using this method.
             let mut items = Vec::with_capacity(fields.len());
diff --git a/lang/rust/src/reader.rs b/lang/rust/src/reader.rs
index e036991..ce6a629 100644
--- a/lang/rust/src/reader.rs
+++ b/lang/rust/src/reader.rs
@@ -306,8 +306,8 @@ mod tests {
     use super::*;
     use crate::from_value;
     use crate::{types::Record, Reader};
-    use std::io::Cursor;
     use serde::Deserialize;
+    use std::io::Cursor;
 
     const SCHEMA: &str = r#"
     {
@@ -376,7 +376,7 @@ mod tests {
     }
     "#;
     #[derive(Default, Debug, Deserialize, PartialEq)]
-    struct TestRecord {
+    struct TestRecord3240 {
         a: i64,
         b: String,
         a_nullable_array: Option<Vec<String>>,
@@ -406,7 +406,7 @@ mod tests {
         let schema = Schema::parse_str(TEST_RECORD_SCHEMA_3240).unwrap();
         let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];
 
-        let expected_record: TestRecord = TestRecord {
+        let expected_record: TestRecord3240 = TestRecord3240 {
             a: 27i64,
             b: String::from("foo"),
             a_nullable_array: None,
@@ -414,9 +414,12 @@ mod tests {
         };
 
         let avro_datum = from_avro_datum(&schema, &mut encoded, None).unwrap();
-        let parsed_record: TestRecord = match &avro_datum {
-            Value::Record(_) => from_value::<TestRecord>(&avro_datum).unwrap(),
-            unexpected => panic!("could not map avro data to struct, found unexpected: {:?}", unexpected),
+        let parsed_record: TestRecord3240 = match &avro_datum {
+            Value::Record(_) => from_value::<TestRecord3240>(&avro_datum).unwrap(),
+            unexpected => panic!(
+                "could not map avro data to struct, found unexpected: {:?}",
+                unexpected
+            ),
         };
 
         assert_eq!(parsed_record, expected_record);

[avro] 10/30: AVRO-3216 Reuse records' schema by name (#1345)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit e217c5e542f6377a73d9ca5346c3ed97a6089f48
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Wed Jan 5 13:37:26 2022 +0200

    AVRO-3216 Reuse records' schema by name (#1345)
    
    * AVRO-3197 Fallback to the 'type' when the logical type does not support the type
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3197 Allow only when the "type" is "string"
    
    * AVRO-3197 Handle problematic complex type for date/time logical types
    
    Read the complex type recursively. It seems Avro Java may produce {"type": {"type": "string", "avro.java.string": "String"}, "logicalType": "timestamp-millis"}, i.e. "logicalType" is on the same level as the outer "type".
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3216 Allow to reuse record's schema by name
    
    * AVRO-3216 Extend the test case to do more assertions
    
    * AVRO-3216 Print err with Debug
    
    (cherry picked from commit b76a437b970889255703ff48f7ee5981dfbcc17a)
---
 lang/rust/src/schema.rs   | 10 ++++--
 lang/rust/tests/io.rs     |  2 +-
 lang/rust/tests/schema.rs | 86 +++++++++++++++++++++++++++++++++++++++++++++--
 3 files changed, 92 insertions(+), 6 deletions(-)
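
Note: a minimal sketch of the idea behind this change, assuming a toy input model. `TypeDef`, the reduced `Schema` enum, and this `Parser` are hypothetical illustrations, not the crate's types. What it mirrors from the diff is the single step of storing each parsed record in `parsed_schemas` under its full name, so that a later field can refer to it by name.

```rust
use std::collections::HashMap;

// Hypothetical, reduced schema model; NOT the crate's `Schema`.
#[derive(Debug, Clone, PartialEq)]
enum Schema {
    Long,
    Record {
        fullname: String,
        fields: Vec<(String, Schema)>,
    },
}

// Hypothetical stand-in for the JSON input: a type name or an inline record.
enum TypeDef {
    Name(&'static str),
    Record {
        fullname: &'static str,
        fields: Vec<(&'static str, TypeDef)>,
    },
}

#[derive(Default)]
struct Parser {
    // Records parsed so far, keyed by full name.
    parsed_schemas: HashMap<String, Schema>,
}

impl Parser {
    fn parse(&mut self, def: &TypeDef) -> Result<Schema, String> {
        match def {
            TypeDef::Name(name) if *name == "long" => Ok(Schema::Long),
            // Any other name must refer to a record that was parsed earlier.
            TypeDef::Name(name) => self
                .parsed_schemas
                .get(*name)
                .cloned()
                .ok_or_else(|| format!("unknown type: {}", name)),
            TypeDef::Record { fullname, fields } => {
                let mut parsed = Vec::with_capacity(fields.len());
                for (field_name, field_def) in fields {
                    parsed.push((field_name.to_string(), self.parse(field_def)?));
                }
                let schema = Schema::Record {
                    fullname: fullname.to_string(),
                    fields: parsed,
                };
                // Remember the record so that later references resolve.
                self.parsed_schemas
                    .insert(fullname.to_string(), schema.clone());
                Ok(schema)
            }
        }
    }
}
```

With this in place, a record whose "max_temp" field defines `prefix.Temp` inline can declare a following "min_temp" field simply as `prefix.Temp`, as in the new test case.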

diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index bab99d4..da8c0c3 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -785,12 +785,16 @@ impl Parser {
             lookup.insert(field.name.clone(), field.position);
         }
 
-        Ok(Schema::Record {
-            name,
+        let schema = Schema::Record {
+            name: name.clone(),
             doc: complex.doc(),
             fields,
             lookup,
-        })
+        };
+
+        self.parsed_schemas
+            .insert(name.fullname(None), schema.clone());
+        Ok(schema)
     }
 
     /// Parse a `serde_json::Value` representing a Avro enum type into a
diff --git a/lang/rust/tests/io.rs b/lang/rust/tests/io.rs
index 93edf0d..c1ab1d7 100644
--- a/lang/rust/tests/io.rs
+++ b/lang/rust/tests/io.rs
@@ -319,6 +319,6 @@ fn test_type_exception() -> Result<(), String> {
     match encoded {
         Ok(_) => Err(String::from("Expected ValidationError, got Ok")),
         Err(Error::Validation) => Ok(()),
-        Err(ref e) => Err(format!("Expected ValidationError, got {}", e)),
+        Err(ref e) => Err(format!("Expected ValidationError, got {:?}", e)),
     }
 }
diff --git a/lang/rust/tests/schema.rs b/lang/rust/tests/schema.rs
index efd9df2..d9fdefc 100644
--- a/lang/rust/tests/schema.rs
+++ b/lang/rust/tests/schema.rs
@@ -15,8 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Port of https://github.com/apache/avro/blob/release-1.9.1/lang/py/test/test_schema.py
-use avro_rs::{schema::Name, Error, Schema};
+use avro_rs::{
+    schema::{Name, RecordField},
+    Error, Schema,
+};
 use lazy_static::lazy_static;
 
 fn init() {
@@ -830,6 +832,86 @@ fn test_parse_list_with_cross_deps_and_namespaces_error() {
     let _ = Schema::parse_list(&schema_strs_second).expect_err("Test failed");
 }
 
+#[test]
+// <https://issues.apache.org/jira/browse/AVRO-3216>
+// test that field's RecordSchema could be referenced by a following field by full name
+fn test_parse_reused_record_schema_by_fullname() {
+    init();
+    let schema_str = r#"
+        {
+          "type" : "record",
+          "name" : "Weather",
+          "namespace" : "test",
+          "doc" : "A weather reading.",
+          "fields" : [
+            {
+                "name" : "station",
+                "type" : {
+                  "type" : "string",
+                  "avro.java.string" : "String"
+                }
+             },
+             {
+                "name" : "max_temp",
+                "type" : {
+                  "type" : "record",
+                  "name" : "Temp",
+                  "namespace": "prefix",
+                  "doc" : "A temperature reading.",
+                  "fields" : [ {
+                    "name" : "temp",
+                    "type" : "long"
+                  } ]
+                }
+            }, {
+                "name" : "min_temp",
+                "type" : "prefix.Temp"
+            }
+        ]
+       }
+    "#;
+
+    let schema = Schema::parse_str(schema_str);
+    assert!(schema.is_ok());
+    match schema.unwrap() {
+        Schema::Record {
+            ref name,
+            doc: _,
+            ref fields,
+            lookup: _,
+        } => {
+            assert_eq!(name.fullname(None), "test.Weather", "Name does not match!");
+
+            assert_eq!(fields.len(), 3, "The number of the fields is not correct!");
+
+            let RecordField {
+                ref name,
+                doc: _,
+                default: _,
+                ref schema,
+                order: _,
+                position: _,
+            } = fields.get(2).unwrap();
+
+            assert_eq!(name, "min_temp");
+
+            match schema {
+                Schema::Record {
+                    ref name,
+                    doc: _,
+                    ref fields,
+                    lookup: _,
+                } => {
+                    assert_eq!(name.fullname(None), "prefix.Temp", "Name does not match!");
+                    assert_eq!(fields.len(), 1, "The number of the fields is not correct!");
+                }
+                unexpected => unreachable!("Unexpected schema type: {:?}", unexpected),
+            }
+        }
+        unexpected => unreachable!("Unexpected schema type: {:?}", unexpected),
+    }
+}
+
 /// Return all permutations of an input slice
 fn permutations<T>(list: &[T]) -> Vec<Vec<&T>> {
     let size = list.len();

[avro] 18/30: AVRO-3284: Update Rabin fingerprint implementation to digest to 0.10+ (#1446)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 1b019814dc1a875c90e37a6edf61ca392f1567a3
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Mon Jan 10 10:00:19 2022 +0200

    AVRO-3284: Update Rabin fingerprint implementation to digest to 0.10+ (#1446)
    
    * AVRO-3284 Update Rabin fingerprint implementation to digest to 0.10+
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3284: Update sha-2 to 0.10.1
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit a4f200efcb14ab381b0d83d71e972753c1d178c1)
---
 lang/rust/Cargo.toml   |  6 +++---
 lang/rust/src/rabin.rs | 33 ++++++++++++++++++++-------------
 2 files changed, 23 insertions(+), 16 deletions(-)
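
Note: for readers unfamiliar with the hash being wrapped here, below is a dependency-free sketch of the 64-bit Rabin fingerprint without any `digest` traits. The update step and the `EMPTY` constant are copied from `rabin.rs` as shown in this diff; the table construction follows the Avro specification and is an assumption about how the crate builds `FPTABLE`, which the diff does not show. The function names `fp_table`, `fingerprint`, and `fingerprint_bytes` are hypothetical.

```rust
/// Fingerprint of the empty input; same value as `EMPTY` in `rabin.rs`.
const EMPTY: i64 = -4513414715797952619;

/// Build the 256-entry lookup table (construction assumed from the Avro spec).
fn fp_table() -> [i64; 256] {
    let mut table = [0i64; 256];
    for i in 0..256 {
        let mut fp = i as i64;
        for _ in 0..8 {
            // Logical shift right by one bit, then conditionally XOR with EMPTY.
            fp = ((fp as u64) >> 1) as i64 ^ (EMPTY & -(fp & 1));
        }
        table[i] = fp;
    }
    table
}

/// Fold every input byte into the running fingerprint.
fn fingerprint(data: &[u8]) -> i64 {
    let table = fp_table();
    let mut result = EMPTY;
    for b in data {
        // Same update step as `Update::update` in the diff.
        result = (result as u64 >> 8) as i64 ^ table[((result ^ *b as i64) & 0xff) as usize];
    }
    result
}

/// 8-byte little-endian form, as written by `finalize_into` in the diff.
fn fingerprint_bytes(data: &[u8]) -> [u8; 8] {
    fingerprint(data).to_le_bytes()
}
```

The sketch rebuilds the table on every call for brevity; the crate caches it via `lazy_static`.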

diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index 7709156..5fb8593 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -54,7 +54,7 @@ harness = false
 byteorder = "1.4.3"
 bzip2 = { version = "0.4.3", optional = true }
 crc32fast = { version = "1.2.1", optional = true }
-digest = "0.9"
+digest = "0.10.1"
 libflate = "1.1.1"
 num-bigint = "0.4.2"
 rand = "0.8.4"
@@ -73,8 +73,8 @@ log = "0.4.14"
 zstd = { version = "0.9.0+zstd.1.5.0" , optional = true }
 
 [dev-dependencies]
-md-5 = "0.9.1"
-sha2 = "0.9.8"
+md-5 = "0.10.0"
+sha2 = "0.10.1"
 criterion = "0.3.5"
 anyhow = "1.0.44"
 hex-literal = "0.3.3"
diff --git a/lang/rust/src/rabin.rs b/lang/rust/src/rabin.rs
index aebaacc..d7cb773 100644
--- a/lang/rust/src/rabin.rs
+++ b/lang/rust/src/rabin.rs
@@ -17,7 +17,10 @@
 
 //! Implementation of the Rabin fingerprint algorithm
 use byteorder::{ByteOrder, LittleEndian};
-use digest::{consts::U8, generic_array::GenericArray, FixedOutput, Reset, Update};
+use digest::{
+    consts::U8, core_api::OutputSizeUser, generic_array::GenericArray, FixedOutput,
+    FixedOutputReset, HashMarker, Output, Reset, Update,
+};
 use lazy_static::lazy_static;
 
 const EMPTY: i64 = -4513414715797952619;
@@ -90,8 +93,8 @@ impl Default for Rabin {
 }
 
 impl Update for Rabin {
-    fn update(&mut self, input: impl AsRef<[u8]>) {
-        for b in input.as_ref() {
+    fn update(&mut self, data: &[u8]) {
+        for b in data {
             self.result = (self.result as u64 >> 8) as i64
                 ^ FPTABLE[((self.result ^ *b as i64) & 0xff) as usize];
         }
@@ -99,18 +102,9 @@ impl Update for Rabin {
 }
 
 impl FixedOutput for Rabin {
-    // 8-byte little-endian form of the i64
-    // See: https://avro.apache.org/docs/current/spec.html#single_object_encoding
-    type OutputSize = U8;
-
     fn finalize_into(self, out: &mut GenericArray<u8, Self::OutputSize>) {
         LittleEndian::write_i64(out, self.result);
     }
-
-    fn finalize_into_reset(&mut self, out: &mut GenericArray<u8, Self::OutputSize>) {
-        LittleEndian::write_i64(out, self.result);
-        self.result = EMPTY;
-    }
 }
 
 impl Reset for Rabin {
@@ -119,7 +113,20 @@ impl Reset for Rabin {
     }
 }
 
-digest::impl_write!(Rabin);
+impl OutputSizeUser for Rabin {
+    // 8-byte little-endian form of the i64
+    // See: https://avro.apache.org/docs/current/spec.html#single_object_encoding
+    type OutputSize = U8;
+}
+
+impl HashMarker for Rabin {}
+
+impl FixedOutputReset for Rabin {
+    fn finalize_into_reset(&mut self, out: &mut Output<Self>) {
+        LittleEndian::write_i64(out, self.result);
+        self.reset();
+    }
+}
 
 #[cfg(test)]
 mod tests {

[avro] 07/30: Update strum_macros requirement from 0.21.1 to 0.23.1 in /lang/rust (#1409)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 32e79fba3ab92f616658d939e3cdd14b18d63c3f
Author: dependabot[bot] <49...@users.noreply.github.com>
AuthorDate: Wed Jan 5 10:23:27 2022 +0200

    Update strum_macros requirement from 0.21.1 to 0.23.1 in /lang/rust (#1409)
    
    Updates the requirements on [strum_macros](https://github.com/Peternator7/strum) to permit the latest version.
    - [Release notes](https://github.com/Peternator7/strum/releases)
    - [Changelog](https://github.com/Peternator7/strum/blob/master/CHANGELOG.md)
    - [Commits](https://github.com/Peternator7/strum/commits)
    
    ---
    updated-dependencies:
    - dependency-name: strum_macros
      dependency-type: direct:production
    ...
    
    Signed-off-by: dependabot[bot] <su...@github.com>
    
    Co-authored-by: dependabot[bot] <49...@users.noreply.github.com>
    (cherry picked from commit c4ce231b911cb26a4b860149c542a449d717e231)
---
 lang/rust/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index 8faffb3..7d85220 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -63,7 +63,7 @@ serde_json = "1.0.67"
 serde = { version = "1.0.130", features = ["derive"] }
 snap = { version = "1.0.5", optional = true }
 strum = "0.23.0"
-strum_macros = "0.21.1"
+strum_macros = "0.23.1"
 thiserror = "1.0.29"
 typed-builder = "0.9.1"
 uuid = { version = "0.8.2", features = ["serde", "v4"] }

[avro] 30/30: AVRO-3339 Rust: Rename crate from avro-rs to apache-avro (#1488)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit ee2953e0aabc8bf67872e5006e898c502c16e4e0
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Mon Jan 31 16:06:05 2022 +0200

    AVRO-3339 Rust: Rename crate from avro-rs to apache-avro (#1488)
    
    * AVRO-3339 Rust: Rename crate from avro-rs to apache-avro
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3339 Rust: Rename crate from avro-rs to apache-avro
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit 7e5bdeec02d90ac63413fad09ce5e54adc087621)
---
 lang/rust/Cargo.toml                        |  4 +-
 lang/rust/README.md                         | 68 ++++++++++++------------
 lang/rust/README.tpl                        |  4 +-
 lang/rust/benches/serde.rs                  |  2 +-
 lang/rust/benches/single.rs                 |  2 +-
 lang/rust/build.sh                          |  4 +-
 lang/rust/examples/benchmark.rs             |  2 +-
 lang/rust/examples/generate_interop_data.rs |  2 +-
 lang/rust/examples/test_interop_data.rs     |  2 +-
 lang/rust/examples/to_value.rs              |  2 +-
 lang/rust/src/lib.rs                        | 80 ++++++++++++++---------------
 lang/rust/src/rabin.rs                      |  4 +-
 lang/rust/src/reader.rs                     |  2 +-
 lang/rust/tests/io.rs                       |  2 +-
 lang/rust/tests/schema.rs                   |  2 +-
 15 files changed, 91 insertions(+), 91 deletions(-)

diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index 0f230a4..dba7266 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -16,7 +16,7 @@
 # under the License.
 
 [package]
-name = "avro-rs"
+name = "apache-avro"
 version = "0.14.0"
 authors = ["Apache Avro team <de...@avro.apache.org>"]
 description = "A library for working with Apache Avro in Rust"
@@ -26,7 +26,7 @@ repository = "https://github.com/apache/avro"
 edition = "2018"
 keywords = ["avro", "data", "serialization"]
 categories = ["encoding"]
-documentation = "https://docs.rs/avro-rs"
+documentation = "https://docs.rs/apache-avro"
 
 [features]
 snappy = ["crc32fast", "snap"]
diff --git a/lang/rust/README.md b/lang/rust/README.md
index 0282d51..4c562f7 100644
--- a/lang/rust/README.md
+++ b/lang/rust/README.md
@@ -17,16 +17,16 @@
   under the License.
 -->
 
-# avro-rs
+# apache-avro
 
-[![Latest Version](https://img.shields.io/crates/v/avro-rs.svg)](https://crates.io/crates/avro-rs)
+[![Latest Version](https://img.shields.io/crates/v/apache-avro.svg)](https://crates.io/crates/apache-avro)
 [![Rust Continuous Integration](https://github.com/apache/avro/actions/workflows/test-lang-rust-ci.yml/badge.svg)](https://github.com/apache/avro/actions/workflows/test-lang-rust-ci.yml)
-[![Latest Documentation](https://docs.rs/avro-rs/badge.svg)](https://docs.rs/avro-rs)
+[![Latest Documentation](https://docs.rs/apache-avro/badge.svg)](https://docs.rs/apache-avro)
 [![Apache License 2.0](https://img.shields.io/badge/license-Apache%202-blue.svg](https://github.com/apache/avro/blob/master/LICENSE.txt)
 
 A library for working with [Apache Avro](https://avro.apache.org/) in Rust.
 
-Please check our [documentation](https://docs.rs/avro-rs) for examples, tutorials and API reference.
+Please check our [documentation](https://docs.rs/apache-avro) for examples, tutorials and API reference.
 
 **[Apache Avro](https://avro.apache.org/)** is a data serialization system which provides rich
 data structures and a compact, fast, binary data format.
@@ -50,7 +50,7 @@ There are basically two ways of handling Avro data in Rust:
 * **as generic Rust serde-compatible types** implementing/deriving `Serialize` and
 `Deserialize`;
 
-**avro-rs** provides a way to read and write both these data representations easily and
+**apache-avro** provides a way to read and write both these data representations easily and
 efficiently.
 
 ## Installing the library
@@ -60,13 +60,13 @@ Add to your `Cargo.toml`:
 
 ```toml
 [dependencies]
-avro-rs = "x.y"
+apache-avro = "x.y"
 ```
 
 Or in case you want to leverage the **Snappy** codec:
 
 ```toml
-[dependencies.avro-rs]
+[dependencies.apache-avro]
 version = "x.y"
 features = ["snappy"]
 ```
@@ -74,7 +74,7 @@ features = ["snappy"]
 Or in case you want to leverage the **Zstandard** codec:
 
 ```toml
-[dependencies.avro-rs]
+[dependencies.apache-avro]
 version = "x.y"
 features = ["zstandard"]
 ```
@@ -82,7 +82,7 @@ features = ["zstandard"]
 Or in case you want to leverage the **Bzip2** codec:
 
 ```toml
-[dependencies.avro-rs]
+[dependencies.apache-avro]
 version = "x.y"
 features = ["bzip"]
 ```
@@ -90,7 +90,7 @@ features = ["bzip"]
 Or in case you want to leverage the **Xz** codec:
 
 ```toml
-[dependencies.avro-rs]
+[dependencies.apache-avro]
 version = "x.y"
 features = ["xz"]
 ```
@@ -110,7 +110,7 @@ handling. Avro schemas are used for both schema validation and resolution of Avr
 Avro schemas are defined in **JSON** format and can just be parsed out of a raw string:
 
 ```rust
-use avro_rs::Schema;
+use apache_avro::Schema;
 
 let raw_schema = r#"
     {
@@ -134,7 +134,7 @@ Additionally, a list of of definitions (which may depend on each other) can be g
 them will be parsed into the corresponding schemas.
 
 ```rust
-use avro_rs::Schema;
+use apache_avro::Schema;
 
 let raw_schema_1 = r#"{
         "name": "A",
@@ -187,8 +187,8 @@ Given that the schema we defined above is that of an Avro *Record*, we are going
 associated type provided by the library to specify the data we want to serialize:
 
 ```rust
-use avro_rs::types::Record;
-use avro_rs::Writer;
+use apache_avro::types::Record;
+use apache_avro::Writer;
 #
 // a writer needs a schema and something to write to
 let mut writer = Writer::new(&schema, Vec::new());
@@ -213,7 +213,7 @@ case we want to directly define an Avro value, the library offers that capabilit
 `Value` interface.
 
 ```rust
-use avro_rs::types::Value;
+use apache_avro::types::Value;
 
 let mut value = Value::String("foo".to_string());
 ```
@@ -224,7 +224,7 @@ Given that the schema we defined above is an Avro *Record*, we can directly use
 deriving `Serialize` to model our data:
 
 ```rust
-use avro_rs::Writer;
+use apache_avro::Writer;
 
 #[derive(Debug, Serialize)]
 struct Test {
@@ -279,8 +279,8 @@ You must enable the `bzip` feature to use this codec.
 
 To specify a codec to use to compress data, just specify it while creating a `Writer`:
 ```rust
-use avro_rs::Writer;
-use avro_rs::Codec;
+use apache_avro::Writer;
+use apache_avro::Codec;
 #
 let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);
 ```
@@ -292,7 +292,7 @@ read them. The library will do it automatically for us, as it already does for t
 codec:
 
 ```rust
-use avro_rs::Reader;
+use apache_avro::Reader;
 #
 // reader creation can fail in case the input to read from is not Avro-compatible or malformed
 let reader = Reader::new(&input[..]).unwrap();
@@ -301,8 +301,8 @@ let reader = Reader::new(&input[..]).unwrap();
 In case, instead, we want to specify a different (but compatible) reader schema from the schema
 the data has been written with, we can just do as the following:
 ```rust
-use avro_rs::Schema;
-use avro_rs::Reader;
+use apache_avro::Schema;
+use apache_avro::Reader;
 #
 
 let reader_raw_schema = r#"
@@ -341,7 +341,7 @@ interested.
 We can just read directly instances of `Value` out of the `Reader` iterator:
 
 ```rust
-use avro_rs::Reader;
+use apache_avro::Reader;
 #
 let reader = Reader::new(&input[..]).unwrap();
 
@@ -358,8 +358,8 @@ Alternatively, we can use a Rust type implementing `Deserialize` and representin
 read the data into:
 
 ```rust
-use avro_rs::Reader;
-use avro_rs::from_value;
+use apache_avro::Reader;
+use apache_avro::from_value;
 
 #[derive(Debug, Deserialize)]
 struct Test {
@@ -381,7 +381,7 @@ The following is an example of how to combine everything showed so far and it is
 quick reference of the library interface:
 
 ```rust
-use avro_rs::{Codec, Reader, Schema, Writer, from_value, types::Record, Error};
+use apache_avro::{Codec, Reader, Schema, Writer, from_value, types::Record, Error};
 use serde::{Deserialize, Serialize};
 
 #[derive(Debug, Deserialize, Serialize)]
@@ -431,7 +431,7 @@ fn main() -> Result<(), Error> {
 }
 ```
 
-`avro-rs` also supports the logical types listed in the [Avro specification](https://avro.apache.org/docs/current/spec.html#Logical+Types):
+`apache-avro` also supports the logical types listed in the [Avro specification](https://avro.apache.org/docs/current/spec.html#Logical+Types):
 
 1. `Decimal` using the [`num_bigint`](https://docs.rs/num-bigint/0.2.6/num_bigint) crate
 1. UUID using the [`uuid`](https://docs.rs/uuid/0.8.1/uuid) crate
@@ -444,7 +444,7 @@ Note that the on-disk representation is identical to the underlying primitive/co
 #### Read and write logical types
 
 ```rust
-use avro_rs::{
+use apache_avro::{
     types::Record, types::Value, Codec, Days, Decimal, Duration, Millis, Months, Reader, Schema,
     Writer, Error,
 };
@@ -557,8 +557,8 @@ This library supports calculating the following fingerprints:
 An example of fingerprinting for the supported fingerprints:
 
 ```rust
-use avro_rs::rabin::Rabin;
-use avro_rs::{Schema, Error};
+use apache_avro::rabin::Rabin;
+use apache_avro::{Schema, Error};
 use md5::Md5;
 use sha2::Sha256;
 
@@ -590,7 +590,7 @@ If encoded data passed to a `Reader` has been ill-formed, it can happen that
 the bytes meant to contain the length of data are bogus and could result
 in extravagant memory allocation.
 
-To shield users from ill-formed data, `avro-rs` sets a limit (default: 512MB)
+To shield users from ill-formed data, `apache-avro` sets a limit (default: 512MB)
 to any allocation it will perform when decoding data.
 
 If you expect some of your data fields to be larger than this limit, be sure
@@ -602,7 +602,7 @@ will be 512MB throughout the lifetime of the program).
 
 
 ```rust
-use avro_rs::max_allocation_bytes;
+use apache_avro::max_allocation_bytes;
 
 max_allocation_bytes(2 * 1024 * 1024 * 1024);  // 2GB
 
@@ -615,7 +615,7 @@ max_allocation_bytes(2 * 1024 * 1024 * 1024);  // 2GB
 This library supports checking for schemas compatibility.
 
 Note: It does not yet support named schemas (more on
-https://github.com/flavray/avro-rs/pull/76).
+https://github.com/flavray/apache-avro/pull/76).
 
 Examples of checking for compatibility:
 
@@ -625,7 +625,7 @@ Explanation: an int array schema can be read by a long array schema- an int
 (32bit signed integer) fits into a long (64bit signed integer)
 
 ```rust
-use avro_rs::{Schema, schema_compatibility::SchemaCompatibility};
+use apache_avro::{Schema, schema_compatibility::SchemaCompatibility};
 
 let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
 let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
@@ -638,7 +638,7 @@ Explanation: a long array schema cannot be read by an int array schema- a
 long (64bit signed integer) does not fit into an int (32bit signed integer)
 
 ```rust
-use avro_rs::{Schema, schema_compatibility::SchemaCompatibility};
+use apache_avro::{Schema, schema_compatibility::SchemaCompatibility};
 
 let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
 let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
diff --git a/lang/rust/README.tpl b/lang/rust/README.tpl
index 88830d1..a1184ac 100644
--- a/lang/rust/README.tpl
+++ b/lang/rust/README.tpl
@@ -1,8 +1,8 @@
 # {{crate}}
 
-[![Latest Version](https://img.shields.io/crates/v/avro-rs.svg)](https://crates.io/crates/avro-rs)
+[![Latest Version](https://img.shields.io/crates/v/apache-avro.svg)](https://crates.io/crates/apache-avro)
 [![Rust Continuous Integration](https://github.com/apache/avro/actions/workflows/test-lang-rust-ci.yml/badge.svg)](https://github.com/apache/avro/actions/workflows/test-lang-rust-ci.yml)
-[![Latest Documentation](https://docs.rs/avro-rs/badge.svg)](https://docs.rs/avro-rs)
+[![Latest Documentation](https://docs.rs/apache-avro/badge.svg)](https://docs.rs/apache-avro)
 [![Apache License 2.0](https://img.shields.io/badge/license-Apache%202-blue.svg](https://github.com/apache/avro/blob/master/LICENSE.txt)
 
 {{readme}}
diff --git a/lang/rust/benches/serde.rs b/lang/rust/benches/serde.rs
index 2d74dea..e48fffb 100644
--- a/lang/rust/benches/serde.rs
+++ b/lang/rust/benches/serde.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use avro_rs::{
+use apache_avro::{
     schema::Schema,
     types::{Record, Value},
     Reader, Writer,
diff --git a/lang/rust/benches/single.rs b/lang/rust/benches/single.rs
index 314f20b..3556adb 100644
--- a/lang/rust/benches/single.rs
+++ b/lang/rust/benches/single.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use avro_rs::{
+use apache_avro::{
     schema::Schema,
     to_avro_datum,
     types::{Record, Value},
diff --git a/lang/rust/build.sh b/lang/rust/build.sh
index 2f0a824..7b78acd 100755
--- a/lang/rust/build.sh
+++ b/lang/rust/build.sh
@@ -53,11 +53,11 @@ do
       cargo build --release --lib --all-features
       cargo package
       mkdir -p  ../../dist/rust
-      cp target/package/avro-rs-*.crate $dist_dir
+      cp target/package/apache-avro-*.crate $dist_dir
       ;;
     interop-data-generate)
       prepare_build
-      export RUST_LOG=avro_rs=debug
+      export RUST_LOG=apache_avro=debug
       export RUST_BACKTRACE=1
       cargo run --all-features --example generate_interop_data
       ;;
diff --git a/lang/rust/examples/benchmark.rs b/lang/rust/examples/benchmark.rs
index 9ec23da..9728ead 100644
--- a/lang/rust/examples/benchmark.rs
+++ b/lang/rust/examples/benchmark.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use avro_rs::{
+use apache_avro::{
     schema::Schema,
     types::{Record, Value},
     Reader, Writer,
diff --git a/lang/rust/examples/generate_interop_data.rs b/lang/rust/examples/generate_interop_data.rs
index 211c9cb..514ee77 100644
--- a/lang/rust/examples/generate_interop_data.rs
+++ b/lang/rust/examples/generate_interop_data.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use avro_rs::{
+use apache_avro::{
     schema::Schema,
     types::{Record, Value},
     Codec, Writer,
diff --git a/lang/rust/examples/test_interop_data.rs b/lang/rust/examples/test_interop_data.rs
index f86c6c4..e04020e 100644
--- a/lang/rust/examples/test_interop_data.rs
+++ b/lang/rust/examples/test_interop_data.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use avro_rs::Reader;
+use apache_avro::Reader;
 use std::ffi::OsStr;
 
 fn main() -> anyhow::Result<()> {
diff --git a/lang/rust/examples/to_value.rs b/lang/rust/examples/to_value.rs
index 622554b..69cbe38 100644
--- a/lang/rust/examples/to_value.rs
+++ b/lang/rust/examples/to_value.rs
@@ -23,7 +23,7 @@ struct Test {
 
 fn main() -> anyhow::Result<()> {
     let test = Test { a: 27, b: "foo" };
-    let value = avro_rs::to_value(test)?;
+    let value = apache_avro::to_value(test)?;
     println!("{:?}", value);
     Ok(())
 }
diff --git a/lang/rust/src/lib.rs b/lang/rust/src/lib.rs
index af6e3cf..3b140ce 100644
--- a/lang/rust/src/lib.rs
+++ b/lang/rust/src/lib.rs
@@ -17,7 +17,7 @@
 
 //! A library for working with [Apache Avro](https://avro.apache.org/) in Rust.
 //!
-//! Please check our [documentation](https://docs.rs/avro-rs) for examples, tutorials and API reference.
+//! Please check our [documentation](https://docs.rs/apache-avro) for examples, tutorials and API reference.
 //!
 //! **[Apache Avro](https://avro.apache.org/)** is a data serialization system which provides rich
 //! data structures and a compact, fast, binary data format.
@@ -41,7 +41,7 @@
 //! * **as generic Rust serde-compatible types** implementing/deriving `Serialize` and
 //! `Deserialize`;
 //!
-//! **avro-rs** provides a way to read and write both these data representations easily and
+//! **apache-avro** provides a way to read and write both these data representations easily and
 //! efficiently.
 //!
 //! # Installing the library
@@ -51,13 +51,13 @@
 //!
 //! ```toml
 //! [dependencies]
-//! avro-rs = "x.y"
+//! apache-avro = "x.y"
 //! ```
 //!
 //! Or in case you want to leverage the **Snappy** codec:
 //!
 //! ```toml
-//! [dependencies.avro-rs]
+//! [dependencies.apache-avro]
 //! version = "x.y"
 //! features = ["snappy"]
 //! ```
@@ -76,7 +76,7 @@
 //! Avro schemas are defined in **JSON** format and can just be parsed out of a raw string:
 //!
 //! ```
-//! use avro_rs::Schema;
+//! use apache_avro::Schema;
 //!
 //! let raw_schema = r#"
 //!     {
@@ -100,7 +100,7 @@
 //! them will be parsed into the corresponding schemas.
 //!
 //! ```
-//! use avro_rs::Schema;
+//! use apache_avro::Schema;
 //!
 //! let raw_schema_1 = r#"{
 //!         "name": "A",
@@ -153,9 +153,9 @@
 //! associated type provided by the library to specify the data we want to serialize:
 //!
 //! ```
-//! # use avro_rs::Schema;
-//! use avro_rs::types::Record;
-//! use avro_rs::Writer;
+//! # use apache_avro::Schema;
+//! use apache_avro::types::Record;
+//! use apache_avro::Writer;
 //! #
 //! # let raw_schema = r#"
 //! #     {
@@ -191,7 +191,7 @@
 //! `Value` interface.
 //!
 //! ```
-//! use avro_rs::types::Value;
+//! use apache_avro::types::Value;
 //!
 //! let mut value = Value::String("foo".to_string());
 //! ```
@@ -202,9 +202,9 @@
 //! deriving `Serialize` to model our data:
 //!
 //! ```
-//! # use avro_rs::Schema;
+//! # use apache_avro::Schema;
 //! # use serde::Serialize;
-//! use avro_rs::Writer;
+//! use apache_avro::Writer;
 //!
 //! #[derive(Debug, Serialize)]
 //! struct Test {
@@ -263,9 +263,9 @@
 //!
 //! To specify a codec to use to compress data, just specify it while creating a `Writer`:
 //! ```
-//! # use avro_rs::Schema;
-//! use avro_rs::Writer;
-//! use avro_rs::Codec;
+//! # use apache_avro::Schema;
+//! use apache_avro::Writer;
+//! use apache_avro::Codec;
 //! #
 //! # let raw_schema = r#"
 //! #     {
@@ -288,10 +288,10 @@
 //! codec:
 //!
 //! ```
-//! use avro_rs::Reader;
-//! # use avro_rs::Schema;
-//! # use avro_rs::types::Record;
-//! # use avro_rs::Writer;
+//! use apache_avro::Reader;
+//! # use apache_avro::Schema;
+//! # use apache_avro::types::Record;
+//! # use apache_avro::Writer;
 //! #
 //! # let raw_schema = r#"
 //! #     {
@@ -317,10 +317,10 @@
 //! In case, instead, we want to specify a different (but compatible) reader schema from the schema
 //! the data has been written with, we can just do as the following:
 //! ```
-//! use avro_rs::Schema;
-//! use avro_rs::Reader;
-//! # use avro_rs::types::Record;
-//! # use avro_rs::Writer;
+//! use apache_avro::Schema;
+//! use apache_avro::Reader;
+//! # use apache_avro::types::Record;
+//! # use apache_avro::Writer;
 //! #
 //! # let writer_raw_schema = r#"
 //! #     {
@@ -376,10 +376,10 @@
 //! We can just read directly instances of `Value` out of the `Reader` iterator:
 //!
 //! ```
-//! # use avro_rs::Schema;
-//! # use avro_rs::types::Record;
-//! # use avro_rs::Writer;
-//! use avro_rs::Reader;
+//! # use apache_avro::Schema;
+//! # use apache_avro::types::Record;
+//! # use apache_avro::Writer;
+//! use apache_avro::Reader;
 //! #
 //! # let raw_schema = r#"
 //! #     {
@@ -414,11 +414,11 @@
 //! read the data into:
 //!
 //! ```
-//! # use avro_rs::Schema;
-//! # use avro_rs::Writer;
+//! # use apache_avro::Schema;
+//! # use apache_avro::Writer;
 //! # use serde::{Deserialize, Serialize};
-//! use avro_rs::Reader;
-//! use avro_rs::from_value;
+//! use apache_avro::Reader;
+//! use apache_avro::from_value;
 //!
 //! # #[derive(Serialize)]
 //! #[derive(Debug, Deserialize)]
@@ -459,7 +459,7 @@
 //! quick reference of the library interface:
 //!
 //! ```
-//! use avro_rs::{Codec, Reader, Schema, Writer, from_value, types::Record, Error};
+//! use apache_avro::{Codec, Reader, Schema, Writer, from_value, types::Record, Error};
 //! use serde::{Deserialize, Serialize};
 //!
 //! #[derive(Debug, Deserialize, Serialize)]
@@ -509,7 +509,7 @@
 //! }
 //! ```
 //!
-//! `avro-rs` also supports the logical types listed in the [Avro specification](https://avro.apache.org/docs/current/spec.html#Logical+Types):
+//! `apache-avro` also supports the logical types listed in the [Avro specification](https://avro.apache.org/docs/current/spec.html#Logical+Types):
 //!
 //! 1. `Decimal` using the [`num_bigint`](https://docs.rs/num-bigint/0.2.6/num_bigint) crate
 //! 1. UUID using the [`uuid`](https://docs.rs/uuid/0.8.1/uuid) crate
@@ -522,7 +522,7 @@
 //! ### Read and write logical types
 //!
 //! ```rust
-//! use avro_rs::{
+//! use apache_avro::{
 //!     types::Record, types::Value, Codec, Days, Decimal, Duration, Millis, Months, Reader, Schema,
 //!     Writer, Error,
 //! };
@@ -635,8 +635,8 @@
 //! An example of fingerprinting for the supported fingerprints:
 //!
 //! ```rust
-//! use avro_rs::rabin::Rabin;
-//! use avro_rs::{Schema, Error};
+//! use apache_avro::rabin::Rabin;
+//! use apache_avro::{Schema, Error};
 //! use md5::Md5;
 //! use sha2::Sha256;
 //!
@@ -668,7 +668,7 @@
 //! the bytes meant to contain the length of data are bogus and could result
 //! in extravagant memory allocation.
 //!
-//! To shield users from ill-formed data, `avro-rs` sets a limit (default: 512MB)
+//! To shield users from ill-formed data, `apache-avro` sets a limit (default: 512MB)
 //! to any allocation it will perform when decoding data.
 //!
 //! If you expect some of your data fields to be larger than this limit, be sure
@@ -680,7 +680,7 @@
 //!
 //!
 //! ```rust
-//! use avro_rs::max_allocation_bytes;
+//! use apache_avro::max_allocation_bytes;
 //!
 //! max_allocation_bytes(2 * 1024 * 1024 * 1024);  // 2GB
 //!
@@ -703,7 +703,7 @@
 //! (32bit signed integer) fits into a long (64bit signed integer)
 //!
 //! ```rust
-//! use avro_rs::{Schema, schema_compatibility::SchemaCompatibility};
+//! use apache_avro::{Schema, schema_compatibility::SchemaCompatibility};
 //!
 //! let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
 //! let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
@@ -716,7 +716,7 @@
 //! long (64bit signed integer) does not fit into an int (32bit signed integer)
 //!
 //! ```rust
-//! use avro_rs::{Schema, schema_compatibility::SchemaCompatibility};
+//! use apache_avro::{Schema, schema_compatibility::SchemaCompatibility};
 //!
 //! let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap();
 //! let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap();
diff --git a/lang/rust/src/rabin.rs b/lang/rust/src/rabin.rs
index d7cb773..e1ede43 100644
--- a/lang/rust/src/rabin.rs
+++ b/lang/rust/src/rabin.rs
@@ -45,7 +45,7 @@ lazy_static! {
 /// This is what is used for avro [single object encoding](https://avro.apache.org/docs/current/spec.html#single_object_encoding)
 ///
 /// ```rust
-/// use avro_rs::rabin::Rabin;
+/// use apache_avro::rabin::Rabin;
 /// use digest::Digest;
 /// use hex_literal::hex;
 ///
@@ -64,7 +64,7 @@ lazy_static! {
 /// To convert the digest to the commonly used 64-bit integer value, you can use the byteorder crate:
 ///
 /// ```rust
-/// # use avro_rs::rabin::Rabin;
+/// # use apache_avro::rabin::Rabin;
 /// # use digest::Digest;
 /// # use hex_literal::hex;
 ///
diff --git a/lang/rust/src/reader.rs b/lang/rust/src/reader.rs
index 9634a27..d46b3bd 100644
--- a/lang/rust/src/reader.rs
+++ b/lang/rust/src/reader.rs
@@ -191,7 +191,7 @@ impl<R: Read> Block<R> {
 /// To be used as an iterator:
 ///
 /// ```no_run
-/// # use avro_rs::Reader;
+/// # use apache_avro::Reader;
 /// # use std::io::Cursor;
 /// # let input = Cursor::new(Vec::<u8>::new());
 /// for value in Reader::new(input).unwrap() {
diff --git a/lang/rust/tests/io.rs b/lang/rust/tests/io.rs
index 4714493..18b3c70 100644
--- a/lang/rust/tests/io.rs
+++ b/lang/rust/tests/io.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 //! Port of https://github.com/apache/avro/blob/release-1.9.1/lang/py/test/test_io.py
-use avro_rs::{from_avro_datum, to_avro_datum, types::Value, Error, Schema};
+use apache_avro::{from_avro_datum, to_avro_datum, types::Value, Error, Schema};
 use lazy_static::lazy_static;
 use std::io::Cursor;
 
diff --git a/lang/rust/tests/schema.rs b/lang/rust/tests/schema.rs
index 77b6569..d7ff3e4 100644
--- a/lang/rust/tests/schema.rs
+++ b/lang/rust/tests/schema.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use avro_rs::{
+use apache_avro::{
     schema::{Name, RecordField},
     types::{Record, Value},
     Codec, Error, Reader, Schema, Writer,

[avro] 01/30: AVRO-3205 Rust: Update Cargo.toml [package] information (#1344)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 81812a0b6c9426647b2827a02b7d33a5ba340480
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Wed Jan 5 07:48:08 2022 +0200

    AVRO-3205 Rust: Update Cargo.toml [package] information (#1344)
    
    * AVRO-3205 Update Cargo.toml [package] information
    
    * AVRO-3205 Fix 'cargo doc` warning
    
    Documenting avro-rs v0.14.0 (/home/martin/git/apache/avro/lang/rust)
    warning: this URL is not a hyperlink
       --> src/lib.rs:696:5
        |
    696 | //! https://github.com/flavray/avro-rs/pull/76).
        |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: use an automatic link instead: `<https://github.com/flavray/avro-rs/pull/76>`
        |
        = note: `#[warn(rustdoc::bare_urls)]` on by default
        = note: bare URLs are not automatically turned into clickable links
    
    * AVRO-3205 Add keywords and documentation to [package]
    
    * AVRO-3205 Add categories = ["encoding"]
    
    * AVRO-3205 Update markdown files
    
    Replace flavray/avro-rs with apache/avro.
    Replace MIT license with Apache License 2
    
    (cherry picked from commit ea07ac016fef769e3e80645b32ca72e0a0d13a26)
---
 lang/rust/CHANGELOG.md |  5 ++++-
 lang/rust/Cargo.toml   | 13 ++++++++-----
 lang/rust/README.md    | 11 +++++------
 lang/rust/README.tpl   | 11 +++++------
 lang/rust/src/lib.rs   |  2 +-
 5 files changed, 23 insertions(+), 19 deletions(-)

diff --git a/lang/rust/CHANGELOG.md b/lang/rust/CHANGELOG.md
index 0b1943a..47b959e 100644
--- a/lang/rust/CHANGELOG.md
+++ b/lang/rust/CHANGELOG.md
@@ -18,7 +18,10 @@
 -->
 
 # Changelog
-All notable changes to this project will be documented in this file.
+
+This file has been used by [avro-rs](https://github.com/flavray/avro-rs) before donating the project to Apache Avro.
+Apache Avro uses [JIRA](https://issues.apache.org/jira/browse/AVRO) for issue tracking and changelog!
+
 
 The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index d0be3d0..d82e5ff 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -17,13 +17,16 @@
 
 [package]
 name = "avro-rs"
-version = "0.13.0"
-authors = ["Flavien Raynaud <fl...@gmail.com>", "Antonio Verardi <an...@gmail.com>"]
-description = "Library for working with Apache Avro in Rust"
-license = "MIT"
+version = "0.14.0"
+authors = ["Apache Avro team <de...@avro.apache.org>"]
+description = "A library for working with Apache Avro in Rust"
+license = "Apache-2.0"
 readme = "README.md"
-repository = "https://github.com/flavray/avro-rs"
+repository = "https://github.com/apache/avro"
 edition = "2018"
+keywords = ["avro", "data", "serialization"]
+categories = ["encoding"]
+documentation = "https://docs.rs/avro-rs"
 
 [features]
 snappy = ["crc", "snap"]
diff --git a/lang/rust/README.md b/lang/rust/README.md
index 7ac542a..e934cc3 100644
--- a/lang/rust/README.md
+++ b/lang/rust/README.md
@@ -20,9 +20,9 @@
 # avro-rs
 
 [![Latest Version](https://img.shields.io/crates/v/avro-rs.svg)](https://crates.io/crates/avro-rs)
-[![Continuous Integration](https://github.com/flavray/avro-rs/workflows/Continuous%20Integration/badge.svg)](https://github.com/flavray/avro-rs/actions)
+[![Rust Continuous Integration](https://github.com/apache/avro/actions/workflows/test-lang-rust-ci.yml/badge.svg)](https://github.com/apache/avro/actions/workflows/test-lang-rust-ci.yml)
 [![Latest Documentation](https://docs.rs/avro-rs/badge.svg)](https://docs.rs/avro-rs)
-[![MIT licensed](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/flavray/avro-rs/blob/main/LICENSE)
+[![Apache License 2.0](https://img.shields.io/badge/license-Apache%202-blue.svg](https://github.com/apache/avro/blob/master/LICENSE.txt)
 
 A library for working with [Apache Avro](https://avro.apache.org/) in Rust.
 
@@ -614,13 +614,12 @@ assert_eq!(false, SchemaCompatibility::can_read(&writers_schema, &readers_schema
 ```
 
 ## License
-This project is licensed under [MIT License](https://github.com/flavray/avro-rs/blob/main/LICENSE).
-Please note that this is not an official project maintained by [Apache Avro](https://avro.apache.org/).
+This project is licensed under [Apache License 2.0](https://github.com/apache/avro/blob/master/LICENSE.txt).
 
 ## Contributing
 Everyone is encouraged to contribute! You can contribute by forking the GitHub repo and making a pull request or opening an issue.
-All contributions will be licensed under [MIT License](https://github.com/flavray/avro-rs/blob/main/LICENSE).
+All contributions will be licensed under [Apache License 2.0](https://github.com/apache/avro/blob/master/LICENSE.txt).
 
-Please consider adding documentation, tests and a line for your change under the Unreleased section in the [CHANGELOG](https://github.com/flavray/avro-rs/blob/main/CHANGELOG.md).
+Please consider adding documentation and tests!
 If you introduce a backward-incompatible change, please consider adding instruction to migrate in the [Migration Guide](migration_guide.md)
 If you modify the crate documentation in `lib.rs`, run `make readme` to sync the README file.
diff --git a/lang/rust/README.tpl b/lang/rust/README.tpl
index daab35c..88830d1 100644
--- a/lang/rust/README.tpl
+++ b/lang/rust/README.tpl
@@ -1,20 +1,19 @@
 # {{crate}}
 
 [![Latest Version](https://img.shields.io/crates/v/avro-rs.svg)](https://crates.io/crates/avro-rs)
-[![Continuous Integration](https://github.com/flavray/avro-rs/workflows/Continuous%20Integration/badge.svg)](https://github.com/flavray/avro-rs/actions)
+[![Rust Continuous Integration](https://github.com/apache/avro/actions/workflows/test-lang-rust-ci.yml/badge.svg)](https://github.com/apache/avro/actions/workflows/test-lang-rust-ci.yml)
 [![Latest Documentation](https://docs.rs/avro-rs/badge.svg)](https://docs.rs/avro-rs)
-[![MIT licensed](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/flavray/avro-rs/blob/main/LICENSE)
+[![Apache License 2.0](https://img.shields.io/badge/license-Apache%202-blue.svg)](https://github.com/apache/avro/blob/master/LICENSE.txt)
 
 {{readme}}
 
 ## License
-This project is licensed under [MIT License](https://github.com/flavray/avro-rs/blob/main/LICENSE).
-Please note that this is not an official project maintained by [Apache Avro](https://avro.apache.org/).
+This project is licensed under [Apache License 2.0](https://github.com/apache/avro/blob/master/LICENSE.txt).
 
 ## Contributing
 Everyone is encouraged to contribute! You can contribute by forking the GitHub repo and making a pull request or opening an issue.
-All contributions will be licensed under [MIT License](https://github.com/flavray/avro-rs/blob/main/LICENSE).
+All contributions will be licensed under [Apache License 2.0](https://github.com/apache/avro/blob/master/LICENSE.txt).
 
-Please consider adding documentation, tests and a line for your change under the Unreleased section in the [CHANGELOG](https://github.com/flavray/avro-rs/blob/main/CHANGELOG.md).
+Please consider adding documentation and tests!
 If you introduce a backward-incompatible change, please consider adding instruction to migrate in the [Migration Guide](migration_guide.md)
 If you modify the crate documentation in `lib.rs`, run `make readme` to sync the README file.
diff --git a/lang/rust/src/lib.rs b/lang/rust/src/lib.rs
index 21530d9..ca14939 100644
--- a/lang/rust/src/lib.rs
+++ b/lang/rust/src/lib.rs
@@ -693,7 +693,7 @@
 //! This library supports checking for schemas compatibility.
 //!
 //! Note: It does not yet support named schemas (more on
-//! https://github.com/flavray/avro-rs/pull/76).
+//! <https://github.com/flavray/avro-rs/pull/76>).
 //!
 //! Examples of checking for compatibility:
 //!

[avro] 05/30: AVRO-3245 Rust: Replace crc crate with crc32fast (#1388)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 76d1cc34cfd8196c2eadeb6b26339e6ce332d996
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Wed Jan 5 09:45:25 2022 +0200

    AVRO-3245 Rust: Replace crc crate with crc32fast (#1388)
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit 54f37e52cb373904c7529a74dee1275bffac457a)
---
 lang/rust/Cargo.toml   |  4 ++--
 lang/rust/src/codec.rs | 14 +++++++++++---
 2 files changed, 13 insertions(+), 5 deletions(-)
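
For readers unfamiliar with the Snappy block trailer touched by this commit, the following is a minimal sketch of the checksum step using only the standard library. It is not the crate's code: `crc32_ieee`, `append_checksum` and `trailer_matches` are hypothetical helper names, and the bitwise loop stands in for `crc32fast::Hasher`, which computes the same IEEE CRC-32 much faster.

```rust
/// Bitwise CRC-32 (IEEE 802.3, reflected polynomial 0xEDB88320).
/// Hypothetical stand-in for `crc32fast::Hasher`.
fn crc32_ieee(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= u32::from(byte);
        for _ in 0..8 {
            // All ones when the lowest bit is set, zero otherwise.
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

/// Appends the 4-byte big-endian checksum of the uncompressed data
/// to an already compressed block.
fn append_checksum(block: &mut Vec<u8>, uncompressed: &[u8]) {
    block.extend_from_slice(&crc32_ieee(uncompressed).to_be_bytes());
}

/// Reads the trailing 4 bytes of the block and compares them with the
/// checksum of the decoded data. Returns false for blocks shorter than 4 bytes.
fn trailer_matches(block: &[u8], decoded: &[u8]) -> bool {
    if block.len() < 4 {
        return false;
    }
    let mut tail = [0u8; 4];
    tail.copy_from_slice(&block[block.len() - 4..]);
    u32::from_be_bytes(tail) == crc32_ieee(decoded)
}
```

In the actual change the checksum is written with `byteorder::BigEndian::write_u32` and a mismatch is reported as `Error::SnappyCrc32 { expected, actual }`; the boolean return here is a simplification.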

diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index 4501bf0..328e858 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -29,7 +29,7 @@ categories = ["encoding"]
 documentation = "https://docs.rs/avro-rs"
 
 [features]
-snappy = ["crc", "snap"]
+snappy = ["crc32fast", "snap"]
 zstandard = ["zstd"]
 bzip = ["bzip2"]
 
@@ -53,7 +53,7 @@ harness = false
 [dependencies]
 byteorder = "1.4.3"
 bzip2 = { version = "0.4.3", optional = true }
-crc = { version = "1.8.1", optional = true }
+crc32fast = { version = "1.2.1", optional = true }
 digest = "0.9"
 libflate = "1.1.1"
 num-bigint = "0.4.2"
diff --git a/lang/rust/src/codec.rs b/lang/rust/src/codec.rs
index 84f0381..15992c1 100644
--- a/lang/rust/src/codec.rs
+++ b/lang/rust/src/codec.rs
@@ -26,6 +26,10 @@ use bzip2::{
     read::{BzDecoder, BzEncoder},
     Compression,
 };
+#[cfg(feature = "snappy")]
+extern crate crc32fast;
+#[cfg(feature = "snappy")]
+use crc32fast::Hasher;
 
 /// The compression codec used to compress blocks.
 #[derive(Clone, Copy, Debug, PartialEq, EnumString, IntoStaticStr)]
@@ -79,8 +83,10 @@ impl Codec {
                     .compress(&stream[..], &mut encoded[..])
                     .map_err(Error::SnappyCompress)?;
 
-                let crc = crc::crc32::checksum_ieee(&stream[..]);
-                byteorder::BigEndian::write_u32(&mut encoded[compressed_size..], crc);
+                let mut hasher = Hasher::new();
+                hasher.update(&stream[..]);
+                let checksum = hasher.finalize();
+                byteorder::BigEndian::write_u32(&mut encoded[compressed_size..], checksum);
                 encoded.truncate(compressed_size + 4);
 
                 *stream = encoded;
@@ -127,7 +133,9 @@ impl Codec {
                     .map_err(Error::SnappyDecompress)?;
 
                 let expected = byteorder::BigEndian::read_u32(&stream[stream.len() - 4..]);
-                let actual = crc::crc32::checksum_ieee(&decoded);
+                let mut hasher = Hasher::new();
+                hasher.update(&decoded);
+                let actual = hasher.finalize();
 
                 if expected != actual {
                     return Err(Error::SnappyCrc32 { expected, actual });

[avro] 04/30: AVRO-3246 Rust: Add new codec: bzip2 (#1389)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit cd53fadd0229eff796faf4ba7d970a45cd7baf34
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Wed Jan 5 09:32:42 2022 +0200

    AVRO-3246 Rust: Add new codec: bzip2 (#1389)
    
    * AVRO-3246 Rust: Add new codec: bzip2
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3246 Rust: Add new codec: bzip2
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3246 Fix an error reported by Rust nightly
    
    https://github.com/apache/avro/runs/4081593652?check_suite_focus=true
    (cherry picked from commit 9da79b68f4e37d2910d48e42dd43e559770fd38b)
---
 lang/rust/Cargo.toml   |  2 ++
 lang/rust/README.md    | 10 ++++++++++
 lang/rust/src/codec.rs | 42 ++++++++++++++++++++++++++++++++++++++++++
 lang/rust/src/lib.rs   |  2 +-
 4 files changed, 55 insertions(+), 1 deletion(-)
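
The tests added in this commit check that the new variant converts to and from the string `"bzip2"`. The sketch below shows that round trip with a hand-written stand-in enum, assuming nothing beyond the standard library. In the crate itself these conversions are derived with strum's `EnumString` and `IntoStaticStr`; the `name` method and the error type used here are hypothetical.

```rust
use std::str::FromStr;

// Hypothetical stand-in for the crate's `Codec`; only three variants are modelled.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Codec {
    Null,
    Zstd,
    Bzip2,
}

impl Codec {
    /// The codec name as stored in the file metadata.
    fn name(self) -> &'static str {
        match self {
            Codec::Null => "null",
            Codec::Zstd => "zstd",
            Codec::Bzip2 => "bzip2",
        }
    }
}

impl FromStr for Codec {
    type Err = String;

    /// Parses a codec name; unknown names are rejected.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "null" => Ok(Codec::Null),
            "zstd" => Ok(Codec::Zstd),
            "bzip2" => Ok(Codec::Bzip2),
            other => Err(format!("unknown codec: {}", other)),
        }
    }
}
```

Note that the Cargo feature is named `bzip` while the codec string is `bzip2`, so the two should not be confused when enabling the feature.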

diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index 23ddc2a..4501bf0 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -31,6 +31,7 @@ documentation = "https://docs.rs/avro-rs"
 [features]
 snappy = ["crc", "snap"]
 zstandard = ["zstd"]
+bzip = ["bzip2"]
 
 [lib]
 path = "src/lib.rs"
@@ -51,6 +52,7 @@ harness = false
 
 [dependencies]
 byteorder = "1.4.3"
+bzip2 = { version = "0.4.3", optional = true }
 crc = { version = "1.8.1", optional = true }
 digest = "0.9"
 libflate = "1.1.1"
diff --git a/lang/rust/README.md b/lang/rust/README.md
index 9d0ef24..93950ca 100644
--- a/lang/rust/README.md
+++ b/lang/rust/README.md
@@ -79,6 +79,14 @@ version = "x.y"
 features = ["zstandard"]
 ```
 
+Or in case you want to leverage the **Bzip2** codec:
+
+```toml
+[dependencies.avro-rs]
+version = "x.y"
+features = ["bzip"]
+```
+
 ## Upgrading to a newer minor version
 
 The library is still in beta, so there might be backward-incompatible changes between minor
@@ -254,6 +262,8 @@ compressed block is followed by the 4-byte, big-endianCRC32 checksum of the unco
 the block. You must enable the `snappy` feature to use this codec.
 * **Zstandard**: uses Facebook's [Zstandard](https://facebook.github.io/zstd/) compression library.
 You must enable the `zstandard` feature to use this codec.
+* **Bzip2**: uses the [BZip2](https://sourceware.org/bzip2/) compression library.
+You must enable the `bzip` feature to use this codec.
 
 To specify a codec to use to compress data, just specify it while creating a `Writer`:
 ```rust
diff --git a/lang/rust/src/codec.rs b/lang/rust/src/codec.rs
index a27f058..84f0381 100644
--- a/lang/rust/src/codec.rs
+++ b/lang/rust/src/codec.rs
@@ -21,6 +21,12 @@ use libflate::deflate::{Decoder, Encoder};
 use std::io::{Read, Write};
 use strum_macros::{EnumString, IntoStaticStr};
 
+#[cfg(feature = "bzip")]
+use bzip2::{
+    read::{BzDecoder, BzEncoder},
+    Compression,
+};
+
 /// The compression codec used to compress blocks.
 #[derive(Clone, Copy, Debug, PartialEq, EnumString, IntoStaticStr)]
 #[strum(serialize_all = "kebab_case")]
@@ -38,6 +44,10 @@ pub enum Codec {
     Snappy,
     #[cfg(feature = "zstandard")]
     Zstd,
+    #[cfg(feature = "bzip")]
+    /// The `BZip2` codec uses [BZip2](https://sourceware.org/bzip2/)
+    /// compression library.
+    Bzip2,
 }
 
 impl From<Codec> for Value {
@@ -81,6 +91,13 @@ impl Codec {
                 encoder.write_all(stream).map_err(Error::ZstdCompress)?;
                 *stream = encoder.finish().unwrap();
             }
+            #[cfg(feature = "bzip")]
+            Codec::Bzip2 => {
+                let mut encoder = BzEncoder::new(&stream[..], Compression::best());
+                let mut buffer = Vec::new();
+                encoder.read_to_end(&mut buffer).unwrap();
+                *stream = buffer;
+            }
         };
 
         Ok(())
@@ -124,6 +141,13 @@ impl Codec {
                 std::io::copy(&mut decoder, &mut decoded).map_err(Error::ZstdDecompress)?;
                 decoded
             }
+            #[cfg(feature = "bzip")]
+            Codec::Bzip2 => {
+                let mut decoder = BzDecoder::new(&stream[..]);
+                let mut decoded = Vec::new();
+                decoder.read_to_end(&mut decoded).unwrap();
+                decoded
+            }
         };
         Ok(())
     }
@@ -180,6 +204,18 @@ mod tests {
         assert_eq!(INPUT, stream.as_slice());
     }
 
+    #[cfg(feature = "bzip")]
+    #[test]
+    fn bzip_compress_and_decompress() {
+        let codec = Codec::Bzip2;
+        let mut stream = INPUT.to_vec();
+        codec.compress(&mut stream).unwrap();
+        assert_ne!(INPUT, stream.as_slice());
+        assert!(INPUT.len() > stream.len());
+        codec.decompress(&mut stream).unwrap();
+        assert_eq!(INPUT, stream.as_slice());
+    }
+
     #[test]
     fn codec_to_str() {
         assert_eq!(<&str>::from(Codec::Null), "null");
@@ -190,6 +226,9 @@ mod tests {
 
         #[cfg(feature = "zstandard")]
         assert_eq!(<&str>::from(Codec::Zstd), "zstd");
+
+        #[cfg(feature = "bzip")]
+        assert_eq!(<&str>::from(Codec::Bzip2), "bzip2");
     }
 
     #[test]
@@ -205,6 +244,9 @@ mod tests {
         #[cfg(feature = "zstandard")]
         assert_eq!(Codec::from_str("zstd").unwrap(), Codec::Zstd);
 
+        #[cfg(feature = "bzip")]
+        assert_eq!(Codec::from_str("bzip2").unwrap(), Codec::Bzip2);
+
         assert!(Codec::from_str("not a codec").is_err());
     }
 }
diff --git a/lang/rust/src/lib.rs b/lang/rust/src/lib.rs
index ca14939..18d9230 100644
--- a/lang/rust/src/lib.rs
+++ b/lang/rust/src/lib.rs
@@ -744,7 +744,7 @@ pub use codec::Codec;
 pub use de::from_value;
 pub use decimal::Decimal;
 pub use duration::{Days, Duration, Millis, Months};
-pub use error::{Error, Error as DeError, Error as SerError};
+pub use error::Error;
 pub use reader::{from_avro_datum, Reader};
 pub use schema::Schema;
 pub use ser::to_value;

[avro] 22/30: AVRO-3248: Rust: Support named types in UnionSchema (#1396)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 1a2d0a36b910fab16cc85399d38dd030dcd74f5c
Author: Litao Lu <lu...@gmail.com>
AuthorDate: Wed Jan 19 19:47:08 2022 +0800

    AVRO-3248: Rust: Support named types in UnionSchema (#1396)
    
    * AVRO-3248: Rust: Support named types in UnionSchema
    
    Previously, unions did not support named types, and adding two
    records to one UnionSchema produced an error.
    
    * AVRO-3248: Fix a typo in error message
    
    * AVRO-3248: Give better names for the schemata as string
    
    * AVRO-3248: Better names for variables
    
    * AVRO-3248: Code formatting
    
    * AVRO-3248 Fix formatting & build
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3248: Fix generate_interop_data and formatting
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3248: Fix some regressions after the rebase to master
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3248 Fix the position in the Union for the Double value
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    Co-authored-by: Martin Grigorov <ma...@users.noreply.github.com>
    Co-authored-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit 49c6e1067c937d066503be9c7f4032fb03f67474)
---
 lang/rust/examples/generate_interop_data.rs |   2 +-
 lang/rust/src/de.rs                         | 100 +++++++++++++----------
 lang/rust/src/decode.rs                     |   5 +-
 lang/rust/src/encode.rs                     |  11 ++-
 lang/rust/src/reader.rs                     |   2 +-
 lang/rust/src/schema.rs                     | 121 ++++++++++++++++++++++++++--
 lang/rust/src/ser.rs                        |  30 ++++---
 lang/rust/src/types.rs                      |  84 +++++++++++--------
 lang/rust/src/writer.rs                     |   8 +-
 lang/rust/tests/io.rs                       |   5 +-
 10 files changed, 257 insertions(+), 111 deletions(-)
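
The core of the schema change is the relaxed duplicate check in `UnionSchema::new`: unnamed kinds may still appear only once, while named kinds (record, enum, fixed) are exempt because their names tell them apart. The following is a minimal sketch of that rule under stated assumptions. `Kind` and `check_union` are hypothetical stand-ins for `SchemaKind` and the constructor, and a `HashSet` replaces the crate's kind-to-index map.

```rust
use std::collections::HashSet;

// Hypothetical stand-in for the crate's `SchemaKind`; only a few kinds are modelled.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Kind {
    Null,
    Float,
    Record,
    Enum,
    Fixed,
}

impl Kind {
    /// Mirrors `SchemaKind::is_named` from the diff.
    fn is_named(self) -> bool {
        matches!(self, Kind::Record | Kind::Enum | Kind::Fixed)
    }
}

/// Returns the first unnamed kind that appears twice as an error.
/// Named kinds are never recorded, so several records may coexist.
fn check_union(kinds: &[Kind]) -> Result<(), Kind> {
    let mut seen = HashSet::new();
    for &kind in kinds {
        if !kind.is_named() && !seen.insert(kind) {
            return Err(kind);
        }
    }
    Ok(())
}
```

Because two records now share one kind, the kind alone can no longer identify the chosen branch. That appears to be why `Value::Union` gains an explicit index in this commit, which `encode_ref` then uses to pick the branch schema directly.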

diff --git a/lang/rust/examples/generate_interop_data.rs b/lang/rust/examples/generate_interop_data.rs
index cb8efda..211c9cb 100644
--- a/lang/rust/examples/generate_interop_data.rs
+++ b/lang/rust/examples/generate_interop_data.rs
@@ -51,7 +51,7 @@ fn create_datum(schema: &Schema) -> Record {
         Value::Record(vec![("label".into(), Value::String("cee".into()))]),
     );
     datum.put("mapField", Value::Map(map));
-    datum.put("unionField", Value::Union(Box::new(Value::Double(12.0))));
+    datum.put("unionField", Value::Union(1, Box::new(Value::Double(12.0))));
     datum.put("enumField", Value::Enum(2, "C".to_owned()));
     datum.put("fixedField", Value::Fixed(16, b"1019181716151413".to_vec()));
     datum.put(
diff --git a/lang/rust/src/de.rs b/lang/rust/src/de.rs
index 6d89686..6324cae 100644
--- a/lang/rust/src/de.rs
+++ b/lang/rust/src/de.rs
@@ -162,7 +162,7 @@ impl<'de> de::EnumAccess<'de> for EnumDeserializer<'de> {
         self.input.first().map_or(
             Err(de::Error::custom("A record must have a least one field")),
             |item| match (item.0.as_ref(), &item.1) {
-                ("type", Value::String(x)) => Ok((
+                ("type", Value::String(x)) | ("type", Value::Enum(_, x)) => Ok((
                     seed.deserialize(StringDeserializer {
                         input: x.to_owned(),
                     })?,
@@ -173,7 +173,7 @@ impl<'de> de::EnumAccess<'de> for EnumDeserializer<'de> {
                     field
                 ))),
                 (_, _) => Err(de::Error::custom(
-                    "Expected first field of type String for the type name".to_string(),
+                    "Expected first field of type String or Enum for the type name".to_string(),
                 )),
             },
         )
@@ -250,7 +250,7 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
             | Value::TimestampMicros(i) => visitor.visit_i64(*i),
             &Value::Float(f) => visitor.visit_f32(f),
             &Value::Double(d) => visitor.visit_f64(d),
-            Value::Union(u) => match **u {
+            Value::Union(_i, u) => match **u {
                 Value::Null => visitor.visit_unit(),
                 Value::Boolean(b) => visitor.visit_bool(b),
                 Value::Int(i) => visitor.visit_i32(i),
@@ -316,7 +316,7 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
                     .map_err(|e| de::Error::custom(e.to_string()))
                     .and_then(|s| visitor.visit_string(s))
             }
-            Value::Union(ref x) => match **x {
+            Value::Union(_i, ref x) => match **x {
                 Value::String(ref s) => visitor.visit_string(s.to_owned()),
                 _ => Err(de::Error::custom("not a string|bytes|fixed")),
             },
@@ -354,8 +354,8 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
         V: Visitor<'de>,
     {
         match *self.input {
-            Value::Union(ref inner) if inner.as_ref() == &Value::Null => visitor.visit_none(),
-            Value::Union(ref inner) => visitor.visit_some(&Deserializer::new(inner)),
+            Value::Union(_i, ref inner) if inner.as_ref() == &Value::Null => visitor.visit_none(),
+            Value::Union(_i, ref inner) => visitor.visit_some(&Deserializer::new(inner)),
             _ => Err(de::Error::custom("not a union")),
         }
     }
@@ -398,7 +398,7 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
     {
         match *self.input {
             Value::Array(ref items) => visitor.visit_seq(SeqDeserializer::new(items)),
-            Value::Union(ref inner) => match **inner {
+            Value::Union(_i, ref inner) => match **inner {
                 Value::Array(ref items) => visitor.visit_seq(SeqDeserializer::new(items)),
                 _ => Err(de::Error::custom("not an array")),
             },
@@ -446,7 +446,7 @@ impl<'a, 'de> de::Deserializer<'de> for &'a Deserializer<'de> {
     {
         match *self.input {
             Value::Record(ref fields) => visitor.visit_map(StructDeserializer::new(fields)),
-            Value::Union(ref inner) => match **inner {
+            Value::Union(_i, ref inner) => match **inner {
                 Value::Record(ref fields) => visitor.visit_map(StructDeserializer::new(fields)),
                 _ => Err(de::Error::custom("not a record")),
             },
@@ -781,7 +781,7 @@ mod tests {
                 ("type".to_owned(), Value::String("Double".to_owned())),
                 (
                     "value".to_owned(),
-                    Value::Union(Box::new(Value::Double(64.0))),
+                    Value::Union(1, Box::new(Value::Double(64.0))),
                 ),
             ]),
         )]);
@@ -804,10 +804,13 @@ mod tests {
                 ("type".to_owned(), Value::String("Val1".to_owned())),
                 (
                     "value".to_owned(),
-                    Value::Union(Box::new(Value::Record(vec![
-                        ("x".to_owned(), Value::Float(1.0)),
-                        ("y".to_owned(), Value::Float(2.0)),
-                    ]))),
+                    Value::Union(
+                        0,
+                        Box::new(Value::Record(vec![
+                            ("x".to_owned(), Value::Float(1.0)),
+                            ("y".to_owned(), Value::Float(2.0)),
+                        ])),
+                    ),
                 ),
             ]),
         )]);
@@ -830,10 +833,10 @@ mod tests {
                 ("type".to_owned(), Value::String("Val1".to_owned())),
                 (
                     "value".to_owned(),
-                    Value::Union(Box::new(Value::Array(vec![
-                        Value::Float(1.0),
-                        Value::Float(2.0),
-                    ]))),
+                    Value::Union(
+                        0,
+                        Box::new(Value::Array(vec![Value::Float(1.0), Value::Float(2.0)])),
+                    ),
                 ),
             ]),
         )]);
@@ -965,75 +968,84 @@ mod tests {
             ),
             (
                 "a_union_string".to_string(),
-                Value::Union(Box::new(Value::String("a union string".to_string()))),
+                Value::Union(0, Box::new(Value::String("a union string".to_string()))),
             ),
             (
                 "a_union_long".to_string(),
-                Value::Union(Box::new(Value::Long(412))),
+                Value::Union(0, Box::new(Value::Long(412))),
             ),
             (
                 "a_union_long".to_string(),
-                Value::Union(Box::new(Value::Long(412))),
+                Value::Union(0, Box::new(Value::Long(412))),
             ),
             (
                 "a_time_micros".to_string(),
-                Value::Union(Box::new(Value::TimeMicros(123))),
+                Value::Union(0, Box::new(Value::TimeMicros(123))),
             ),
             (
                 "a_non_existing_time_micros".to_string(),
-                Value::Union(Box::new(Value::TimeMicros(-123))),
+                Value::Union(0, Box::new(Value::TimeMicros(-123))),
             ),
             (
                 "a_timestamp_millis".to_string(),
-                Value::Union(Box::new(Value::TimestampMillis(234))),
+                Value::Union(0, Box::new(Value::TimestampMillis(234))),
             ),
             (
                 "a_non_existing_timestamp_millis".to_string(),
-                Value::Union(Box::new(Value::TimestampMillis(-234))),
+                Value::Union(0, Box::new(Value::TimestampMillis(-234))),
             ),
             (
                 "a_timestamp_micros".to_string(),
-                Value::Union(Box::new(Value::TimestampMicros(345))),
+                Value::Union(0, Box::new(Value::TimestampMicros(345))),
             ),
             (
                 "a_non_existing_timestamp_micros".to_string(),
-                Value::Union(Box::new(Value::TimestampMicros(-345))),
+                Value::Union(0, Box::new(Value::TimestampMicros(-345))),
             ),
             (
                 "a_record".to_string(),
-                Value::Union(Box::new(Value::Record(vec![(
-                    "record_in_union".to_string(),
-                    Value::Int(-2),
-                )]))),
+                Value::Union(
+                    0,
+                    Box::new(Value::Record(vec![(
+                        "record_in_union".to_string(),
+                        Value::Int(-2),
+                    )])),
+                ),
             ),
             (
                 "a_non_existing_record".to_string(),
-                Value::Union(Box::new(Value::Record(vec![(
-                    "blah".to_string(),
-                    Value::Int(-22),
-                )]))),
+                Value::Union(
+                    0,
+                    Box::new(Value::Record(vec![("blah".to_string(), Value::Int(-22))])),
+                ),
             ),
             (
                 "an_array".to_string(),
-                Value::Union(Box::new(Value::Array(vec![
-                    Value::Boolean(true),
-                    Value::Boolean(false),
-                ]))),
+                Value::Union(
+                    0,
+                    Box::new(Value::Array(vec![
+                        Value::Boolean(true),
+                        Value::Boolean(false),
+                    ])),
+                ),
             ),
             (
                 "a_non_existing_array".to_string(),
-                Value::Union(Box::new(Value::Array(vec![
-                    Value::Boolean(false),
-                    Value::Boolean(true),
-                ]))),
+                Value::Union(
+                    0,
+                    Box::new(Value::Array(vec![
+                        Value::Boolean(false),
+                        Value::Boolean(true),
+                    ])),
+                ),
             ),
             (
                 "a_union_map".to_string(),
-                Value::Union(Box::new(Value::Map(value_map))),
+                Value::Union(0, Box::new(Value::Map(value_map))),
             ),
             (
                 "a_non_existing_union_map".to_string(),
-                Value::Union(Box::new(Value::Map(HashMap::new()))),
+                Value::Union(0, Box::new(Value::Map(HashMap::new()))),
             ),
         ]);
 
diff --git a/lang/rust/src/decode.rs b/lang/rust/src/decode.rs
index 26678bf..eb9c018 100644
--- a/lang/rust/src/decode.rs
+++ b/lang/rust/src/decode.rs
@@ -215,18 +215,17 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
                             num_variants: variants.len(),
                         })?;
                     let value = decode0(variant, reader, schemas_by_name)?;
-                    Ok(Value::Union(Box::new(value)))
+                    Ok(Value::Union(index as i32, Box::new(value)))
                 }
                 Err(Error::ReadVariableIntegerBytes(io_err)) => {
                     if let ErrorKind::UnexpectedEof = io_err.kind() {
-                        Ok(Value::Union(Box::new(Value::Null)))
+                        Ok(Value::Union(0, Box::new(Value::Null)))
                     } else {
                         Err(Error::ReadVariableIntegerBytes(io_err))
                     }
                 }
                 Err(io_err) => Err(io_err),
             },
-
             Schema::Record {
                 ref name,
                 ref fields,
diff --git a/lang/rust/src/encode.rs b/lang/rust/src/encode.rs
index 088def9..9a1fbef 100644
--- a/lang/rust/src/encode.rs
+++ b/lang/rust/src/encode.rs
@@ -124,14 +124,13 @@ pub fn encode_ref(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) {
             },
             Value::Fixed(_, bytes) => buffer.extend(bytes),
             Value::Enum(i, _) => encode_int(*i, buffer),
-            Value::Union(item) => {
+            Value::Union(idx, item) => {
                 if let Schema::Union(ref inner) = *schema {
-                    // Find the schema that is matched here. Due to validation, this should always
-                    // return a value.
-                    let (idx, inner_schema) = inner
-                        .find_schema(item)
+                    let inner_schema = inner
+                        .schemas
+                        .get(*idx as usize)
                         .expect("Invalid Union validation occurred");
-                    encode_long(idx as i64, buffer);
+                    encode_long(*idx as i64, buffer);
                     encode_ref0(&*item, inner_schema, buffer, schemas_by_name);
                 } else {
                     error!("invalid schema type for Union: {:?}", schema);
diff --git a/lang/rust/src/reader.rs b/lang/rust/src/reader.rs
index 4f6d311..9634a27 100644
--- a/lang/rust/src/reader.rs
+++ b/lang/rust/src/reader.rs
@@ -432,7 +432,7 @@ mod tests {
 
         assert_eq!(
             from_avro_datum(&schema, &mut encoded, None).unwrap(),
-            Value::Union(Box::new(Value::Long(0)))
+            Value::Union(1, Box::new(Value::Long(0)))
         );
     }
 
diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index 2697328..4f85ee9 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -173,6 +173,13 @@ impl SchemaKind {
                 | SchemaKind::String,
         )
     }
+
+    pub fn is_named(self) -> bool {
+        matches!(
+            self,
+            SchemaKind::Record | SchemaKind::Enum | SchemaKind::Fixed
+        )
+    }
 }
 
 impl<'a> From<&'a types::Value> for SchemaKind {
@@ -189,7 +196,7 @@ impl<'a> From<&'a types::Value> for SchemaKind {
             Value::String(_) => Self::String,
             Value::Array(_) => Self::Array,
             Value::Map(_) => Self::Map,
-            Value::Union(_) => Self::Union,
+            Value::Union(_, _) => Self::Union,
             Value::Record(_) => Self::Record,
             Value::Enum(_, _) => Self::Enum,
             Value::Fixed(_, _) => Self::Fixed,
@@ -362,7 +369,7 @@ impl UnionSchema {
                 return Err(Error::GetNestedUnion);
             }
             let kind = SchemaKind::from(schema);
-            if vindex.insert(kind, i).is_some() {
+            if !kind.is_named() && vindex.insert(kind, i).is_some() {
                 return Err(Error::GetUnionDuplicate);
             }
         }
@@ -385,12 +392,12 @@ impl UnionSchema {
     /// Optionally returns a reference to the schema matched by this value, as well as its position
     /// within this union.
     pub fn find_schema(&self, value: &types::Value) -> Option<(usize, &Schema)> {
-        let type_index = &SchemaKind::from(value);
-        if let Some(&i) = self.variant_index.get(type_index) {
+        let schema_kind = SchemaKind::from(value);
+        if let Some(&i) = self.variant_index.get(&schema_kind) {
             // fast path
             Some((i, &self.schemas[i]))
         } else {
-            // slow path (required for matching logical types)
+            // slow path (required for matching logical or named types)
             self.schemas
                 .iter()
                 .enumerate()
@@ -1315,6 +1322,110 @@ mod tests {
         assert_eq!(variants.next(), None);
     }
 
+    // AVRO-3248
+    #[test]
+    fn test_union_of_records() {
+        use std::iter::FromIterator;
+
+        // A and B are the same except the name.
+        let schema_str_a = r#"{
+            "name": "A",
+            "type": "record",
+            "fields": [
+                {"name": "field_one", "type": "float"}
+            ]
+        }"#;
+
+        let schema_str_b = r#"{
+            "name": "B",
+            "type": "record",
+            "fields": [
+                {"name": "field_one", "type": "float"}
+            ]
+        }"#;
+
+        // we get Error::GetNameField if we put ["A", "B"] directly here.
+        let schema_str_c = r#"{
+            "name": "C",
+            "type": "record",
+            "fields": [
+                {"name": "field_one",  "type": ["A", "B"]}
+            ]
+        }"#;
+
+        let schema_a = Schema::parse_str(schema_str_a).unwrap();
+        let schema_b = Schema::parse_str(schema_str_b).unwrap();
+
+        let schema_c = Schema::parse_list(&[schema_str_a, schema_str_b, schema_str_c])
+            .unwrap()
+            .last()
+            .unwrap()
+            .clone();
+
+        let schema_c_expected = Schema::Record {
+            name: Name::new("C"),
+            doc: None,
+            fields: vec![RecordField {
+                name: "field_one".to_string(),
+                doc: None,
+                default: None,
+                schema: Schema::Union(UnionSchema::new(vec![schema_a, schema_b]).unwrap()),
+                order: RecordFieldOrder::Ignore,
+                position: 0,
+            }],
+            lookup: HashMap::from_iter(vec![("field_one".to_string(), 0)]),
+        };
+
+        assert_eq!(schema_c, schema_c_expected);
+    }
+
+    // AVRO-3248
+    #[test]
+    fn test_nullable_record() {
+        use std::iter::FromIterator;
+
+        let schema_str_a = r#"{
+            "name": "A",
+            "type": "record",
+            "fields": [
+                {"name": "field_one", "type": "float"}
+            ]
+        }"#;
+
+        // we get Error::GetNameField if we put ["null", "A"] directly here.
+        let schema_str_option_a = r#"{
+            "name": "OptionA",
+            "type": "record",
+            "fields": [
+                {"name": "field_one",  "type": ["null", "A"], "default": "null"}
+            ]
+        }"#;
+
+        let schema_a = Schema::parse_str(schema_str_a).unwrap();
+
+        let schema_option_a = Schema::parse_list(&[schema_str_a, schema_str_option_a])
+            .unwrap()
+            .last()
+            .unwrap()
+            .clone();
+
+        let schema_option_a_expected = Schema::Record {
+            name: Name::new("OptionA"),
+            doc: None,
+            fields: vec![RecordField {
+                name: "field_one".to_string(),
+                doc: None,
+                default: Some(Value::Null),
+                schema: Schema::Union(UnionSchema::new(vec![Schema::Null, schema_a]).unwrap()),
+                order: RecordFieldOrder::Ignore,
+                position: 0,
+            }],
+            lookup: HashMap::from_iter(vec![("field_one".to_string(), 0)]),
+        };
+
+        assert_eq!(schema_option_a, schema_option_a_expected);
+    }
+
     #[test]
     fn test_record_schema() {
         let parsed = Schema::parse_str(
diff --git a/lang/rust/src/ser.rs b/lang/rust/src/ser.rs
index 480ea0e..444ee20 100644
--- a/lang/rust/src/ser.rs
+++ b/lang/rust/src/ser.rs
@@ -234,7 +234,7 @@ impl<'b> ser::Serializer for &'b mut Serializer {
             ),
             (
                 "value".to_owned(),
-                Value::Union(Box::new(value.serialize(self)?)),
+                Value::Union(index as i32, Box::new(value.serialize(self)?)),
             ),
         ]))
     }
@@ -346,9 +346,10 @@ impl<'a> ser::SerializeSeq for SeqVariantSerializer<'a> {
     where
         T: Serialize,
     {
-        self.items.push(Value::Union(Box::new(
-            value.serialize(&mut Serializer::default())?,
-        )));
+        self.items.push(Value::Union(
+            self.index as i32,
+            Box::new(value.serialize(&mut Serializer::default())?),
+        ));
         Ok(())
     }
 
@@ -469,7 +470,7 @@ impl<'a> ser::SerializeStructVariant for StructVariantSerializer<'a> {
             ),
             (
                 "value".to_owned(),
-                Value::Union(Box::new(Value::Record(self.fields))),
+                Value::Union(self.index as i32, Box::new(Value::Record(self.fields))),
             ),
         ]))
     }
@@ -776,7 +777,7 @@ mod tests {
                 ("type".to_owned(), Value::Enum(0, "Double".to_owned())),
                 (
                     "value".to_owned(),
-                    Value::Union(Box::new(Value::Double(64.0))),
+                    Value::Union(0, Box::new(Value::Double(64.0))),
                 ),
             ]),
         )]);
@@ -836,10 +837,13 @@ mod tests {
                 ("type".to_owned(), Value::Enum(0, "Val1".to_owned())),
                 (
                     "value".to_owned(),
-                    Value::Union(Box::new(Value::Record(vec![
-                        ("x".to_owned(), Value::Float(1.0)),
-                        ("y".to_owned(), Value::Float(2.0)),
-                    ]))),
+                    Value::Union(
+                        0,
+                        Box::new(Value::Record(vec![
+                            ("x".to_owned(), Value::Float(1.0)),
+                            ("y".to_owned(), Value::Float(2.0)),
+                        ])),
+                    ),
                 ),
             ]),
         )]);
@@ -946,9 +950,9 @@ mod tests {
                 (
                     "value".to_owned(),
                     Value::Array(vec![
-                        Value::Union(Box::new(Value::Float(1.0))),
-                        Value::Union(Box::new(Value::Float(2.0))),
-                        Value::Union(Box::new(Value::Float(3.0))),
+                        Value::Union(1, Box::new(Value::Float(1.0))),
+                        Value::Union(1, Box::new(Value::Float(2.0))),
+                        Value::Union(1, Box::new(Value::Float(3.0))),
                     ]),
                 ),
             ]),
diff --git a/lang/rust/src/types.rs b/lang/rust/src/types.rs
index 48a1813..77472b0 100644
--- a/lang/rust/src/types.rs
+++ b/lang/rust/src/types.rs
@@ -66,7 +66,12 @@ pub enum Value {
     /// reading values.
     Enum(i32, String),
     /// An `union` Avro value.
-    Union(Box<Value>),
+    ///
+    /// A union is represented by the value it holds and by its position in the
+    /// type list of its corresponding schema.
+    /// This allows schema-less encoding, as well as schema resolution while
+    /// reading values.
+    Union(i32, Box<Value>),
     /// An `array` Avro value.
     Array(Vec<Value>),
     /// A `map` Avro value.
@@ -168,7 +173,11 @@ where
     T: Into<Self>,
 {
     fn from(value: Option<T>) -> Self {
-        Self::Union(Box::new(value.map_or_else(|| Self::Null, Into::into)))
+        // FIXME: this is incorrect when the first type in the union is not "null"
+        Self::Union(
+            value.is_some() as i32,
+            Box::new(value.map_or_else(|| Self::Null, Into::into)),
+        )
     }
 }
 
@@ -285,7 +294,7 @@ impl std::convert::TryFrom<Value> for JsonValue {
                 Ok(Self::Array(items.into_iter().map(|v| v.into()).collect()))
             }
             Value::Enum(_i, s) => Ok(Self::String(s)),
-            Value::Union(b) => Self::try_from(*b),
+            Value::Union(_i, b) => Self::try_from(*b),
             Value::Array(items) => items
                 .into_iter()
                 .map(Self::try_from)
@@ -358,9 +367,11 @@ impl Value {
                 .map(|ref symbol| symbol == &s)
                 .unwrap_or(false),
             // (&Value::Union(None), &Schema::Union(_)) => true,
-            (&Value::Union(ref value), &Schema::Union(ref inner)) => {
-                inner.find_schema(value).is_some()
-            }
+            (&Value::Union(i, ref value), &Schema::Union(ref inner)) => inner
+                .variants()
+                .get(i as usize)
+                .map(|schema| value.validate(schema))
+                .unwrap_or(false),
             (&Value::Array(ref items), &Schema::Array(ref inner)) => {
                 items.iter().all(|item| item.validate(inner))
             }
@@ -409,7 +420,7 @@ impl Value {
             {
                 // Pull out the Union, and attempt to resolve against it.
                 let v = match value {
-                    Value::Union(b) => &**b,
+                    Value::Union(_i, b) => &**b,
                     _ => unreachable!(),
                 };
                 *value = v.clone();
@@ -703,13 +714,14 @@ impl Value {
     fn resolve_union(self, schema: &UnionSchema) -> Result<Self, Error> {
         let v = match self {
             // Both are unions case.
-            Value::Union(v) => *v,
+            Value::Union(_i, v) => *v,
             // Reader is a union, but writer is not.
             v => v,
         };
         // Find the first match in the reader schema.
-        let (_, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
-        Ok(Value::Union(Box::new(v.resolve(inner)?)))
+        // FIXME: this might be wrong when the union contains several structurally identical records that have different names
+        let (i, inner) = schema.find_schema(&v).ok_or(Error::FindUnionVariant)?;
+        Ok(Value::Union(i as i32, Box::new(v.resolve(inner)?)))
     }
 
     fn resolve_array(self, schema: &Schema) -> Result<Self, Error> {
@@ -770,10 +782,11 @@ impl Value {
                                 // NOTE: this match exists only to optimize null defaults for large
                                 // backward-compatible schemas with many nullable fields
                                 match first {
-                                    Schema::Null => Value::Union(Box::new(Value::Null)),
-                                    _ => Value::Union(Box::new(
-                                        Value::from(value.clone()).resolve(first)?,
-                                    )),
+                                    Schema::Null => Value::Union(0, Box::new(Value::Null)),
+                                    _ => Value::Union(
+                                        0,
+                                        Box::new(Value::from(value.clone()).resolve(first)?),
+                                    ),
                                 }
                             }
                             _ => Value::from(value.clone()),
@@ -821,22 +834,22 @@ mod tests {
             (Value::Int(42), Schema::Int, true),
             (Value::Int(42), Schema::Boolean, false),
             (
-                Value::Union(Box::new(Value::Null)),
+                Value::Union(0, Box::new(Value::Null)),
                 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
                 true,
             ),
             (
-                Value::Union(Box::new(Value::Int(42))),
+                Value::Union(1, Box::new(Value::Int(42))),
                 Schema::Union(UnionSchema::new(vec![Schema::Null, Schema::Int]).unwrap()),
                 true,
             ),
             (
-                Value::Union(Box::new(Value::Null)),
+                Value::Union(0, Box::new(Value::Null)),
                 Schema::Union(UnionSchema::new(vec![Schema::Double, Schema::Int]).unwrap()),
                 false,
             ),
             (
-                Value::Union(Box::new(Value::Int(42))),
+                Value::Union(3, Box::new(Value::Int(42))),
                 Schema::Union(
                     UnionSchema::new(vec![
                         Schema::Null,
@@ -849,7 +862,7 @@ mod tests {
                 true,
             ),
             (
-                Value::Union(Box::new(Value::Long(42i64))),
+                Value::Union(1, Box::new(Value::Long(42i64))),
                 Schema::Union(
                     UnionSchema::new(vec![Schema::Null, Schema::TimestampMillis]).unwrap(),
                 ),
@@ -997,20 +1010,26 @@ mod tests {
 
         let union_schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema]).unwrap());
 
-        assert!(Value::Union(Box::new(Value::Record(vec![
-            ("a".to_string(), Value::Long(42i64)),
-            ("b".to_string(), Value::String("foo".to_string())),
-        ])))
-        .validate(&union_schema));
-
-        assert!(Value::Union(Box::new(Value::Map(
-            vec![
+        assert!(Value::Union(
+            1,
+            Box::new(Value::Record(vec![
                 ("a".to_string(), Value::Long(42i64)),
                 ("b".to_string(), Value::String("foo".to_string())),
-            ]
-            .into_iter()
-            .collect()
-        )))
+            ]))
+        )
+        .validate(&union_schema));
+
+        assert!(Value::Union(
+            1,
+            Box::new(Value::Map(
+                vec![
+                    ("a".to_string(), Value::Long(42i64)),
+                    ("b".to_string(), Value::String("foo".to_string())),
+                ]
+                .into_iter()
+                .collect()
+            ))
+        )
         .validate(&union_schema));
     }
 
@@ -1193,7 +1212,8 @@ mod tests {
             JsonValue::String("test_enum".into())
         );
         assert_eq!(
-            JsonValue::try_from(Value::Union(Box::new(Value::String("test_enum".into())))).unwrap(),
+            JsonValue::try_from(Value::Union(1, Box::new(Value::String("test_enum".into()))))
+                .unwrap(),
             JsonValue::String("test_enum".into())
         );
         assert_eq!(
diff --git a/lang/rust/src/writer.rs b/lang/rust/src/writer.rs
index 41e77c9..a222a0f 100644
--- a/lang/rust/src/writer.rs
+++ b/lang/rust/src/writer.rs
@@ -398,7 +398,7 @@ mod tests {
     #[test]
     fn test_union_not_null() {
         let schema = Schema::parse_str(UNION_SCHEMA).unwrap();
-        let union = Value::Union(Box::new(Value::Long(3)));
+        let union = Value::Union(1, Box::new(Value::Long(3)));
 
         let mut expected = Vec::new();
         zig_i64(1, &mut expected);
@@ -410,7 +410,7 @@ mod tests {
     #[test]
     fn test_union_null() {
         let schema = Schema::parse_str(UNION_SCHEMA).unwrap();
-        let union = Value::Union(Box::new(Value::Null));
+        let union = Value::Union(0, Box::new(Value::Null));
 
         let mut expected = Vec::new();
         zig_i64(0, &mut expected);
@@ -781,11 +781,11 @@ mod tests {
         let mut record1 = Record::new(&schema).unwrap();
         record1.put(
             "a",
-            Value::Union(Box::new(Value::TimestampMicros(1234_i64))),
+            Value::Union(1, Box::new(Value::TimestampMicros(1234_i64))),
         );
 
         let mut record2 = Record::new(&schema).unwrap();
-        record2.put("a", Value::Union(Box::new(Value::Null)));
+        record2.put("a", Value::Union(0, Box::new(Value::Null)));
 
         let n1 = writer.append(record1).unwrap();
         let n2 = writer.append(record2).unwrap();
diff --git a/lang/rust/tests/io.rs b/lang/rust/tests/io.rs
index c1ab1d7..2d4c7ee 100644
--- a/lang/rust/tests/io.rs
+++ b/lang/rust/tests/io.rs
@@ -34,7 +34,7 @@ lazy_static! {
         (r#"{"type": "enum", "name": "Test", "symbols": ["A", "B"]}"#, Value::Enum(1, "B".to_string())),
         (r#"{"type": "array", "items": "long"}"#, Value::Array(vec![Value::Long(1), Value::Long(3), Value::Long(2)])),
         (r#"{"type": "map", "values": "long"}"#, Value::Map([("a".to_string(), Value::Long(1i64)), ("b".to_string(), Value::Long(3i64)), ("c".to_string(), Value::Long(2i64))].iter().cloned().collect())),
-        (r#"["string", "null", "long"]"#, Value::Union(Box::new(Value::Null))),
+        (r#"["string", "null", "long"]"#, Value::Union(1, Box::new(Value::Null))),
         (r#"{"type": "record", "name": "Test", "fields": [{"name": "f", "type": "long"}]}"#, Value::Record(vec![("f".to_string(), Value::Long(1))]))
     ];
 
@@ -65,8 +65,9 @@ lazy_static! {
         (r#"{"type": "enum", "name": "F", "symbols": ["FOO", "BAR"]}"#, r#""FOO""#, Value::Enum(0, "FOO".to_string())),
         (r#"{"type": "array", "items": "int"}"#, "[1, 2, 3]", Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)])),
         (r#"{"type": "map", "values": "int"}"#, r#"{"a": 1, "b": 2}"#, Value::Map([("a".to_string(), Value::Int(1)), ("b".to_string(), Value::Int(2))].iter().cloned().collect())),
-        (r#"["int", "null"]"#, "5", Value::Union(Box::new(Value::Int(5)))),
+        (r#"["int", "null"]"#, "5", Value::Union(0, Box::new(Value::Int(5)))),
         (r#"{"type": "record", "name": "F", "fields": [{"name": "A", "type": "int"}]}"#, r#"{"A": 5}"#,Value::Record(vec![("A".to_string(), Value::Int(5))])),
+        (r#"["null", "int"]"#, "null", Value::Union(0, Box::new(Value::Null))),
     ];
 
     static ref LONG_RECORD_SCHEMA: Schema = Schema::parse_str(r#"
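
Note on the change above: Value::Union now carries the branch position next to the boxed value, and validation looks the branch up by that position instead of searching for the first schema that fits. The following is a minimal sketch of that idea only. The Schema and Value enums here are simplified stand-ins written for illustration (they are not the crate's real types), and the explicit negative-index check is an assumption of this sketch rather than something taken from the diff.

```rust
// Simplified stand-ins for illustration; NOT the crate's real `Schema`/`Value`.
#[derive(Debug, Clone, PartialEq)]
enum Schema {
    Null,
    Int,
    Union(Vec<Schema>),
}

#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Int(i32),
    // Position of the branch in the union's type list, plus the wrapped value.
    Union(i32, Box<Value>),
}

impl Value {
    // Returns true when the value is acceptable for the given schema.
    fn validate(&self, schema: &Schema) -> bool {
        match (self, schema) {
            (Value::Null, Schema::Null) => true,
            (Value::Int(_), Schema::Int) => true,
            (Value::Union(i, inner), Schema::Union(variants)) => {
                // Only the branch named by the index is consulted; a negative
                // or out-of-range index never validates.
                *i >= 0
                    && variants
                        .get(*i as usize)
                        .map_or(false, |variant| inner.validate(variant))
            }
            _ => false,
        }
    }
}
```

With a ["null", "int"] union, Value::Union(1, Box::new(Value::Int(42))) validates, while the same Int tagged with index 0 does not, because index 0 names the "null" branch. This mirrors the updated test expectations in types.rs, where each union value states which branch it belongs to.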

[avro] 24/30: AVRO-3302: No need of special handling for Schema::Ref while serializing to JSON

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 9164b7d497e9c473d56e2d2c50f1797f3b39caba
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Thu Jan 20 08:51:27 2022 +0200

    AVRO-3302: No need of special handling for Schema::Ref while serializing to JSON
    
    Just serialize the 'name'.
    The schema validity is not a concern at serialization time.
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit bb6b6dae7692f7ce6f0fbe9561c7408e8f295bec)
---
 lang/rust/src/schema.rs | 40 +++++-----------------------------------
 1 file changed, 5 insertions(+), 35 deletions(-)

diff --git a/lang/rust/src/schema.rs b/lang/rust/src/schema.rs
index 4f85ee9..73b64c7 100644
--- a/lang/rust/src/schema.rs
+++ b/lang/rust/src/schema.rs
@@ -21,7 +21,6 @@ use digest::Digest;
 use lazy_static::lazy_static;
 use regex::Regex;
 use serde::{
-    ser,
     ser::{SerializeMap, SerializeSeq},
     Deserialize, Serialize, Serializer,
 };
@@ -32,7 +31,6 @@ use std::{
     convert::TryInto,
     fmt,
     str::FromStr,
-    sync::{Arc, Mutex},
 };
 use strum_macros::{EnumDiscriminants, EnumString};
 
@@ -449,13 +447,6 @@ struct Parser {
 }
 
 impl Schema {
-    // Used to help resolve cyclic references while serializing Schema to JSON.
-    // Needed because serde[_json] does not support using contexts.
-    // TODO: See whether alternatives like
-    // https://users.rust-lang.org/t/serde-question-access-to-a-shared-context-data-within-serialize-and-deserialize/39546
-    // can be used
-    thread_local!(static SCHEMAS_BY_NAME: Arc<Mutex<HashMap<String, Schema>>> = Arc::new(Mutex::new(HashMap::new())));
-
     /// Converts `self` into its [Parsing Canonical Form].
     ///
     /// [Parsing Canonical Form]:
@@ -956,30 +947,8 @@ impl Serialize for Schema {
     where
         S: Serializer,
     {
-        fn remember_schema(name: &Name, schema: &Schema) {
-            Schema::SCHEMAS_BY_NAME.with(|schemas_by_name| match schemas_by_name.try_lock() {
-                Ok(mut schemas) => {
-                    schemas.insert((&name.name).clone(), schema.clone());
-                }
-                Err(poisoned) => {
-                    error!("Wasn't able to lock schemas_by_name {:?}", poisoned);
-                }
-            });
-        }
-
         match *self {
-            Schema::Ref { ref name } => {
-                let name = &name.name;
-                Schema::SCHEMAS_BY_NAME.with(|schemas_by_name| {
-                    let schemas = schemas_by_name.lock().unwrap();
-                    if schemas.contains_key(name.as_str()) {
-                        serializer.serialize_str(name)
-                    } else {
-                        Err(ser::Error::custom(format!("Could not serialize Schema::Ref('{}') because it cannot be found in ({})",
-                                                       name, schemas.keys().cloned().collect::<Vec<_>>().join(", "))))
-                    }
-                })
-            }
+            Schema::Ref { ref name } => serializer.serialize_str(&name.name),
             Schema::Null => serializer.serialize_str("null"),
             Schema::Boolean => serializer.serialize_str("boolean"),
             Schema::Int => serializer.serialize_str("int"),
@@ -1014,7 +983,6 @@ impl Serialize for Schema {
                 ref fields,
                 ..
             } => {
-                remember_schema(name, self);
                 let mut map = serializer.serialize_map(None)?;
                 map.serialize_entry("type", "record")?;
                 if let Some(ref n) = name.namespace {
@@ -1035,7 +1003,6 @@ impl Serialize for Schema {
                 ref symbols,
                 ..
             } => {
-                remember_schema(name, self);
                 let mut map = serializer.serialize_map(None)?;
                 map.serialize_entry("type", "enum")?;
                 map.serialize_entry("name", &name.name)?;
@@ -1047,7 +1014,6 @@ impl Serialize for Schema {
                 ref doc,
                 ref size,
             } => {
-                remember_schema(name, self);
                 let mut map = serializer.serialize_map(None)?;
                 map.serialize_entry("type", "fixed")?;
                 map.serialize_entry("name", &name.name)?;
@@ -1542,6 +1508,10 @@ mod tests {
             lookup,
         };
         assert_eq!(schema, expected);
+
+        let canonical_form = &schema.canonical_form();
+        let expected = r#"{"name":"test","type":"record","fields":[{"name":"recordField","type":{"name":"Node","type":"record","fields":[{"name":"label","type":"string"},{"name":"children","type":{"type":"array","items":"Node"}}]}}]}"#;
+        assert_eq!(canonical_form, &expected);
     }
 
     #[test]
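
Note on the change above: with the thread-local lookup table removed, a Schema::Ref is written out as its bare name, which is what lets a recursive record such as Node refer to itself. The sketch below illustrates that behaviour with a hypothetical, hand-rolled schema model and a to_json helper. Neither is part of the crate, serde is deliberately not used, and names are assumed to need no JSON escaping (Avro names are restricted to letters, digits and underscores).

```rust
// Hypothetical, simplified schema model; NOT the crate's real `Schema` type.
enum Schema {
    String,
    Array(Box<Schema>),
    Record {
        name: String,
        fields: Vec<(String, Schema)>,
    },
    // A by-name reference to a named schema defined elsewhere.
    Ref { name: String },
}

// Renders a schema as compact JSON text.
// Assumes names contain no characters that need JSON escaping.
fn to_json(schema: &Schema) -> String {
    match schema {
        Schema::String => "\"string\"".to_string(),
        Schema::Array(items) => {
            format!("{{\"type\":\"array\",\"items\":{}}}", to_json(items))
        }
        Schema::Record { name, fields } => {
            let rendered: Vec<String> = fields
                .iter()
                .map(|(field_name, field_schema)| {
                    format!(
                        "{{\"name\":\"{}\",\"type\":{}}}",
                        field_name,
                        to_json(field_schema)
                    )
                })
                .collect();
            format!(
                "{{\"name\":\"{}\",\"type\":\"record\",\"fields\":[{}]}}",
                name,
                rendered.join(",")
            )
        }
        // No registry lookup: the reference is emitted as its name only.
        Schema::Ref { name } => format!("\"{}\"", name),
    }
}
```

Because the reference branch never consults shared state, serialization cannot fail on an unknown name. Whether the name actually resolves is left to parsing, in line with the commit message's point that schema validity is not a concern at serialization time.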

[avro] 29/30: Update zstd requirement in /lang/rust (#1471)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 029cd6169c3f0201402f4db7f9f69232ec4f6afa
Author: dependabot[bot] <49...@users.noreply.github.com>
AuthorDate: Mon Jan 24 10:40:00 2022 +0200

    Update zstd requirement in /lang/rust (#1471)
    
    Updates the requirements on [zstd](https://github.com/gyscos/zstd-rs) to permit the latest version.
    - [Release notes](https://github.com/gyscos/zstd-rs/releases)
    - [Commits](https://github.com/gyscos/zstd-rs/commits)
    
    ---
    updated-dependencies:
    - dependency-name: zstd
      dependency-type: direct:production
    ...
    
    Signed-off-by: dependabot[bot] <su...@github.com>
    
    Co-authored-by: dependabot[bot] <49...@users.noreply.github.com>
    (cherry picked from commit 0cbcda5ab0b51fd1c9c47d9adf23388e865dbd44)
---
 lang/rust/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index 0f1f4ed..0f230a4 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -72,7 +72,7 @@ uuid = { version = "0.8.2", features = ["serde", "v4"] }
 zerocopy = "0.6.1"
 lazy_static = "1.1.1"
 log = "0.4.14"
-zstd = { version = "0.9.0+zstd.1.5.0" , optional = true }
+zstd = { version = "0.10.0+zstd.1.5.0" , optional = true }
 
 [dev-dependencies]
 md-5 = "0.10.0"
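
Note on the change above: a bare Cargo version requirement is a caret requirement, and for 0.y.z versions the minor component counts as the breaking one, so "0.9.0" does not admit a 0.10.x release and the requirement has to be raised. The sketch below is a simplified illustration of that rule, not Cargo's implementation. The helper names parse and caret_allows are made up for this example, pre-release tags are not handled, and build metadata after '+' is simply dropped.

```rust
// Simplified caret-requirement check for full `major.minor.patch` versions.
// Illustration only: pre-release tags are not handled, and build metadata
// (the part after `+`) is dropped before comparing.
fn parse(version: &str) -> Option<(u64, u64, u64)> {
    let core = version.split('+').next()?;
    let mut parts = core.split('.').map(|part| part.parse::<u64>().ok());
    let parsed = (parts.next()??, parts.next()??, parts.next()??);
    // Reject inputs with more than three components.
    if parts.next().is_some() {
        None
    } else {
        Some(parsed)
    }
}

// Returns Some(true) when `candidate` falls inside the caret range implied
// by `requirement`, and None when either string cannot be parsed.
fn caret_allows(requirement: &str, candidate: &str) -> Option<bool> {
    let req = parse(requirement)?;
    let cand = parse(candidate)?;
    // The leftmost non-zero component must stay fixed, so the exclusive
    // upper bound bumps that component.
    let upper = match req {
        (0, 0, patch) => (0, 0, patch + 1),
        (0, minor, _) => (0, minor + 1, 0),
        (major, _, _) => (major + 1, 0, 0),
    };
    Some(cand >= req && cand < upper)
}
```

Under these assumptions, caret_allows("0.9.0+zstd.1.5.0", "0.10.0+zstd.1.5.0") yields Some(false), while the updated requirement "0.10.0+zstd.1.5.0" admits that same version.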

[avro] 02/30: duplicate line remove (#1440)

Posted by mg...@apache.org.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 2d337f93436b42ca50ae981a0741b7f5ec9055c2
Author: Jack Klamer <jf...@gmail.com>
AuthorDate: Wed Jan 5 00:39:02 2022 -0600

    duplicate line remove (#1440)
    
    (cherry picked from commit 336f04adbe596b8f303b55ac29fbfc5c4785d960)
---
 lang/rust/src/types.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/lang/rust/src/types.rs b/lang/rust/src/types.rs
index 22c6ea8..63876da 100644
--- a/lang/rust/src/types.rs
+++ b/lang/rust/src/types.rs
@@ -96,7 +96,6 @@ pub enum Value {
     /// Avro Duration. An amount of time defined by months, days and milliseconds.
     Duration(Duration),
     /// Universally unique identifier.
-    /// Universally unique identifier.
     Uuid(Uuid),
 }
 /// Any structure implementing the [ToAvro](trait.ToAvro.html) trait will be usable