You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/09/27 12:46:31 UTC
[avro] 01/01: AVRO-3630: [Rust] Make it possible to extend pre-existing Avro bytes
This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch avro-3630-append-to-preexisting-bytes
in repository https://gitbox.apache.org/repos/asf/avro.git
commit fabb82b969d182ef315db60ff26d09efea4bd72b
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Tue Sep 27 15:43:36 2022 +0300
AVRO-3630: [Rust] Make it possible to extend pre-existing Avro bytes
Make it possible to pass a block marker to Writer, so that it could
append to pre-existing bytes (i.e. bytes created by another Writer)
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---
lang/rust/avro/src/lib.rs | 4 +-
lang/rust/avro/src/writer.rs | 42 +++++++++++++--
lang/rust/avro/tests/append_to_existing.rs | 87 ++++++++++++++++++++++++++++++
3 files changed, 128 insertions(+), 5 deletions(-)
diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs
index 679eb1cf5..9edb3cb31 100644
--- a/lang/rust/avro/src/lib.rs
+++ b/lang/rust/avro/src/lib.rs
@@ -746,7 +746,9 @@ pub use reader::{from_avro_datum, GenericSingleObjectReader, Reader, SpecificSin
pub use schema::{AvroSchema, Schema};
pub use ser::to_value;
pub use util::max_allocation_bytes;
-pub use writer::{to_avro_datum, GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer};
+pub use writer::{
+ read_marker, to_avro_datum, GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer,
+};
#[cfg(feature = "derive")]
pub use apache_avro_derive::*;
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 98ceafe46..83b58c927 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -47,7 +47,7 @@ pub struct Writer<'a, W> {
serializer: Serializer,
#[builder(default = 0, setter(skip))]
num_values: usize,
- #[builder(default = generate_sync_marker(), setter(skip))]
+ #[builder(default = generate_sync_marker())]
marker: Vec<u8>,
#[builder(default = false, setter(skip))]
has_header: bool,
@@ -60,9 +60,7 @@ impl<'a, W: Write> Writer<'a, W> {
/// to.
/// No compression `Codec` will be used.
pub fn new(schema: &'a Schema, writer: W) -> Self {
- let mut w = Self::builder().schema(schema).writer(writer).build();
- w.resolved_schema = ResolvedSchema::try_from(schema).ok();
- w
+ Writer::with_codec(schema, writer, Codec::Null)
}
/// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
@@ -77,6 +75,33 @@ impl<'a, W: Write> Writer<'a, W> {
w
}
+ /// Creates a `Writer` that will append values to already populated
+ /// `std::io::Write` using the provided `marker`.
+ /// No compression `Codec` will be used.
+ pub fn extend_from(schema: &'a Schema, writer: W, marker: Vec<u8>) -> Self {
+ Writer::extend_from_with_codec(schema, writer, Codec::Null, marker)
+ }
+
+ /// Creates a `Writer` that will append values to already populated
+ /// `std::io::Write` using the provided `marker`.
+ pub fn extend_from_with_codec(
+ schema: &'a Schema,
+ writer: W,
+ codec: Codec,
+ marker: Vec<u8>,
+ ) -> Self {
+ assert_eq!(marker.len(), 16);
+ let mut w = Self::builder()
+ .schema(schema)
+ .writer(writer)
+ .codec(codec)
+ .marker(marker)
+ .build();
+ w.has_header = true;
+ w.resolved_schema = ResolvedSchema::try_from(schema).ok();
+ w
+ }
+
/// Get a reference to the `Schema` associated to a `Writer`.
pub fn schema(&self) -> &'a Schema {
self.schema
@@ -513,6 +538,15 @@ pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Ve
Ok(buffer)
}
+/// Reads the marker bytes from Avro bytes generated earlier by a `Writer`
+pub fn read_marker(bytes: &[u8]) -> Vec<u8> {
+ assert!(
+ bytes.len() > 16,
+ "The bytes are too short to read a marker from them"
+ );
+ bytes[(bytes.len() - 16)..].to_vec()
+}
+
#[cfg(not(target_arch = "wasm32"))]
fn generate_sync_marker() -> Vec<u8> {
std::iter::repeat_with(rand::random).take(16).collect()
diff --git a/lang/rust/avro/tests/append_to_existing.rs b/lang/rust/avro/tests/append_to_existing.rs
new file mode 100644
index 000000000..7d2a31733
--- /dev/null
+++ b/lang/rust/avro/tests/append_to_existing.rs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use apache_avro::read_marker;
+use apache_avro::{
+ types::{Record, Value},
+ AvroResult, Reader, Schema, Writer,
+};
+
+#[test]
+fn append_to_an_existing_file() {
+ let schema_str = r#"
+ {
+ "type": "record",
+ "name": "append_to_existing_file",
+ "fields": [
+ {"name": "a", "type": "int"}
+ ]
+ }
+ "#;
+
+ let schema = Schema::parse_str(schema_str).expect("Cannot parse the schema");
+
+ let bytes = get_avro_bytes(&schema);
+
+ let marker = read_marker(&bytes);
+
+ let mut writer = Writer::extend_from(&schema, bytes, marker);
+
+ writer
+ .append(create_datum(&schema, 2))
+ .expect("An error occurred while appending more data");
+
+ let new_bytes = writer.into_inner().expect("Cannot get the new bytes");
+
+ let reader = Reader::new(&*new_bytes).expect("Cannot read the new bytes");
+ let mut i = 1;
+ for value in reader {
+ check(value, i);
+ i += 1
+ }
+}
+
+/// Simulates reading from a pre-existing .avro file and returns its bytes
+fn get_avro_bytes(schema: &Schema) -> Vec<u8> {
+ let mut writer = Writer::new(&schema, Vec::new());
+ writer
+ .append(create_datum(&schema, 1))
+ .expect("An error while appending data");
+ let bytes = writer.into_inner().expect("Cannot get the Avro bytes");
+ bytes
+}
+
+/// Creates a new datum to write
+fn create_datum(schema: &Schema, value: i32) -> Record {
+ let mut datum = Record::new(schema).unwrap();
+ datum.put("a", value);
+ datum
+}
+
+/// Checks the read values
+fn check(v: AvroResult<Value>, expected: i32) {
+ match v {
+ Ok(val) => match val {
+ Value::Record(fields) => match &fields[0] {
+ (_, Value::Int(actual)) => assert_eq!(&expected, actual),
+ _ => unreachable!("The field value type must be an Int!"),
+ },
+ _ => unreachable!("The value type must be a Record!"),
+ },
+ Err(e) => eprintln!("Error while reading the data: {:?}", e),
+ }
+}