You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/12/15 13:12:42 UTC
[avro] branch avro-3683-multiple-schemas updated: AVRO-3683: WIP Add support for reading
This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch avro-3683-multiple-schemas
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/avro-3683-multiple-schemas by this push:
new 66ab52771 AVRO-3683: WIP Add support for reading
66ab52771 is described below
commit 66ab5277194d6c7e42f8a0c67afa43460ebfc488
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Thu Dec 15 15:11:53 2022 +0200
AVRO-3683: WIP Add support for reading
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---
lang/rust/avro/src/decode.rs | 5 ++
lang/rust/avro/src/lib.rs | 4 +-
lang/rust/avro/src/reader.rs | 15 +++++
lang/rust/avro/src/writer.rs | 45 ++-------------
.../rust/avro/tests/to_from_avro_datum_schemata.rs | 65 ++++++++++++++++++++++
5 files changed, 92 insertions(+), 42 deletions(-)
diff --git a/lang/rust/avro/src/decode.rs b/lang/rust/avro/src/decode.rs
index 4f9e7e945..88bb0faf6 100644
--- a/lang/rust/avro/src/decode.rs
+++ b/lang/rust/avro/src/decode.rs
@@ -31,6 +31,7 @@ use std::{
str::FromStr,
};
use uuid::Uuid;
+use crate::schema::NamesRef;
#[inline]
fn decode_long<R: Read>(reader: &mut R) -> AvroResult<Value> {
@@ -73,6 +74,10 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
decode_internal(schema, rs.get_names(), &None, reader)
}
+pub fn decode_schemata<R: Read>(schema: &Schema, names: &NamesRef, reader: &mut R) -> AvroResult<Value> { // Decodes one datum of `schema` using a caller-supplied `names` map instead of one built from `schema` alone (cf. `decode`).
+ decode_internal(schema, names, &None, reader) // same call as `decode`, including `&None`; only the source of `names` differs
+}
+
pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
schema: &Schema,
names: &HashMap<Name, S>,
diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs
index a6b06d74a..0fed80d3b 100644
--- a/lang/rust/avro/src/lib.rs
+++ b/lang/rust/avro/src/lib.rs
@@ -743,12 +743,12 @@ pub use decimal::Decimal;
pub use duration::{Days, Duration, Millis, Months};
pub use error::Error;
pub use reader::{
- from_avro_datum, read_marker, GenericSingleObjectReader, Reader, SpecificSingleObjectReader,
+ from_avro_datum, from_avro_datum_schemata, read_marker, GenericSingleObjectReader, Reader, SpecificSingleObjectReader,
};
pub use schema::{AvroSchema, Schema};
pub use ser::to_value;
pub use util::max_allocation_bytes;
-pub use writer::{to_avro_datum, GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer};
+pub use writer::{to_avro_datum, to_avro_datum_schemata, GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer};
#[cfg(feature = "derive")]
pub use apache_avro_derive::*;
diff --git a/lang/rust/avro/src/reader.rs b/lang/rust/avro/src/reader.rs
index 3c8ea09b1..c2179efe0 100644
--- a/lang/rust/avro/src/reader.rs
+++ b/lang/rust/avro/src/reader.rs
@@ -33,6 +33,8 @@ use std::{
marker::PhantomData,
str::FromStr,
};
+use crate::decode::decode_schemata;
+use crate::schema::ResolvedSchema;
// Internal Block reader.
#[derive(Debug, Clone)]
@@ -368,6 +370,19 @@ pub fn from_avro_datum<R: Read>(
}
}
+pub fn from_avro_datum_schemata<R: Read>( // Reads one datum whose named types may be spread across several writer schemata.
+ writer_schemata: &[&Schema], // resolved together via ResolvedSchema; entry [0] is the schema the bytes are decoded against
+ reader: &mut R, // source of the encoded bytes
+ reader_schema: Option<&Schema>, // if Some, the decoded value is resolved against it; if None it is returned as decoded
+) -> AvroResult<Value> {
+ let rs = ResolvedSchema::try_from(writer_schemata)?; // combined name table over all schemata; any error is propagated
+ let value = decode_schemata(writer_schemata[0], rs.get_names(), reader)?; // NOTE(review): indexing [0] panics on an empty slice unless try_from above already errors; the writer side (write_avro_datum_schemata) encodes with the first schema that validates the value, which need not be [0] -- confirm round-trips
+ match reader_schema {
+ Some(schema) => value.resolve(schema),
+ None => Ok(value),
+ }
+}
+
pub struct GenericSingleObjectReader {
write_schema: ResolvedOwnedSchema,
expected_header: [u8; 10],
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 2dba23e33..fa833eb21 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -384,12 +384,14 @@ fn write_avro_datum_schemata<T: Into<Value>>(
) -> AvroResult<()> {
let avro = value.into();
let rs = ResolvedSchema::try_from(schemata)?;
+ let names = rs.get_names();
for schema in schemata {
+ let enclosing_namespace = schema.namespace();
if avro
- .validate_internal(schema, rs.get_names(), &schema.namespace())
+ .validate_internal(schema, names, &enclosing_namespace)
.is_none()
{
- encode_internal(&avro, schema, rs.get_names(), &schema.namespace(), buffer)?;
+ encode_internal(&avro, schema, names, &enclosing_namespace, buffer)?;
return Ok(());
}
}
@@ -596,13 +598,7 @@ fn generate_sync_marker() -> [u8; 16] {
#[cfg(test)]
mod tests {
use super::*;
- use crate::{
- decimal::Decimal,
- duration::{Days, Duration, Millis, Months},
- schema::Name,
- types::Record,
- util::zig_i64,
- };
+ use crate::{decimal::Decimal, duration::{Days, Duration, Millis, Months}, schema::Name, types::Record, util::zig_i64};
use pretty_assertions::assert_eq;
use serde::{Deserialize, Serialize};
@@ -1274,35 +1270,4 @@ mod tests {
assert_eq!(buf1, buf2);
assert_eq!(buf1, buf3);
}
-
- #[test]
- fn test_avro_3683_multiple_schemata_to_avro_datum() {
- let schema_a_str = r#"{
- "name": "A",
- "type": "record",
- "fields": [
- {"name": "field_a", "type": "float"}
- ]
- }"#;
- let schema_b_str = r#"{
- "name": "B",
- "type": "record",
- "fields": [
- {"name": "field_b", "type": "A"}
- ]
- }"#;
-
- let schemata: Vec<Schema> = Schema::parse_list(&[schema_a_str, schema_b_str]).unwrap();
- let schemata: Vec<&Schema> = schemata.iter().collect();
- let record = Value::Record(vec![(
- "field_b".into(),
- Value::Record(vec![("field_a".into(), Value::Float(1.0))]),
- )]);
-
- let expected: Vec<u8> = vec![0, 0, 128, 63];
- assert_eq!(
- to_avro_datum_schemata(&schemata.as_slice(), record).unwrap(),
- expected
- );
- }
}
diff --git a/lang/rust/avro/tests/to_from_avro_datum_schemata.rs b/lang/rust/avro/tests/to_from_avro_datum_schemata.rs
new file mode 100644
index 000000000..dda4915b8
--- /dev/null
+++ b/lang/rust/avro/tests/to_from_avro_datum_schemata.rs
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use apache_avro::{
+ to_avro_datum_schemata, from_avro_datum_schemata,
+ types::Value,
+ Schema,
+};
+use apache_avro_test_helper::init;
+
+
+#[test]
+fn test_avro_3683_multiple_schemata_to_avro_datum() { // Round-trips a B record (B.field_b has type A) through to_/from_avro_datum_schemata.
+ init();
+ // A is a plain record with one float field; B nests an A via a by-name reference ("type": "A").
+ let schema_a_str = r#"{
+ "name": "A",
+ "type": "record",
+ "fields": [
+ {"name": "field_a", "type": "float"}
+ ]
+ }"#;
+ let schema_b_str = r#"{
+ "name": "B",
+ "type": "record",
+ "fields": [
+ {"name": "field_b", "type": "A"}
+ ]
+ }"#;
+ // Parse both in one call so that B's reference to "A" can be resolved.
+ let schemata: Vec<Schema> = Schema::parse_list(&[schema_a_str, schema_b_str]).unwrap();
+ let schemata: Vec<&Schema> = schemata.iter().collect();
+ let record = Value::Record(vec![(
+ "field_b".into(),
+ Value::Record(vec![("field_a".into(), Value::Float(1.0))]),
+ )]);
+ // field_a = 1.0 encoded as a little-endian f32 (0x3F800000); record fields add no framing bytes.
+ let expected: Vec<u8> = vec![0, 0, 128, 63];
+ let actual = to_avro_datum_schemata(&schemata.as_slice(), record.clone()).unwrap(); // NOTE(review): `schemata.as_slice()` suffices; the extra `&` relies on deref coercion (clippy::needless_borrow)
+ assert_eq!(
+ actual,
+ expected
+ );
+ // Decode the bytes back and expect the original record.
+ let mut bytes = actual.as_slice();
+ let value = from_avro_datum_schemata(&schemata.as_slice(), &mut bytes, None).unwrap(); // NOTE(review): decoding uses schemata[0] (A), while the writer used the first schema that validates `record` (presumably B) -- confirm the assertion below holds
+ assert_eq!(
+ value,
+ record
+ );
+}