Posted to commits@avro.apache.org by mg...@apache.org on 2023/03/13 07:10:00 UTC

[avro] branch master updated: AVRO-3683: [Rust] Read/Write with multiple schemas (#2014)

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new b8b83b72f AVRO-3683: [Rust] Read/Write with multiple schemas (#2014)
b8b83b72f is described below

commit b8b83b72f7184cf1b388fc20331d11eabbd93e06
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Mon Mar 13 09:09:53 2023 +0200

    AVRO-3683: [Rust] Read/Write with multiple schemas (#2014)
    
    * AVRO-3683: Add support for using multiple schemata for resolve/validate/write
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3683: WIP
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3683: WIP compiles
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3683: WIP compiles and all tests pass
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3683: WIP Add support for reading
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3683: WIP
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3683: WIP
    
    Use a main schema and pass all other schemata for resolution.
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3683: Formatting
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3683: WIP
    
    Add support for multiple schemata in Reader/Writer APIs
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3683: Formatting
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3683: Use Vec instead of slice reference for schemata
    
    It is much easier to deal with.
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3683: Fix the resolving of the writer schema when reading an Avro file
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3683: Cleanup
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    ---------
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---
 lang/rust/avro/src/lib.rs                          |   8 +-
 lang/rust/avro/src/reader.rs                       |  79 ++++++++++++--
 lang/rust/avro/src/schema.rs                       | 118 +++++++++++++--------
 lang/rust/avro/src/types.rs                        |  31 +++---
 lang/rust/avro/src/writer.rs                       |  96 ++++++++++++++---
 .../rust/avro/tests/to_from_avro_datum_schemata.rs |  84 +++++++++++++++
 6 files changed, 330 insertions(+), 86 deletions(-)
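
For a quick view of the datum-level API this patch adds, here is a minimal
sketch. It mirrors the new test file at the end of this mail; SCHEMA_A_STR and
SCHEMA_B_STR are assumed to be the two record schemas defined there (record "B"
has a field whose type is the named record "A"):

    use apache_avro::{from_avro_datum_schemata, to_avro_datum_schemata, types::Value, Schema};

    // Parse both schemas together so the reference from "B" to "A" is known.
    let schemas: Vec<Schema> = Schema::parse_list(&[SCHEMA_A_STR, SCHEMA_B_STR]).unwrap();
    let schemata: Vec<&Schema> = schemas.iter().collect();
    let schema_b = schemata[1]; // the "main" schema used for writing and reading

    let record = Value::Record(vec![(
        String::from("field_b"),
        Value::Record(vec![(String::from("field_a"), Value::Float(1.0))]),
    )]);

    // Encode with schema B; the Schema::Ref to "A" is resolved via `schemata`.
    let encoded = to_avro_datum_schemata(schema_b, schemata.clone(), record.clone()).unwrap();
    // Decode the same bytes back; None means no reader-schema resolution.
    let decoded =
        from_avro_datum_schemata(schema_b, schemata, &mut encoded.as_slice(), None).unwrap();
    assert_eq!(decoded, record);
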

diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs
index a6b06d74a..2c6f46c07 100644
--- a/lang/rust/avro/src/lib.rs
+++ b/lang/rust/avro/src/lib.rs
@@ -743,12 +743,16 @@ pub use decimal::Decimal;
 pub use duration::{Days, Duration, Millis, Months};
 pub use error::Error;
 pub use reader::{
-    from_avro_datum, read_marker, GenericSingleObjectReader, Reader, SpecificSingleObjectReader,
+    from_avro_datum, from_avro_datum_schemata, read_marker, GenericSingleObjectReader, Reader,
+    SpecificSingleObjectReader,
 };
 pub use schema::{AvroSchema, Schema};
 pub use ser::to_value;
 pub use util::max_allocation_bytes;
-pub use writer::{to_avro_datum, GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer};
+pub use writer::{
+    to_avro_datum, to_avro_datum_schemata, GenericSingleObjectWriter, SpecificSingleObjectWriter,
+    Writer,
+};
 
 #[cfg(feature = "derive")]
 pub use apache_avro_derive::*;
diff --git a/lang/rust/avro/src/reader.rs b/lang/rust/avro/src/reader.rs
index 442f13075..3489ccfb1 100644
--- a/lang/rust/avro/src/reader.rs
+++ b/lang/rust/avro/src/reader.rs
@@ -20,7 +20,7 @@ use crate::{
     decode::{decode, decode_internal},
     from_value,
     rabin::Rabin,
-    schema::{AvroSchema, ResolvedOwnedSchema, Schema},
+    schema::{AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema},
     types::Value,
     util, AvroResult, Codec, Error,
 };
@@ -36,7 +36,7 @@ use std::{
 
 // Internal Block reader.
 #[derive(Debug, Clone)]
-struct Block<R> {
+struct Block<'r, R> {
     reader: R,
     // Internal buffering to reduce allocation.
     buf: Vec<u8>,
@@ -46,15 +46,17 @@ struct Block<R> {
     marker: [u8; 16],
     codec: Codec,
     writer_schema: Schema,
+    schemata: Vec<&'r Schema>,
     user_metadata: HashMap<String, Vec<u8>>,
 }
 
-impl<R: Read> Block<R> {
-    fn new(reader: R) -> AvroResult<Block<R>> {
+impl<'r, R: Read> Block<'r, R> {
+    fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<R>> {
         let mut block = Block {
             reader,
             codec: Codec::Null,
             writer_schema: Schema::Null,
+            schemata,
             buf: vec![],
             buf_idx: 0,
             message_count: 0,
@@ -178,7 +180,13 @@ impl<R: Read> Block<R> {
 
         let mut block_bytes = &self.buf[self.buf_idx..];
         let b_original = block_bytes.len();
-        let item = from_avro_datum(&self.writer_schema, &mut block_bytes, read_schema)?;
+        let schemata = if self.schemata.is_empty() {
+            vec![&self.writer_schema]
+        } else {
+            self.schemata.clone()
+        };
+        let item =
+            from_avro_datum_schemata(&self.writer_schema, schemata, &mut block_bytes, read_schema)?;
         if b_original == block_bytes.len() {
             // from_avro_datum did not consume any bytes, so return an error to avoid an infinite loop
             return Err(Error::ReadBlock);
@@ -189,7 +197,7 @@ impl<R: Read> Block<R> {
     }
 
     fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
-        let json = metadata
+        let json: serde_json::Value = metadata
             .get("avro.schema")
             .and_then(|bytes| {
                 if let Value::Bytes(ref bytes) = *bytes {
@@ -199,7 +207,17 @@ impl<R: Read> Block<R> {
                 }
             })
             .ok_or(Error::GetAvroSchemaFromMap)?;
-        self.writer_schema = Schema::parse(&json)?;
+        if !self.schemata.is_empty() {
+            let rs = ResolvedSchema::try_from(self.schemata.clone())?;
+            let names: Names = rs
+                .get_names()
+                .iter()
+                .map(|(name, schema)| (name.clone(), (*schema).clone()))
+                .collect();
+            self.writer_schema = Schema::parse_with_names(&json, names)?;
+        } else {
+            self.writer_schema = Schema::parse(&json)?;
+        }
         Ok(())
     }
 
@@ -261,7 +279,7 @@ fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
 /// }
 /// ```
 pub struct Reader<'a, R> {
-    block: Block<R>,
+    block: Block<'a, R>,
     reader_schema: Option<&'a Schema>,
     errored: bool,
     should_resolve_schema: bool,
@@ -273,7 +291,7 @@ impl<'a, R: Read> Reader<'a, R> {
     ///
     /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
     pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
-        let block = Block::new(reader)?;
+        let block = Block::new(reader, vec![])?;
         let reader = Reader {
             block,
             reader_schema: None,
@@ -288,7 +306,28 @@ impl<'a, R: Read> Reader<'a, R> {
     ///
     /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
     pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
-        let block = Block::new(reader)?;
+        let block = Block::new(reader, vec![schema])?;
+        let mut reader = Reader {
+            block,
+            reader_schema: Some(schema),
+            errored: false,
+            should_resolve_schema: false,
+        };
+        // Check if the reader and writer schemas disagree.
+        reader.should_resolve_schema = reader.writer_schema() != schema;
+        Ok(reader)
+    }
+
+    /// Creates a `Reader` given a reader `Schema` and something implementing the `io::Read` trait
+    /// to read from.
+    ///
+    /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
+    pub fn with_schemata(
+        schema: &'a Schema,
+        schemata: Vec<&'a Schema>,
+        reader: R,
+    ) -> AvroResult<Reader<'a, R>> {
+        let block = Block::new(reader, schemata)?;
         let mut reader = Reader {
             block,
             reader_schema: Some(schema),
@@ -368,6 +407,26 @@ pub fn from_avro_datum<R: Read>(
     }
 }
 
+/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
+/// to read from.
+/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
+/// schemata to resolve any dependencies.
+///
+/// In case a reader `Schema` is provided, schema resolution will also be performed.
+pub fn from_avro_datum_schemata<R: Read>(
+    writer_schema: &Schema,
+    schemata: Vec<&Schema>,
+    reader: &mut R,
+    reader_schema: Option<&Schema>,
+) -> AvroResult<Value> {
+    let rs = ResolvedSchema::try_from(schemata)?;
+    let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
+    match reader_schema {
+        Some(schema) => value.resolve(schema),
+        None => Ok(value),
+    }
+}
+
 pub struct GenericSingleObjectReader {
     write_schema: ResolvedOwnedSchema,
     expected_header: [u8; 10],
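
A short sketch of the container-file counterpart added above in reader.rs.
The A/B schemas are the same assumed pair as in the sketch near the top, and
`input` is a placeholder for any std::io::Read over an Avro object container
file written with these schemata:

    use apache_avro::{Reader, Schema};

    let schemas: Vec<Schema> = Schema::parse_list(&[SCHEMA_A_STR, SCHEMA_B_STR]).unwrap();
    let schemata: Vec<&Schema> = schemas.iter().collect();
    let schema_b = schemata[1];

    // The header is read on creation; the embedded writer schema may contain
    // Schema::Refs, which are resolved against `schemata`.
    let reader = Reader::with_schemata(schema_b, schemata, input).unwrap();
    for value in reader {
        println!("{:?}", value.unwrap());
    }
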
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 3842afd97..0aa0eed2c 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -257,7 +257,7 @@ impl Name {
         Ok(Self { name, namespace })
     }
 
-    fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> {
+    pub(crate) fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> {
         let caps = SCHEMA_NAME_R
             .captures(name)
             .ok_or_else(|| Error::InvalidSchemaName(name.to_string(), SCHEMA_NAME_R.as_str()))?;
@@ -396,9 +396,10 @@ impl Serialize for Alias {
     }
 }
 
+#[derive(Debug)]
 pub struct ResolvedSchema<'s> {
     names_ref: NamesRef<'s>,
-    root_schema: &'s Schema,
+    schemata: Vec<&'s Schema>,
 }
 
 impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
@@ -408,71 +409,84 @@ impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
         let names = HashMap::new();
         let mut rs = ResolvedSchema {
             names_ref: names,
-            root_schema: schema,
+            schemata: vec![schema],
         };
-        Self::from_internal(rs.root_schema, &mut rs.names_ref, &None)?;
+        Self::from_internal(rs.get_schemata(), &mut rs.names_ref, &None)?;
+        Ok(rs)
+    }
+}
+
+impl<'s> TryFrom<Vec<&'s Schema>> for ResolvedSchema<'s> {
+    type Error = Error;
+
+    fn try_from(schemata: Vec<&'s Schema>) -> AvroResult<Self> {
+        let names = HashMap::new();
+        let mut rs = ResolvedSchema {
+            names_ref: names,
+            schemata,
+        };
+        Self::from_internal(rs.get_schemata(), &mut rs.names_ref, &None)?;
         Ok(rs)
     }
 }
 
 impl<'s> ResolvedSchema<'s> {
-    pub fn get_root_schema(&self) -> &'s Schema {
-        self.root_schema
+    pub fn get_schemata(&self) -> Vec<&'s Schema> {
+        self.schemata.clone()
     }
+
     pub fn get_names(&self) -> &NamesRef<'s> {
         &self.names_ref
     }
 
     fn from_internal(
-        schema: &'s Schema,
+        schemata: Vec<&'s Schema>,
         names_ref: &mut NamesRef<'s>,
         enclosing_namespace: &Namespace,
     ) -> AvroResult<()> {
-        match schema {
-            Schema::Array(schema) | Schema::Map(schema) => {
-                Self::from_internal(schema, names_ref, enclosing_namespace)
-            }
-            Schema::Union(UnionSchema { schemas, .. }) => {
-                for schema in schemas {
-                    Self::from_internal(schema, names_ref, enclosing_namespace)?
+        for schema in schemata {
+            match schema {
+                Schema::Array(schema) | Schema::Map(schema) => {
+                    Self::from_internal(vec![schema], names_ref, enclosing_namespace)?
                 }
-                Ok(())
-            }
-            Schema::Enum { name, .. } | Schema::Fixed { name, .. } => {
-                let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
-                if names_ref
-                    .insert(fully_qualified_name.clone(), schema)
-                    .is_some()
-                {
-                    Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
-                } else {
-                    Ok(())
+                Schema::Union(UnionSchema { schemas, .. }) => {
+                    for schema in schemas {
+                        Self::from_internal(vec![schema], names_ref, enclosing_namespace)?
+                    }
                 }
-            }
-            Schema::Record { name, fields, .. } => {
-                let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
-                if names_ref
-                    .insert(fully_qualified_name.clone(), schema)
-                    .is_some()
-                {
-                    Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
-                } else {
-                    let record_namespace = fully_qualified_name.namespace;
-                    for field in fields {
-                        Self::from_internal(&field.schema, names_ref, &record_namespace)?
+                Schema::Enum { name, .. } | Schema::Fixed { name, .. } => {
+                    let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
+                    if names_ref
+                        .insert(fully_qualified_name.clone(), schema)
+                        .is_some()
+                    {
+                        return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
                     }
-                    Ok(())
                 }
+                Schema::Record { name, fields, .. } => {
+                    let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
+                    if names_ref
+                        .insert(fully_qualified_name.clone(), schema)
+                        .is_some()
+                    {
+                        return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
+                    } else {
+                        let record_namespace = fully_qualified_name.namespace;
+                        for field in fields {
+                            Self::from_internal(vec![&field.schema], names_ref, &record_namespace)?
+                        }
+                    }
+                }
+                Schema::Ref { name } => {
+                    let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
+                    if names_ref.get(&fully_qualified_name).is_none() {
+                        return Err(Error::SchemaResolutionError(fully_qualified_name));
+                    }
+                }
+                _ => (),
             }
-            Schema::Ref { name } => {
-                let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
-                names_ref
-                    .get(&fully_qualified_name)
-                    .map(|_| ())
-                    .ok_or(Error::SchemaResolutionError(fully_qualified_name))
-            }
-            _ => Ok(()),
         }
+        Ok(())
     }
 }
 
@@ -824,6 +838,18 @@ impl Schema {
         parser.parse(value, &None)
     }
 
+    /// Parses an Avro schema from JSON.
+    /// Any `Schema::Ref`s must be known in the `names` map.
+    pub(crate) fn parse_with_names(value: &Value, names: Names) -> AvroResult<Schema> {
+        let mut parser = Parser {
+            input_schemas: HashMap::with_capacity(1),
+            resolving_schemas: Names::default(),
+            input_order: Vec::with_capacity(1),
+            parsed_schemas: names,
+        };
+        parser.parse(value, &None)
+    }
+
     /// Returns the custom attributes (metadata) if the schema supports them.
     pub fn custom_attributes(&self) -> Option<&BTreeMap<String, Value>> {
         match self {
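
The switch from a single root_schema to a Vec of schemata is what lets
reader.rs and writer.rs above build one name table from several inputs. A rough
sketch of the contract of the new TryFrom<Vec<&Schema>> impl (`schema_a` /
`schema_b` are assumed to be references to two already-parsed schemas, e.g. the
A/B pair from the test below):

    let rs = ResolvedSchema::try_from(vec![schema_a, schema_b]).unwrap();
    let names = rs.get_names();     // fully qualified Name -> &Schema, across all inputs
    let inputs = rs.get_schemata(); // the schemata that were passed in
    // Duplicate fully qualified names fail with Error::AmbiguousSchemaDefinition;
    // a Schema::Ref that no input defines fails with Error::SchemaResolutionError.
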
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index 60f94ecf8..ee322b331 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -339,19 +339,26 @@ impl Value {
     /// See the [Avro specification](https://avro.apache.org/docs/current/spec.html)
     /// for the full set of rules of schema validation.
     pub fn validate(&self, schema: &Schema) -> bool {
-        let rs = ResolvedSchema::try_from(schema).expect("Schema didn't successfully parse");
-        let enclosing_namespace = schema.namespace();
-
-        match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
-            Some(error_msg) => {
-                error!(
-                    "Invalid value: {:?} for schema: {:?}. Reason: {}",
-                    self, schema, error_msg
-                );
-                false
+        self.validate_schemata(vec![schema])
+    }
+
+    pub fn validate_schemata(&self, schemata: Vec<&Schema>) -> bool {
+        let rs = ResolvedSchema::try_from(schemata.clone())
+            .expect("Schemata didn't successfully resolve");
+        schemata.iter().any(|schema| {
+            let enclosing_namespace = schema.namespace();
+
+            match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
+                Some(error_msg) => {
+                    error!(
+                        "Invalid value: {:?} for schema: {:?}. Reason: {}",
+                        self, schema, error_msg
+                    );
+                    false
+                }
+                None => true,
             }
-            None => true,
-        }
+        })
     }
 
     fn accumulate(accumulator: Option<String>, other: Option<String>) -> Option<String> {
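
Value::validate keeps its behaviour for self-contained schemas, while the new
validate_schemata accepts every schema needed to resolve named references and
passes if the value matches any of them. A tiny sketch, reusing `record`,
`schemata` and `schema_b` from the datum sketch near the top of this mail:

    // "B" refers to the named type "A", so both schemas are supplied.
    assert!(record.validate_schemata(schemata.clone()));
    // record.validate(schema_b) alone would not be able to resolve the Ref to "A".
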
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 0c8e4661f..33653e29b 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -75,6 +75,25 @@ impl<'a, W: Write> Writer<'a, W> {
         w
     }
 
+    /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
+    /// `io::Write` trait to write to.
+    /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
+    /// be provided in `schemata`.
+    pub fn with_schemata(
+        schema: &'a Schema,
+        schemata: Vec<&'a Schema>,
+        writer: W,
+        codec: Codec,
+    ) -> Self {
+        let mut w = Self::builder()
+            .schema(schema)
+            .writer(writer)
+            .codec(codec)
+            .build();
+        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
+        w
+    }
+
     /// Creates a `Writer` that will append values to already populated
     /// `std::io::Write` using the provided `marker`
     /// No compression `Codec` will be used.
@@ -101,6 +120,26 @@ impl<'a, W: Write> Writer<'a, W> {
         w
     }
 
+    /// Creates a `Writer` that will append values to already populated
+    /// `std::io::Write` using the provided `marker`
+    pub fn append_to_with_codec_schemata(
+        schema: &'a Schema,
+        schemata: Vec<&'a Schema>,
+        writer: W,
+        codec: Codec,
+        marker: [u8; 16],
+    ) -> Self {
+        let mut w = Self::builder()
+            .schema(schema)
+            .writer(writer)
+            .codec(codec)
+            .marker(marker)
+            .build();
+        w.has_header = true;
+        w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
+        w
+    }
+
     /// Get a reference to the `Schema` associated to a `Writer`.
     pub fn schema(&self) -> &'a Schema {
         self.schema
@@ -134,7 +173,7 @@ impl<'a, W: Write> Writer<'a, W> {
         // Lazy init for users using the builder pattern with error throwing
         match self.resolved_schema {
             Some(ref rs) => {
-                write_value_ref_resolved(rs, value, &mut self.buffer)?;
+                write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
                 self.num_values += 1;
 
                 if self.buffer.len() >= self.block_size {
@@ -376,6 +415,22 @@ fn write_avro_datum<T: Into<Value>>(
     Ok(())
 }
 
+fn write_avro_datum_schemata<T: Into<Value>>(
+    schema: &Schema,
+    schemata: Vec<&Schema>,
+    value: T,
+    buffer: &mut Vec<u8>,
+) -> AvroResult<()> {
+    let avro = value.into();
+    let rs = ResolvedSchema::try_from(schemata)?;
+    let names = rs.get_names();
+    let enclosing_namespace = schema.namespace();
+    if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
+        return Err(Error::Validation);
+    }
+    encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
+}
+
 /// Writer that encodes messages according to the single object encoding v1 spec
 /// Uses an API similar to the current File Writer
 /// Writes all object bytes at once, and drains internal buffer
@@ -484,26 +539,21 @@ where
 }
 
 fn write_value_ref_resolved(
+    schema: &Schema,
     resolved_schema: &ResolvedSchema,
     value: &Value,
     buffer: &mut Vec<u8>,
 ) -> AvroResult<()> {
-    let root_schema = resolved_schema.get_root_schema();
-    if let Some(err) = value.validate_internal(
-        root_schema,
-        resolved_schema.get_names(),
-        &root_schema.namespace(),
-    ) {
-        return Err(Error::ValidationWithReason(err));
+    match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
+        Some(err) => Err(Error::ValidationWithReason(err)),
+        None => encode_internal(
+            value,
+            schema,
+            resolved_schema.get_names(),
+            &schema.namespace(),
+            buffer,
+        ),
     }
-    encode_internal(
-        value,
-        root_schema,
-        resolved_schema.get_names(),
-        &root_schema.namespace(),
-        buffer,
-    )?;
-    Ok(())
 }
 
 fn write_value_ref_owned_resolved(
@@ -541,6 +591,20 @@ pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Ve
     Ok(buffer)
 }
 
+/// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also
+/// performing schema validation.
+/// If the provided `schema` is incomplete then its dependencies must be
+/// provided in `schemata`
+pub fn to_avro_datum_schemata<T: Into<Value>>(
+    schema: &Schema,
+    schemata: Vec<&Schema>,
+    value: T,
+) -> AvroResult<Vec<u8>> {
+    let mut buffer = Vec::new();
+    write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
+    Ok(buffer)
+}
+
 #[cfg(not(target_arch = "wasm32"))]
 fn generate_sync_marker() -> [u8; 16] {
     let mut marker = [0_u8; 16];
diff --git a/lang/rust/avro/tests/to_from_avro_datum_schemata.rs b/lang/rust/avro/tests/to_from_avro_datum_schemata.rs
new file mode 100644
index 000000000..d05025ab7
--- /dev/null
+++ b/lang/rust/avro/tests/to_from_avro_datum_schemata.rs
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use apache_avro::{
+    from_avro_datum_schemata, to_avro_datum_schemata, types::Value, Codec, Reader, Schema, Writer,
+};
+use apache_avro_test_helper::init;
+
+static SCHEMA_A_STR: &str = r#"{
+        "name": "A",
+        "type": "record",
+        "fields": [
+            {"name": "field_a", "type": "float"}
+        ]
+    }"#;
+
+static SCHEMA_B_STR: &str = r#"{
+        "name": "B",
+        "type": "record",
+        "fields": [
+            {"name": "field_b", "type": "A"}
+        ]
+    }"#;
+
+#[test]
+fn test_avro_3683_multiple_schemata_to_from_avro_datum() {
+    init();
+
+    let record: Value = Value::Record(vec![(
+        String::from("field_b"),
+        Value::Record(vec![(String::from("field_a"), Value::Float(1.0))]),
+    )]);
+
+    let schemata: Vec<Schema> = Schema::parse_list(&[SCHEMA_A_STR, SCHEMA_B_STR]).unwrap();
+    let schemata: Vec<&Schema> = schemata.iter().collect();
+
+    // this is the Schema we want to use for write/read
+    let schema_b = schemata[1];
+    let expected: Vec<u8> = vec![0, 0, 128, 63];
+    let actual = to_avro_datum_schemata(schema_b, schemata.clone(), record.clone()).unwrap();
+    assert_eq!(actual, expected);
+
+    let value = from_avro_datum_schemata(schema_b, schemata, &mut actual.as_slice(), None).unwrap();
+    assert_eq!(value, record);
+}
+
+#[test]
+fn test_avro_3683_multiple_schemata_writer_reader() {
+    init();
+
+    let record: Value = Value::Record(vec![(
+        String::from("field_b"),
+        Value::Record(vec![(String::from("field_a"), Value::Float(1.0))]),
+    )]);
+
+    let schemata: Vec<Schema> = Schema::parse_list(&[SCHEMA_A_STR, SCHEMA_B_STR]).unwrap();
+    let schemata: Vec<&Schema> = schemata.iter().collect();
+
+    // this is the Schema we want to use for write/read
+    let schema_b = schemata[1];
+    let mut output: Vec<u8> = Vec::new();
+
+    let mut writer = Writer::with_schemata(schema_b, schemata.clone(), &mut output, Codec::Null);
+    writer.append(record.clone()).unwrap();
+    writer.flush().unwrap();
+
+    let reader = Reader::with_schemata(schema_b, schemata, output.as_slice()).unwrap();
+    let value = reader.into_iter().next().unwrap().unwrap();
+    assert_eq!(value, record);
+}