You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2023/03/13 07:10:00 UTC
[avro] branch master updated: AVRO-3683: [Rust] Read/Write with multiple schemas (#2014)
This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new b8b83b72f AVRO-3683: [Rust] Read/Write with multiple schemas (#2014)
b8b83b72f is described below
commit b8b83b72f7184cf1b388fc20331d11eabbd93e06
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Mon Mar 13 09:09:53 2023 +0200
AVRO-3683: [Rust] Read/Write with multiple schemas (#2014)
* AVRO-3683: Add support for using multiple schemata for resolve/validate/write
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3683: WIP
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3683: WIP compiles
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3683: WIP compiles and all tests pass
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3683: WIP Add support for reading
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3683: WIP
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3683: WIP
Use a main schema and pass all other schemata for resolution.
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3683: Formatting
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3683: WIP
Add support for multiple schemata in Reader/Writer APIs
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3646: Formatting
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3683: Use Vec instead of slice reference for schemata
It is much easier to deal with.
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3683: Fix the resolving of the writer schema when reading an Avro file
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* AVRO-3683: Cleanup
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---------
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---
lang/rust/avro/src/lib.rs | 8 +-
lang/rust/avro/src/reader.rs | 79 ++++++++++++--
lang/rust/avro/src/schema.rs | 118 +++++++++++++--------
lang/rust/avro/src/types.rs | 31 +++---
lang/rust/avro/src/writer.rs | 96 ++++++++++++++---
.../rust/avro/tests/to_from_avro_datum_schemata.rs | 84 +++++++++++++++
6 files changed, 330 insertions(+), 86 deletions(-)
diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs
index a6b06d74a..2c6f46c07 100644
--- a/lang/rust/avro/src/lib.rs
+++ b/lang/rust/avro/src/lib.rs
@@ -743,12 +743,16 @@ pub use decimal::Decimal;
pub use duration::{Days, Duration, Millis, Months};
pub use error::Error;
pub use reader::{
- from_avro_datum, read_marker, GenericSingleObjectReader, Reader, SpecificSingleObjectReader,
+ from_avro_datum, from_avro_datum_schemata, read_marker, GenericSingleObjectReader, Reader,
+ SpecificSingleObjectReader,
};
pub use schema::{AvroSchema, Schema};
pub use ser::to_value;
pub use util::max_allocation_bytes;
-pub use writer::{to_avro_datum, GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer};
+pub use writer::{
+ to_avro_datum, to_avro_datum_schemata, GenericSingleObjectWriter, SpecificSingleObjectWriter,
+ Writer,
+};
#[cfg(feature = "derive")]
pub use apache_avro_derive::*;
diff --git a/lang/rust/avro/src/reader.rs b/lang/rust/avro/src/reader.rs
index 442f13075..3489ccfb1 100644
--- a/lang/rust/avro/src/reader.rs
+++ b/lang/rust/avro/src/reader.rs
@@ -20,7 +20,7 @@ use crate::{
decode::{decode, decode_internal},
from_value,
rabin::Rabin,
- schema::{AvroSchema, ResolvedOwnedSchema, Schema},
+ schema::{AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema},
types::Value,
util, AvroResult, Codec, Error,
};
@@ -36,7 +36,7 @@ use std::{
// Internal Block reader.
#[derive(Debug, Clone)]
-struct Block<R> {
+struct Block<'r, R> {
reader: R,
// Internal buffering to reduce allocation.
buf: Vec<u8>,
@@ -46,15 +46,17 @@ struct Block<R> {
marker: [u8; 16],
codec: Codec,
writer_schema: Schema,
+ schemata: Vec<&'r Schema>,
user_metadata: HashMap<String, Vec<u8>>,
}
-impl<R: Read> Block<R> {
- fn new(reader: R) -> AvroResult<Block<R>> {
+impl<'r, R: Read> Block<'r, R> {
+ fn new(reader: R, schemata: Vec<&'r Schema>) -> AvroResult<Block<R>> {
let mut block = Block {
reader,
codec: Codec::Null,
writer_schema: Schema::Null,
+ schemata,
buf: vec![],
buf_idx: 0,
message_count: 0,
@@ -178,7 +180,13 @@ impl<R: Read> Block<R> {
let mut block_bytes = &self.buf[self.buf_idx..];
let b_original = block_bytes.len();
- let item = from_avro_datum(&self.writer_schema, &mut block_bytes, read_schema)?;
+ let schemata = if self.schemata.is_empty() {
+ vec![&self.writer_schema]
+ } else {
+ self.schemata.clone()
+ };
+ let item =
+ from_avro_datum_schemata(&self.writer_schema, schemata, &mut block_bytes, read_schema)?;
if b_original == block_bytes.len() {
// from_avro_datum did not consume any bytes, so return an error to avoid an infinite loop
return Err(Error::ReadBlock);
@@ -189,7 +197,7 @@ impl<R: Read> Block<R> {
}
fn read_writer_schema(&mut self, metadata: &HashMap<String, Value>) -> AvroResult<()> {
- let json = metadata
+ let json: serde_json::Value = metadata
.get("avro.schema")
.and_then(|bytes| {
if let Value::Bytes(ref bytes) = *bytes {
@@ -199,7 +207,17 @@ impl<R: Read> Block<R> {
}
})
.ok_or(Error::GetAvroSchemaFromMap)?;
- self.writer_schema = Schema::parse(&json)?;
+ if !self.schemata.is_empty() {
+ let rs = ResolvedSchema::try_from(self.schemata.clone())?;
+ let names: Names = rs
+ .get_names()
+ .iter()
+ .map(|(name, schema)| (name.clone(), (*schema).clone()))
+ .collect();
+ self.writer_schema = Schema::parse_with_names(&json, names)?;
+ } else {
+ self.writer_schema = Schema::parse(&json)?;
+ }
Ok(())
}
@@ -261,7 +279,7 @@ fn read_codec(metadata: &HashMap<String, Value>) -> AvroResult<Codec> {
/// }
/// ```
pub struct Reader<'a, R> {
- block: Block<R>,
+ block: Block<'a, R>,
reader_schema: Option<&'a Schema>,
errored: bool,
should_resolve_schema: bool,
@@ -273,7 +291,7 @@ impl<'a, R: Read> Reader<'a, R> {
///
/// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
- let block = Block::new(reader)?;
+ let block = Block::new(reader, vec![])?;
let reader = Reader {
block,
reader_schema: None,
@@ -288,7 +306,28 @@ impl<'a, R: Read> Reader<'a, R> {
///
/// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
- let block = Block::new(reader)?;
+ let block = Block::new(reader, vec![schema])?;
+ let mut reader = Reader {
+ block,
+ reader_schema: Some(schema),
+ errored: false,
+ should_resolve_schema: false,
+ };
+ // Check if the reader and writer schemas disagree.
+ reader.should_resolve_schema = reader.writer_schema() != schema;
+ Ok(reader)
+ }
+
+ /// Creates a `Reader` given a reader `Schema`, the `schemata` used to resolve any named
+ /// references (`Schema::Ref`s) in the writer schema, and something implementing `io::Read`.
+ ///
+ /// **NOTE** The avro header is going to be read automatically upon creation of the `Reader`.
+ pub fn with_schemata(
+ schema: &'a Schema,
+ schemata: Vec<&'a Schema>,
+ reader: R,
+ ) -> AvroResult<Reader<'a, R>> {
+ let block = Block::new(reader, schemata)?;
let mut reader = Reader {
block,
reader_schema: Some(schema),
@@ -368,6 +407,26 @@ pub fn from_avro_datum<R: Read>(
}
}
+/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
+/// to read from.
+/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
+/// schemata to resolve any dependencies.
+///
+/// In case a reader `Schema` is provided, schema resolution will also be performed.
+pub fn from_avro_datum_schemata<R: Read>(
+ writer_schema: &Schema,
+ schemata: Vec<&Schema>,
+ reader: &mut R,
+ reader_schema: Option<&Schema>,
+) -> AvroResult<Value> {
+ let rs = ResolvedSchema::try_from(schemata)?;
+ let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
+ match reader_schema {
+ Some(schema) => value.resolve(schema),
+ None => Ok(value),
+ }
+}
+
pub struct GenericSingleObjectReader {
write_schema: ResolvedOwnedSchema,
expected_header: [u8; 10],
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 3842afd97..0aa0eed2c 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -257,7 +257,7 @@ impl Name {
Ok(Self { name, namespace })
}
- fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> {
+ pub(crate) fn get_name_and_namespace(name: &str) -> AvroResult<(String, Namespace)> {
let caps = SCHEMA_NAME_R
.captures(name)
.ok_or_else(|| Error::InvalidSchemaName(name.to_string(), SCHEMA_NAME_R.as_str()))?;
@@ -396,9 +396,10 @@ impl Serialize for Alias {
}
}
+#[derive(Debug)]
pub struct ResolvedSchema<'s> {
names_ref: NamesRef<'s>,
- root_schema: &'s Schema,
+ schemata: Vec<&'s Schema>,
}
impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
@@ -408,71 +409,84 @@ impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
let names = HashMap::new();
let mut rs = ResolvedSchema {
names_ref: names,
- root_schema: schema,
+ schemata: vec![schema],
};
- Self::from_internal(rs.root_schema, &mut rs.names_ref, &None)?;
+ Self::from_internal(rs.get_schemata(), &mut rs.names_ref, &None)?;
+ Ok(rs)
+ }
+}
+
+impl<'s> TryFrom<Vec<&'s Schema>> for ResolvedSchema<'s> {
+ type Error = Error;
+
+ fn try_from(schemata: Vec<&'s Schema>) -> AvroResult<Self> {
+ let names = HashMap::new();
+ let mut rs = ResolvedSchema {
+ names_ref: names,
+ schemata,
+ };
+ Self::from_internal(rs.get_schemata(), &mut rs.names_ref, &None)?;
Ok(rs)
}
}
impl<'s> ResolvedSchema<'s> {
- pub fn get_root_schema(&self) -> &'s Schema {
- self.root_schema
+ pub fn get_schemata(&self) -> Vec<&'s Schema> {
+ self.schemata.clone()
}
+
pub fn get_names(&self) -> &NamesRef<'s> {
&self.names_ref
}
fn from_internal(
- schema: &'s Schema,
+ schemata: Vec<&'s Schema>,
names_ref: &mut NamesRef<'s>,
enclosing_namespace: &Namespace,
) -> AvroResult<()> {
- match schema {
- Schema::Array(schema) | Schema::Map(schema) => {
- Self::from_internal(schema, names_ref, enclosing_namespace)
- }
- Schema::Union(UnionSchema { schemas, .. }) => {
- for schema in schemas {
- Self::from_internal(schema, names_ref, enclosing_namespace)?
+ for schema in schemata {
+ match schema {
+ Schema::Array(schema) | Schema::Map(schema) => {
+ Self::from_internal(vec![schema], names_ref, enclosing_namespace)?
}
- Ok(())
- }
- Schema::Enum { name, .. } | Schema::Fixed { name, .. } => {
- let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
- if names_ref
- .insert(fully_qualified_name.clone(), schema)
- .is_some()
- {
- Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
- } else {
- Ok(())
+ Schema::Union(UnionSchema { schemas, .. }) => {
+ for schema in schemas {
+ Self::from_internal(vec![schema], names_ref, enclosing_namespace)?
+ }
}
- }
- Schema::Record { name, fields, .. } => {
- let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
- if names_ref
- .insert(fully_qualified_name.clone(), schema)
- .is_some()
- {
- Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
- } else {
- let record_namespace = fully_qualified_name.namespace;
- for field in fields {
- Self::from_internal(&field.schema, names_ref, &record_namespace)?
+ Schema::Enum { name, .. } | Schema::Fixed { name, .. } => {
+ let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
+ if names_ref
+ .insert(fully_qualified_name.clone(), schema)
+ .is_some()
+ {
+ return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
}
- Ok(())
}
+ Schema::Record { name, fields, .. } => {
+ let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
+ if names_ref
+ .insert(fully_qualified_name.clone(), schema)
+ .is_some()
+ {
+ return Err(Error::AmbiguousSchemaDefinition(fully_qualified_name));
+ } else {
+ let record_namespace = fully_qualified_name.namespace;
+ for field in fields {
+ Self::from_internal(vec![&field.schema], names_ref, &record_namespace)?
+ }
+ }
+ }
+ Schema::Ref { name } => {
+ let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
+ if names_ref.get(&fully_qualified_name).is_none() {
+ return Err(Error::SchemaResolutionError(fully_qualified_name));
+ }
+ }
+ _ => (),
}
- Schema::Ref { name } => {
- let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
- names_ref
- .get(&fully_qualified_name)
- .map(|_| ())
- .ok_or(Error::SchemaResolutionError(fully_qualified_name))
- }
- _ => Ok(()),
}
+ Ok(())
}
}
@@ -824,6 +838,18 @@ impl Schema {
parser.parse(value, &None)
}
+ /// Parses an Avro schema from JSON.
+ /// Any `Schema::Ref`s must be known in the `names` map.
+ pub(crate) fn parse_with_names(value: &Value, names: Names) -> AvroResult<Schema> {
+ let mut parser = Parser {
+ input_schemas: HashMap::with_capacity(1),
+ resolving_schemas: Names::default(),
+ input_order: Vec::with_capacity(1),
+ parsed_schemas: names,
+ };
+ parser.parse(value, &None)
+ }
+
/// Returns the custom attributes (metadata) if the schema supports them.
pub fn custom_attributes(&self) -> Option<&BTreeMap<String, Value>> {
match self {
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index 60f94ecf8..ee322b331 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -339,19 +339,26 @@ impl Value {
/// See the [Avro specification](https://avro.apache.org/docs/current/spec.html)
/// for the full set of rules of schema validation.
pub fn validate(&self, schema: &Schema) -> bool {
- let rs = ResolvedSchema::try_from(schema).expect("Schema didn't successfully parse");
- let enclosing_namespace = schema.namespace();
-
- match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
- Some(error_msg) => {
- error!(
- "Invalid value: {:?} for schema: {:?}. Reason: {}",
- self, schema, error_msg
- );
- false
+ self.validate_schemata(vec![schema])
+ }
+
+ pub fn validate_schemata(&self, schemata: Vec<&Schema>) -> bool {
+ let rs = ResolvedSchema::try_from(schemata.clone())
+ .expect("Schemata didn't successfully resolve");
+ schemata.iter().any(|schema| {
+ let enclosing_namespace = schema.namespace();
+
+ match self.validate_internal(schema, rs.get_names(), &enclosing_namespace) {
+ Some(error_msg) => {
+ error!(
+ "Invalid value: {:?} for schema: {:?}. Reason: {}",
+ self, schema, error_msg
+ );
+ false
+ }
+ None => true,
}
- None => true,
- }
+ })
}
fn accumulate(accumulator: Option<String>, other: Option<String>) -> Option<String> {
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 0c8e4661f..33653e29b 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -75,6 +75,25 @@ impl<'a, W: Write> Writer<'a, W> {
w
}
+ /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
+ /// `io::Write` trait to write to.
+ /// If the `schema` is incomplete, i.e. contains `Schema::Ref`s then all dependencies must
+ /// be provided in `schemata`.
+ pub fn with_schemata(
+ schema: &'a Schema,
+ schemata: Vec<&'a Schema>,
+ writer: W,
+ codec: Codec,
+ ) -> Self {
+ let mut w = Self::builder()
+ .schema(schema)
+ .writer(writer)
+ .codec(codec)
+ .build();
+ w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
+ w
+ }
+
/// Creates a `Writer` that will append values to already populated
/// `std::io::Write` using the provided `marker`
/// No compression `Codec` will be used.
@@ -101,6 +120,26 @@ impl<'a, W: Write> Writer<'a, W> {
w
}
+ /// Creates a `Writer` that will append values to already populated `std::io::Write` using the
+ /// provided `marker` and `codec`, resolving any `Schema::Ref`s in `schema` against `schemata`.
+ pub fn append_to_with_codec_schemata(
+ schema: &'a Schema,
+ schemata: Vec<&'a Schema>,
+ writer: W,
+ codec: Codec,
+ marker: [u8; 16],
+ ) -> Self {
+ let mut w = Self::builder()
+ .schema(schema)
+ .writer(writer)
+ .codec(codec)
+ .marker(marker)
+ .build();
+ w.has_header = true;
+ w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
+ w
+ }
+
/// Get a reference to the `Schema` associated to a `Writer`.
pub fn schema(&self) -> &'a Schema {
self.schema
@@ -134,7 +173,7 @@ impl<'a, W: Write> Writer<'a, W> {
// Lazy init for users using the builder pattern with error throwing
match self.resolved_schema {
Some(ref rs) => {
- write_value_ref_resolved(rs, value, &mut self.buffer)?;
+ write_value_ref_resolved(self.schema, rs, value, &mut self.buffer)?;
self.num_values += 1;
if self.buffer.len() >= self.block_size {
@@ -376,6 +415,22 @@ fn write_avro_datum<T: Into<Value>>(
Ok(())
}
+fn write_avro_datum_schemata<T: Into<Value>>(
+ schema: &Schema,
+ schemata: Vec<&Schema>,
+ value: T,
+ buffer: &mut Vec<u8>,
+) -> AvroResult<()> {
+ let avro = value.into();
+ let rs = ResolvedSchema::try_from(schemata)?;
+ let names = rs.get_names();
+ let enclosing_namespace = schema.namespace();
+ if let Some(_err) = avro.validate_internal(schema, names, &enclosing_namespace) {
+ return Err(Error::Validation);
+ }
+ encode_internal(&avro, schema, names, &enclosing_namespace, buffer)
+}
+
/// Writer that encodes messages according to the single object encoding v1 spec
/// Uses an API similar to the current File Writer
/// Writes all object bytes at once, and drains internal buffer
@@ -484,26 +539,21 @@ where
}
fn write_value_ref_resolved(
+ schema: &Schema,
resolved_schema: &ResolvedSchema,
value: &Value,
buffer: &mut Vec<u8>,
) -> AvroResult<()> {
- let root_schema = resolved_schema.get_root_schema();
- if let Some(err) = value.validate_internal(
- root_schema,
- resolved_schema.get_names(),
- &root_schema.namespace(),
- ) {
- return Err(Error::ValidationWithReason(err));
+ match value.validate_internal(schema, resolved_schema.get_names(), &schema.namespace()) {
+ Some(err) => Err(Error::ValidationWithReason(err)),
+ None => encode_internal(
+ value,
+ schema,
+ resolved_schema.get_names(),
+ &schema.namespace(),
+ buffer,
+ ),
}
- encode_internal(
- value,
- root_schema,
- resolved_schema.get_names(),
- &root_schema.namespace(),
- buffer,
- )?;
- Ok(())
}
fn write_value_ref_owned_resolved(
@@ -541,6 +591,20 @@ pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Ve
Ok(buffer)
}
+/// Encode a compatible value (implementing `Into<Value>`) into Avro format, also
+/// performing schema validation.
+/// If the provided `schema` is incomplete then its dependencies must be
+/// provided in `schemata`.
+pub fn to_avro_datum_schemata<T: Into<Value>>(
+ schema: &Schema,
+ schemata: Vec<&Schema>,
+ value: T,
+) -> AvroResult<Vec<u8>> {
+ let mut buffer = Vec::new();
+ write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
+ Ok(buffer)
+}
+
#[cfg(not(target_arch = "wasm32"))]
fn generate_sync_marker() -> [u8; 16] {
let mut marker = [0_u8; 16];
diff --git a/lang/rust/avro/tests/to_from_avro_datum_schemata.rs b/lang/rust/avro/tests/to_from_avro_datum_schemata.rs
new file mode 100644
index 000000000..d05025ab7
--- /dev/null
+++ b/lang/rust/avro/tests/to_from_avro_datum_schemata.rs
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use apache_avro::{
+ from_avro_datum_schemata, to_avro_datum_schemata, types::Value, Codec, Reader, Schema, Writer,
+};
+use apache_avro_test_helper::init;
+
+static SCHEMA_A_STR: &str = r#"{
+ "name": "A",
+ "type": "record",
+ "fields": [
+ {"name": "field_a", "type": "float"}
+ ]
+ }"#;
+
+static SCHEMA_B_STR: &str = r#"{
+ "name": "B",
+ "type": "record",
+ "fields": [
+ {"name": "field_b", "type": "A"}
+ ]
+ }"#;
+
+#[test]
+fn test_avro_3683_multiple_schemata_to_from_avro_datum() {
+ init();
+
+ let record: Value = Value::Record(vec![(
+ String::from("field_b"),
+ Value::Record(vec![(String::from("field_a"), Value::Float(1.0))]),
+ )]);
+
+ let schemata: Vec<Schema> = Schema::parse_list(&[SCHEMA_A_STR, SCHEMA_B_STR]).unwrap();
+ let schemata: Vec<&Schema> = schemata.iter().collect();
+
+ // this is the Schema we want to use for write/read
+ let schema_b = schemata[1];
+ let expected: Vec<u8> = vec![0, 0, 128, 63];
+ let actual = to_avro_datum_schemata(schema_b, schemata.clone(), record.clone()).unwrap();
+ assert_eq!(actual, expected);
+
+ let value = from_avro_datum_schemata(schema_b, schemata, &mut actual.as_slice(), None).unwrap();
+ assert_eq!(value, record);
+}
+
+#[test]
+fn test_avro_3683_multiple_schemata_writer_reader() {
+ init();
+
+ let record: Value = Value::Record(vec![(
+ String::from("field_b"),
+ Value::Record(vec![(String::from("field_a"), Value::Float(1.0))]),
+ )]);
+
+ let schemata: Vec<Schema> = Schema::parse_list(&[SCHEMA_A_STR, SCHEMA_B_STR]).unwrap();
+ let schemata: Vec<&Schema> = schemata.iter().collect();
+
+ // this is the Schema we want to use for write/read
+ let schema_b = schemata[1];
+ let mut output: Vec<u8> = Vec::new();
+
+ let mut writer = Writer::with_schemata(schema_b, schemata.clone(), &mut output, Codec::Null);
+ writer.append(record.clone()).unwrap();
+ writer.flush().unwrap();
+
+ let reader = Reader::with_schemata(schema_b, schemata, output.as_slice()).unwrap();
+ let value = reader.into_iter().next().unwrap().unwrap();
+ assert_eq!(value, record);
+}