You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") in the original page and is not preserved in this plain-text rendering.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/03/20 20:08:30 UTC

[avro] branch branch-1.11 updated: AVRO-3451: Reuse Resolved Schemas (#1608)

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/branch-1.11 by this push:
     new 359cedb  AVRO-3451: Reuse Resolved Schemas (#1608)
359cedb is described below

commit 359cedb13dc1335766aa39d60c12ae5de330681b
Author: Jack Klamer <jf...@gmail.com>
AuthorDate: Sun Mar 20 15:07:59 2022 -0500

    AVRO-3451: Reuse Resolved Schemas (#1608)
    
    * [AVRO-3451] reuse resolved schema
    
    * AVRO-3451: Reuse Resolved Schemas
    
    Remove a redundant check.
    Always set `self.attempted_resolve = true`; there is no harm if it was already true.
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * [AVRO-3451] remove bool
    
    Co-authored-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit 66827318f0a134e8f6d4638a0787cb1b36821d1c)
---
 lang/rust/avro/src/encode.rs |  2 +-
 lang/rust/avro/src/schema.rs |  5 +++-
 lang/rust/avro/src/writer.rs | 65 ++++++++++++++++++++++++++++----------------
 3 files changed, 46 insertions(+), 26 deletions(-)

diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs
index 95d660d..c4c0bd3 100644
--- a/lang/rust/avro/src/encode.rs
+++ b/lang/rust/avro/src/encode.rs
@@ -47,7 +47,7 @@ fn encode_int(i: i32, buffer: &mut Vec<u8>) {
     zig_i32(i, buffer)
 }
 
-fn encode_internal(
+pub(crate) fn encode_internal(
     value: &Value,
     schema: &Schema,
     names: &NamesRef,
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 815aea6..9c0750b 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -355,7 +355,10 @@ impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
 }
 
 impl<'s> ResolvedSchema<'s> {
-    pub fn get_names(&self) -> &NamesRef<'s> {
+    pub(crate) fn get_root_schema(&self) -> &'s Schema {
+        self.root_schema
+    }
+    pub(crate) fn get_names(&self) -> &NamesRef<'s> {
         &self.names_ref
     }
 
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index e257916..23d53ee 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -17,15 +17,15 @@
 
 //! Logic handling writing in Avro format at user level.
 use crate::{
-    encode::{encode, encode_to_vec},
-    schema::Schema,
+    encode::{encode, encode_internal, encode_to_vec},
+    schema::{ResolvedSchema, Schema},
     ser::Serializer,
     types::Value,
     AvroResult, Codec, Error,
 };
 use rand::random;
 use serde::Serialize;
-use std::{collections::HashMap, io::Write};
+use std::{collections::HashMap, convert::TryFrom, io::Write};
 
 const DEFAULT_BLOCK_SIZE: usize = 16000;
 const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
@@ -35,6 +35,8 @@ const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
 pub struct Writer<'a, W> {
     schema: &'a Schema,
     writer: W,
+    #[builder(default, setter(skip))]
+    resolved_schema: Option<ResolvedSchema<'a>>,
     #[builder(default = Codec::Null)]
     codec: Codec,
     #[builder(default = DEFAULT_BLOCK_SIZE)]
@@ -58,17 +60,21 @@ impl<'a, W: Write> Writer<'a, W> {
     /// to.
     /// No compression `Codec` will be used.
     pub fn new(schema: &'a Schema, writer: W) -> Self {
-        Self::builder().schema(schema).writer(writer).build()
+        let mut w = Self::builder().schema(schema).writer(writer).build();
+        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
+        w
     }
 
     /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
     /// `io::Write` trait to write to.
     pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
-        Self::builder()
+        let mut w = Self::builder()
             .schema(schema)
             .writer(writer)
             .codec(codec)
-            .build()
+            .build();
+        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
+        w
     }
 
     /// Get a reference to the `Schema` associated to a `Writer`.
@@ -88,15 +94,7 @@ impl<'a, W: Write> Writer<'a, W> {
         let n = self.maybe_write_header()?;
 
         let avro = value.into();
-        write_value_ref(self.schema, &avro, &mut self.buffer)?;
-
-        self.num_values += 1;
-
-        if self.buffer.len() >= self.block_size {
-            return self.flush().map(|b| b + n);
-        }
-
-        Ok(n)
+        self.append_value_ref(&avro).map(|m| m + n)
     }
 
     /// Append a compatible value to a `Writer`, also performing schema validation.
@@ -109,15 +107,24 @@ impl<'a, W: Write> Writer<'a, W> {
     pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
         let n = self.maybe_write_header()?;
 
-        write_value_ref(self.schema, value, &mut self.buffer)?;
+        // Lazy init for users using the builder pattern with error throwing
+        match self.resolved_schema {
+            Some(ref rs) => {
+                write_value_ref_resolved(rs, value, &mut self.buffer)?;
+                self.num_values += 1;
 
-        self.num_values += 1;
+                if self.buffer.len() >= self.block_size {
+                    return self.flush().map(|b| b + n);
+                }
 
-        if self.buffer.len() >= self.block_size {
-            return self.flush().map(|b| b + n);
+                Ok(n)
+            }
+            None => {
+                let rs = ResolvedSchema::try_from(self.schema)?;
+                self.resolved_schema = Some(rs);
+                self.append_value_ref(value)
+            }
         }
-
-        Ok(n)
     }
 
     /// Append anything implementing the `Serialize` trait to a `Writer` for
@@ -345,11 +352,21 @@ fn write_avro_datum<T: Into<Value>>(
     Ok(())
 }
 
-fn write_value_ref(schema: &Schema, value: &Value, buffer: &mut Vec<u8>) -> AvroResult<()> {
-    if !value.validate(schema) {
+fn write_value_ref_resolved(
+    resolved_schema: &ResolvedSchema,
+    value: &Value,
+    buffer: &mut Vec<u8>,
+) -> AvroResult<()> {
+    if !value.validate(resolved_schema.get_root_schema()) {
         return Err(Error::Validation);
     }
-    encode(value, schema, buffer)?;
+    encode_internal(
+        value,
+        resolved_schema.get_root_schema(),
+        resolved_schema.get_names(),
+        &None,
+        buffer,
+    )?;
     Ok(())
 }