You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/03/20 20:08:06 UTC
[avro] branch master updated: AVRO-3451: Reuse Resolved Schemas (#1608)
This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/master by this push:
new 6682731 AVRO-3451: Reuse Resolved Schemas (#1608)
6682731 is described below
commit 66827318f0a134e8f6d4638a0787cb1b36821d1c
Author: Jack Klamer <jf...@gmail.com>
AuthorDate: Sun Mar 20 15:07:59 2022 -0500
AVRO-3451: Reuse Resolved Schemas (#1608)
* [AVRO-3451] reuse resolved schema
* AVRO-3451: Reuse Resolved Schemas
Remove useless check.
Always set `self.attempted_resolve = true`. There is no harm even if it was true already.
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
* [AVRO-3451] remove bool
Co-authored-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---
lang/rust/avro/src/encode.rs | 2 +-
lang/rust/avro/src/schema.rs | 5 +++-
lang/rust/avro/src/writer.rs | 65 ++++++++++++++++++++++++++++----------------
3 files changed, 46 insertions(+), 26 deletions(-)
diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs
index 95d660d..c4c0bd3 100644
--- a/lang/rust/avro/src/encode.rs
+++ b/lang/rust/avro/src/encode.rs
@@ -47,7 +47,7 @@ fn encode_int(i: i32, buffer: &mut Vec<u8>) {
zig_i32(i, buffer)
}
-fn encode_internal(
+pub(crate) fn encode_internal(
value: &Value,
schema: &Schema,
names: &NamesRef,
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 815aea6..9c0750b 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -355,7 +355,10 @@ impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> {
}
impl<'s> ResolvedSchema<'s> {
- pub fn get_names(&self) -> &NamesRef<'s> {
+ pub(crate) fn get_root_schema(&self) -> &'s Schema {
+ self.root_schema
+ }
+ pub(crate) fn get_names(&self) -> &NamesRef<'s> {
&self.names_ref
}
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index e257916..23d53ee 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -17,15 +17,15 @@
//! Logic handling writing in Avro format at user level.
use crate::{
- encode::{encode, encode_to_vec},
- schema::Schema,
+ encode::{encode, encode_internal, encode_to_vec},
+ schema::{ResolvedSchema, Schema},
ser::Serializer,
types::Value,
AvroResult, Codec, Error,
};
use rand::random;
use serde::Serialize;
-use std::{collections::HashMap, io::Write};
+use std::{collections::HashMap, convert::TryFrom, io::Write};
const DEFAULT_BLOCK_SIZE: usize = 16000;
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
@@ -35,6 +35,8 @@ const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
pub struct Writer<'a, W> {
schema: &'a Schema,
writer: W,
+ #[builder(default, setter(skip))]
+ resolved_schema: Option<ResolvedSchema<'a>>,
#[builder(default = Codec::Null)]
codec: Codec,
#[builder(default = DEFAULT_BLOCK_SIZE)]
@@ -58,17 +60,21 @@ impl<'a, W: Write> Writer<'a, W> {
/// to.
/// No compression `Codec` will be used.
pub fn new(schema: &'a Schema, writer: W) -> Self {
- Self::builder().schema(schema).writer(writer).build()
+ let mut w = Self::builder().schema(schema).writer(writer).build();
+ w.resolved_schema = ResolvedSchema::try_from(schema).ok();
+ w
}
/// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
/// `io::Write` trait to write to.
pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
- Self::builder()
+ let mut w = Self::builder()
.schema(schema)
.writer(writer)
.codec(codec)
- .build()
+ .build();
+ w.resolved_schema = ResolvedSchema::try_from(schema).ok();
+ w
}
/// Get a reference to the `Schema` associated to a `Writer`.
@@ -88,15 +94,7 @@ impl<'a, W: Write> Writer<'a, W> {
let n = self.maybe_write_header()?;
let avro = value.into();
- write_value_ref(self.schema, &avro, &mut self.buffer)?;
-
- self.num_values += 1;
-
- if self.buffer.len() >= self.block_size {
- return self.flush().map(|b| b + n);
- }
-
- Ok(n)
+ self.append_value_ref(&avro).map(|m| m + n)
}
/// Append a compatible value to a `Writer`, also performing schema validation.
@@ -109,15 +107,24 @@ impl<'a, W: Write> Writer<'a, W> {
pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
let n = self.maybe_write_header()?;
- write_value_ref(self.schema, value, &mut self.buffer)?;
+ // Lazy init for users using the builder pattern with error throwing
+ match self.resolved_schema {
+ Some(ref rs) => {
+ write_value_ref_resolved(rs, value, &mut self.buffer)?;
+ self.num_values += 1;
- self.num_values += 1;
+ if self.buffer.len() >= self.block_size {
+ return self.flush().map(|b| b + n);
+ }
- if self.buffer.len() >= self.block_size {
- return self.flush().map(|b| b + n);
+ Ok(n)
+ }
+ None => {
+ let rs = ResolvedSchema::try_from(self.schema)?;
+ self.resolved_schema = Some(rs);
+ self.append_value_ref(value)
+ }
}
-
- Ok(n)
}
/// Append anything implementing the `Serialize` trait to a `Writer` for
@@ -345,11 +352,21 @@ fn write_avro_datum<T: Into<Value>>(
Ok(())
}
-fn write_value_ref(schema: &Schema, value: &Value, buffer: &mut Vec<u8>) -> AvroResult<()> {
- if !value.validate(schema) {
+fn write_value_ref_resolved(
+ resolved_schema: &ResolvedSchema,
+ value: &Value,
+ buffer: &mut Vec<u8>,
+) -> AvroResult<()> {
+ if !value.validate(resolved_schema.get_root_schema()) {
return Err(Error::Validation);
}
- encode(value, schema, buffer)?;
+ encode_internal(
+ value,
+ resolved_schema.get_root_schema(),
+ resolved_schema.get_names(),
+ &None,
+ buffer,
+ )?;
Ok(())
}