You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/12/15 13:12:42 UTC

[avro] branch avro-3683-multiple-schemas updated: AVRO-3683: WIP Add support for reading

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch avro-3683-multiple-schemas
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/avro-3683-multiple-schemas by this push:
     new 66ab52771 AVRO-3683: WIP Add support for reading
66ab52771 is described below

commit 66ab5277194d6c7e42f8a0c67afa43460ebfc488
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Thu Dec 15 15:11:53 2022 +0200

    AVRO-3683: WIP Add support for reading
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---
 lang/rust/avro/src/decode.rs                       |  5 ++
 lang/rust/avro/src/lib.rs                          |  4 +-
 lang/rust/avro/src/reader.rs                       | 15 +++++
 lang/rust/avro/src/writer.rs                       | 45 ++-------------
 .../rust/avro/tests/to_from_avro_datum_schemata.rs | 65 ++++++++++++++++++++++
 5 files changed, 92 insertions(+), 42 deletions(-)

diff --git a/lang/rust/avro/src/decode.rs b/lang/rust/avro/src/decode.rs
index 4f9e7e945..88bb0faf6 100644
--- a/lang/rust/avro/src/decode.rs
+++ b/lang/rust/avro/src/decode.rs
@@ -31,6 +31,7 @@ use std::{
     str::FromStr,
 };
 use uuid::Uuid;
+use crate::schema::NamesRef;
 
 #[inline]
 fn decode_long<R: Read>(reader: &mut R) -> AvroResult<Value> {
@@ -73,6 +74,19 @@ pub fn decode<R: Read>(schema: &Schema, reader: &mut R) -> AvroResult<Value> {
     decode_internal(schema, rs.get_names(), &None, reader)
 }
 
+/// Decodes a single value of `schema` from `reader`.
+///
+/// Unlike [`decode`], which builds its own name table from `schema`, this uses the
+/// caller-supplied `names` (e.g. the names of a `ResolvedSchema` built from several
+/// schemata), so `schema` may reference named types that are defined elsewhere.
+pub fn decode_schemata<R: Read>(
+    schema: &Schema,
+    names: &NamesRef,
+    reader: &mut R,
+) -> AvroResult<Value> {
+    decode_internal(schema, names, &None, reader)
+}
+
 pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
     schema: &Schema,
     names: &HashMap<Name, S>,
diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs
index a6b06d74a..0fed80d3b 100644
--- a/lang/rust/avro/src/lib.rs
+++ b/lang/rust/avro/src/lib.rs
@@ -743,12 +743,16 @@ pub use decimal::Decimal;
 pub use duration::{Days, Duration, Millis, Months};
 pub use error::Error;
 pub use reader::{
-    from_avro_datum, read_marker, GenericSingleObjectReader, Reader, SpecificSingleObjectReader,
+    from_avro_datum, from_avro_datum_schemata, read_marker, GenericSingleObjectReader, Reader,
+    SpecificSingleObjectReader,
 };
 pub use schema::{AvroSchema, Schema};
 pub use ser::to_value;
 pub use util::max_allocation_bytes;
-pub use writer::{to_avro_datum, GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer};
+pub use writer::{
+    to_avro_datum, to_avro_datum_schemata, GenericSingleObjectWriter, SpecificSingleObjectWriter,
+    Writer,
+};
 
 #[cfg(feature = "derive")]
 pub use apache_avro_derive::*;
diff --git a/lang/rust/avro/src/reader.rs b/lang/rust/avro/src/reader.rs
index 3c8ea09b1..c2179efe0 100644
--- a/lang/rust/avro/src/reader.rs
+++ b/lang/rust/avro/src/reader.rs
@@ -33,6 +33,8 @@ use std::{
     marker::PhantomData,
     str::FromStr,
 };
+use crate::decode::decode_schemata;
+use crate::schema::ResolvedSchema;
 
 // Internal Block reader.
 #[derive(Debug, Clone)]
@@ -368,6 +370,35 @@ pub fn from_avro_datum<R: Read>(
     }
 }
 
+/// Decodes a single Avro datum that was written with `writer_schema`.
+///
+/// `writer_schemata` supplies the named types that `writer_schema` references
+/// (it may also contain `writer_schema` itself). The encoded bytes carry no
+/// schema information, so the writer schema is passed explicitly rather than
+/// guessed from `writer_schemata`. When `reader_schema` is `Some`, the decoded
+/// value is additionally resolved against it.
+///
+/// # Errors
+///
+/// Propagates any error from resolving `writer_schemata`, from decoding the
+/// bytes with `writer_schema`, or from resolving the value against
+/// `reader_schema`.
+pub fn from_avro_datum_schemata<R: Read>(
+    writer_schema: &Schema,
+    writer_schemata: &[&Schema],
+    reader: &mut R,
+    reader_schema: Option<&Schema>,
+) -> AvroResult<Value> {
+    let rs = ResolvedSchema::try_from(writer_schemata)?;
+    let value = decode_schemata(writer_schema, rs.get_names(), reader)?;
+    // NOTE(review): `Value::resolve` only receives `reader_schema`; presumably a
+    // reader schema referencing types from other schemata cannot be resolved here.
+    match reader_schema {
+        Some(schema) => value.resolve(schema),
+        None => Ok(value),
+    }
+}
+
 pub struct GenericSingleObjectReader {
     write_schema: ResolvedOwnedSchema,
     expected_header: [u8; 10],
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 2dba23e33..fa833eb21 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -384,12 +384,14 @@ fn write_avro_datum_schemata<T: Into<Value>>(
 ) -> AvroResult<()> {
     let avro = value.into();
     let rs = ResolvedSchema::try_from(schemata)?;
+    let names = rs.get_names();
     for schema in schemata {
+        let enclosing_namespace = schema.namespace();
         if avro
-            .validate_internal(schema, rs.get_names(), &schema.namespace())
+            .validate_internal(schema, names, &enclosing_namespace)
             .is_none()
         {
-            encode_internal(&avro, schema, rs.get_names(), &schema.namespace(), buffer)?;
+            encode_internal(&avro, schema, names, &enclosing_namespace, buffer)?;
             return Ok(());
         }
     }
@@ -1274,35 +1276,4 @@ mod tests {
         assert_eq!(buf1, buf2);
         assert_eq!(buf1, buf3);
     }
-
-    #[test]
-    fn test_avro_3683_multiple_schemata_to_avro_datum() {
-        let schema_a_str = r#"{
-        "name": "A",
-        "type": "record",
-        "fields": [
-            {"name": "field_a", "type": "float"}
-        ]
-    }"#;
-        let schema_b_str = r#"{
-        "name": "B",
-        "type": "record",
-        "fields": [
-            {"name": "field_b", "type": "A"}
-        ]
-    }"#;
-
-        let schemata: Vec<Schema> = Schema::parse_list(&[schema_a_str, schema_b_str]).unwrap();
-        let schemata: Vec<&Schema> = schemata.iter().collect();
-        let record = Value::Record(vec![(
-            "field_b".into(),
-            Value::Record(vec![("field_a".into(), Value::Float(1.0))]),
-        )]);
-
-        let expected: Vec<u8> = vec![0, 0, 128, 63];
-        assert_eq!(
-            to_avro_datum_schemata(&schemata.as_slice(), record).unwrap(),
-            expected
-        );
-    }
 }
diff --git a/lang/rust/avro/tests/to_from_avro_datum_schemata.rs b/lang/rust/avro/tests/to_from_avro_datum_schemata.rs
new file mode 100644
index 000000000..dda4915b8
--- /dev/null
+++ b/lang/rust/avro/tests/to_from_avro_datum_schemata.rs
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use apache_avro::{from_avro_datum_schemata, to_avro_datum_schemata, types::Value, Schema};
+use apache_avro_test_helper::init;
+
+/// Round-trips a record through `to_avro_datum_schemata` and
+/// `from_avro_datum_schemata` when the record's schema references a named type
+/// that is defined in a separate schema.
+#[test]
+fn test_avro_3683_multiple_schemata_to_from_avro_datum() {
+    init();
+
+    let schema_a_str = r#"{
+        "name": "A",
+        "type": "record",
+        "fields": [
+            {"name": "field_a", "type": "float"}
+        ]
+    }"#;
+    let schema_b_str = r#"{
+        "name": "B",
+        "type": "record",
+        "fields": [
+            {"name": "field_b", "type": "A"}
+        ]
+    }"#;
+
+    let schemata: Vec<Schema> = Schema::parse_list(&[schema_a_str, schema_b_str]).unwrap();
+    let schemata: Vec<&Schema> = schemata.iter().collect();
+    // The record below is a `B`, so `B` is the schema the datum is written and read with;
+    // `A` is only needed to resolve the reference in `field_b`.
+    let schema_b = schemata[1];
+    let record = Value::Record(vec![(
+        "field_b".into(),
+        Value::Record(vec![("field_a".into(), Value::Float(1.0))]),
+    )]);
+
+    // `field_a` = 1.0_f32, which Avro encodes as 4 little-endian IEEE 754 bytes.
+    let expected: Vec<u8> = vec![0, 0, 128, 63];
+    let actual = to_avro_datum_schemata(&schemata, record.clone()).unwrap();
+    assert_eq!(actual, expected);
+
+    let mut bytes = actual.as_slice();
+    let value = from_avro_datum_schemata(schema_b, &schemata, &mut bytes, None).unwrap();
+    assert_eq!(value, record);
+}