You are viewing a plain text version of this content; the link to the canonical version ("here") was not preserved in this copy.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/12/14 23:28:25 UTC

[avro] branch avro-3683-multiple-schemas updated (0c2e2183d -> 4cbafabb1)

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a change to branch avro-3683-multiple-schemas
in repository https://gitbox.apache.org/repos/asf/avro.git


    from 0c2e2183d AVRO-3683: WIP
     new 2aebeab25 AVRO-3683: WIP compiles
     new 4cbafabb1 AVRO-3683: WIP compiles and all tests pass

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. Revisions
listed as "add" (there are none in this notification) would already
have been present in the repository and only added to this reference.


Summary of changes:
 lang/rust/avro/src/encode.rs | 16 ----------------
 lang/rust/avro/src/schema.rs | 33 +++++++++++++++++----------------
 lang/rust/avro/src/types.rs  |  1 +
 lang/rust/avro/src/writer.rs | 26 +++++++++++++++++---------
 4 files changed, 35 insertions(+), 41 deletions(-)


[avro] 01/02: AVRO-3683: WIP compiles

Posted by mg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch avro-3683-multiple-schemas
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 2aebeab25ec9028a7c9d257debaa620b868b7942
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Thu Dec 15 01:00:31 2022 +0200

    AVRO-3683: WIP compiles
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---
 lang/rust/avro/src/schema.rs | 32 ++++++++++++++++----------------
 lang/rust/avro/src/writer.rs |  5 +++--
 2 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 5784f1901..1ffb9e6ea 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -401,7 +401,7 @@ impl Serialize for Alias {
 
 pub(crate) struct ResolvedSchema<'s> {
     names_ref: NamesRef<'s>,
-    schemata: &'s [&'s Schema],
+    schemata: Vec<&'s Schema>,
 }
 
 impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
@@ -411,9 +411,9 @@ impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
         let names = HashMap::new();
         let mut rs = ResolvedSchema {
             names_ref: names,
-            schemata: &[schema],
+            schemata: vec![schema],
         };
-        Self::from_internal(rs.schemata, &mut rs.names_ref, &None)?;
+        Self::from_internal(rs.get_schemata(), &mut rs.names_ref, &None)?;
         Ok(rs)
     }
 }
@@ -425,34 +425,35 @@ impl<'s> TryFrom<&'s [&'s Schema]> for ResolvedSchema<'s> {
         let names = HashMap::new();
         let mut rs = ResolvedSchema {
             names_ref: names,
-            schemata,
+            schemata: schemata.to_vec(),
         };
-        Self::from_internal(rs.schemata, &mut rs.names_ref, &None)?;
+        Self::from_internal(rs.get_schemata(), &mut rs.names_ref, &None)?;
         Ok(rs)
     }
 }
 
 impl<'s> ResolvedSchema<'s> {
-    pub(crate) fn get_schemata(&self) -> &'s [&'s Schema] {
-        self.schemata
+    pub(crate) fn get_schemata(&self) -> Vec<&'s Schema> {
+        self.schemata.clone()
     }
+
     pub(crate) fn get_names(&self) -> &NamesRef<'s> {
         &self.names_ref
     }
 
     fn from_internal(
-        schemata: &'s [&'s Schema],
+        schemata: Vec<&'s Schema>,
         names_ref: &mut NamesRef<'s>,
         enclosing_namespace: &Namespace,
     ) -> AvroResult<()> {
         for schema in schemata {
             match schema {
                 Schema::Array(schema) | Schema::Map(schema) => {
-                    Self::from_internal(&[schema], names_ref, enclosing_namespace)?
+                    Self::from_internal(vec![schema], names_ref, enclosing_namespace)?
                 }
                 Schema::Union(UnionSchema { schemas, .. }) => {
                     for schema in schemas {
-                        Self::from_internal(&[schema], names_ref, enclosing_namespace)?
+                        Self::from_internal(vec![schema], names_ref, enclosing_namespace)?
                     }
                 }
                 Schema::Enum { name, .. } | Schema::Fixed { name, .. } => {
@@ -474,18 +475,17 @@ impl<'s> ResolvedSchema<'s> {
                     } else {
                         let record_namespace = fully_qualified_name.namespace;
                         for field in fields {
-                            Self::from_internal(&[&field.schema], names_ref, &record_namespace)?
+                            Self::from_internal(vec![&field.schema], names_ref, &record_namespace)?
                         }
                     }
                 }
                 Schema::Ref { name } => {
                     let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
-                    names_ref
-                        .get(&fully_qualified_name)
-                        .map(|_| ())
-                        .ok_or_else(|| return Error::SchemaResolutionError(fully_qualified_name))
+                    if let None = names_ref.get(&fully_qualified_name) {
+                        return Err(Error::SchemaResolutionError(fully_qualified_name));
+                    }
                 }
-                _ => Ok(()),
+                _ => (),
             }
         }
         Ok(())
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index c777e31cd..6264e07de 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -1285,7 +1285,8 @@ mod tests {
         ]
     }"#;
 
-        let schemata = Schema::parse_list(&[schema_a_str, schema_b_str]).unwrap();
+        let schemata: Vec<Schema> = Schema::parse_list(&[schema_a_str, schema_b_str]).unwrap();
+        let schemata: Vec<&Schema> = schemata.iter().collect();
         let record = Value::Record(vec![(
             "field_b".into(),
             Value::Record(vec![("field_a".into(), Value::Float(1.0))]),
@@ -1293,7 +1294,7 @@ mod tests {
 
         let expected: Vec<u8> = Vec::new();
         assert_eq!(
-            to_avro_datum_schemata(&schemata[..], record).unwrap(),
+            to_avro_datum_schemata(&schemata.as_slice(), record).unwrap(),
             expected
         );
     }


[avro] 02/02: AVRO-3683: WIP compiles and all tests pass

Posted by mg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch avro-3683-multiple-schemas
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 4cbafabb17cc9ac5b45c7808198471a1fab72048
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Thu Dec 15 01:27:51 2022 +0200

    AVRO-3683: WIP compiles and all tests pass
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---
 lang/rust/avro/src/encode.rs | 16 ----------------
 lang/rust/avro/src/schema.rs |  1 +
 lang/rust/avro/src/types.rs  |  1 +
 lang/rust/avro/src/writer.rs | 21 ++++++++++++++-------
 4 files changed, 16 insertions(+), 23 deletions(-)

diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs
index 3cdb067fe..2ae48f91c 100644
--- a/lang/rust/avro/src/encode.rs
+++ b/lang/rust/avro/src/encode.rs
@@ -37,22 +37,6 @@ pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) -> AvroResul
     encode_internal(value, schema, rs.get_names(), &None, buffer)
 }
 
-pub fn encode_schemata(
-    value: &Value,
-    schemata: &[&Schema],
-    buffer: &mut Vec<u8>,
-) -> AvroResult<()> {
-    let rs = ResolvedSchema::try_from(schemata)?;
-    for schema in schemata {
-        if value.validate(schema) {
-            encode_internal(value, schema, rs.get_names(), &None, buffer)?;
-            break;
-        }
-    }
-
-    todo!("Err(None of the provided schemata matched the value)")
-}
-
 fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) {
     let bytes = s.as_ref();
     encode_long(bytes.len() as i64, buffer);
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 1ffb9e6ea..48b284eef 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -399,6 +399,7 @@ impl Serialize for Alias {
     }
 }
 
+#[derive(Debug)]
 pub(crate) struct ResolvedSchema<'s> {
     names_ref: NamesRef<'s>,
     schemata: Vec<&'s Schema>,
diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs
index ba18f7714..0755ad6e6 100644
--- a/lang/rust/avro/src/types.rs
+++ b/lang/rust/avro/src/types.rs
@@ -380,6 +380,7 @@ impl Value {
                 let name = name.fully_qualified_name(enclosing_namespace);
                 names.get(&name).map_or_else(
                     || {
+                        eprintln!("Schema not found: {:?}", &name);
                         Some(format!(
                             "Unresolved schema reference: '{:?}'. Parsed names: {:?}",
                             name,
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 6264e07de..2dba23e33 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 //! Logic handling writing in Avro format at user level.
-use crate::encode::encode_schemata;
 use crate::{
     encode::{encode, encode_internal, encode_to_vec},
     rabin::Rabin,
@@ -377,17 +376,24 @@ fn write_avro_datum<T: Into<Value>>(
     Ok(())
 }
 
+// TODO: document and add tests
 fn write_avro_datum_schemata<T: Into<Value>>(
     schemata: &[&Schema],
     value: T,
     buffer: &mut Vec<u8>,
-) -> Result<(), Error> {
+) -> AvroResult<()> {
     let avro = value.into();
-    if !avro.validate_schemata(schemata) {
-        return Err(Error::Validation);
+    let rs = ResolvedSchema::try_from(schemata)?;
+    for schema in schemata {
+        if avro
+            .validate_internal(schema, rs.get_names(), &schema.namespace())
+            .is_none()
+        {
+            encode_internal(&avro, schema, rs.get_names(), &schema.namespace(), buffer)?;
+            return Ok(());
+        }
     }
-    encode_schemata(&avro, schemata, buffer)?;
-    Ok(())
+    return Err(Error::Validation);
 }
 
 /// Writer that encodes messages according to the single object encoding v1 spec
@@ -556,6 +562,7 @@ pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Ve
     Ok(buffer)
 }
 
+// TODO: document and add tests
 pub fn to_avro_datum_schemata<T: Into<Value>>(
     schemata: &[&Schema],
     value: T,
@@ -1292,7 +1299,7 @@ mod tests {
             Value::Record(vec![("field_a".into(), Value::Float(1.0))]),
         )]);
 
-        let expected: Vec<u8> = Vec::new();
+        let expected: Vec<u8> = vec![0, 0, 128, 63];
         assert_eq!(
             to_avro_datum_schemata(&schemata.as_slice(), record).unwrap(),
             expected