You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/09/28 08:36:05 UTC

[avro] branch branch-1.11 updated: AVRO-3630: [Rust] Make it possible to extend pre-existing Avro bytes (#1888)

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/branch-1.11 by this push:
     new ab6a1deab AVRO-3630: [Rust] Make it possible to extend pre-existing Avro bytes (#1888)
ab6a1deab is described below

commit ab6a1deab85bb41d3649a3fa3a6b4056d2f42dc4
Author: Martin Grigorov <ma...@users.noreply.github.com>
AuthorDate: Wed Sep 28 11:35:34 2022 +0300

    AVRO-3630: [Rust] Make it possible to extend pre-existing Avro bytes (#1888)
    
    * AVRO-3630: [Rust] Make it possible to extend pre-existing Avro bytes
    
    Make it possible to pass a block marker to Writer, so that it can
    append to pre-existing bytes (i.e. bytes created by another Writer).
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    * AVRO-3630: Change Writer#marker from Vec<u8> to [u8; 16]
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
    (cherry picked from commit 8640436de52ed25ccb0dbfb125cc3f142e968889)
---
 lang/rust/avro/src/lib.rs                  |  4 +-
 lang/rust/avro/src/reader.rs               | 11 ++++
 lang/rust/avro/src/writer.rs               | 50 +++++++++++++----
 lang/rust/avro/tests/append_to_existing.rs | 86 ++++++++++++++++++++++++++++++
 4 files changed, 141 insertions(+), 10 deletions(-)

diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs
index 679eb1cf5..a6b06d74a 100644
--- a/lang/rust/avro/src/lib.rs
+++ b/lang/rust/avro/src/lib.rs
@@ -742,7 +742,9 @@ pub use de::from_value;
 pub use decimal::Decimal;
 pub use duration::{Days, Duration, Millis, Months};
 pub use error::Error;
-pub use reader::{from_avro_datum, GenericSingleObjectReader, Reader, SpecificSingleObjectReader};
+pub use reader::{
+    from_avro_datum, read_marker, GenericSingleObjectReader, Reader, SpecificSingleObjectReader,
+};
 pub use schema::{AvroSchema, Schema};
 pub use ser::to_value;
 pub use util::max_allocation_bytes;
diff --git a/lang/rust/avro/src/reader.rs b/lang/rust/avro/src/reader.rs
index de68c18da..a8238b226 100644
--- a/lang/rust/avro/src/reader.rs
+++ b/lang/rust/avro/src/reader.rs
@@ -455,6 +455,17 @@ where
     }
 }
 
+/// Reads the marker bytes from Avro bytes generated earlier by a `Writer`
+pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
+    assert!(
+        bytes.len() > 16,
+        "The bytes are too short to read a marker from them"
+    );
+    let mut marker = [0_u8; 16];
+    marker.clone_from_slice(&bytes[(bytes.len() - 16)..]);
+    marker
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs
index 98ceafe46..9fafcf968 100644
--- a/lang/rust/avro/src/writer.rs
+++ b/lang/rust/avro/src/writer.rs
@@ -47,8 +47,8 @@ pub struct Writer<'a, W> {
     serializer: Serializer,
     #[builder(default = 0, setter(skip))]
     num_values: usize,
-    #[builder(default = generate_sync_marker(), setter(skip))]
-    marker: Vec<u8>,
+    #[builder(default = generate_sync_marker())]
+    marker: [u8; 16],
     #[builder(default = false, setter(skip))]
     has_header: bool,
     #[builder(default)]
@@ -60,9 +60,7 @@ impl<'a, W: Write> Writer<'a, W> {
     /// to.
     /// No compression `Codec` will be used.
     pub fn new(schema: &'a Schema, writer: W) -> Self {
-        let mut w = Self::builder().schema(schema).writer(writer).build();
-        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
-        w
+        Writer::with_codec(schema, writer, Codec::Null)
     }
 
     /// Creates a `Writer` with a specific `Codec` given a `Schema` and something implementing the
@@ -77,6 +75,32 @@ impl<'a, W: Write> Writer<'a, W> {
         w
     }
 
+    /// Creates a `Writer` that will append values to already populated
+    /// `std::io::Write` using the provided `marker`
+    /// No compression `Codec` will be used.
+    pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
+        Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
+    }
+
+    /// Creates a `Writer` that will append values to already populated
+    /// `std::io::Write` using the provided `marker`
+    pub fn append_to_with_codec(
+        schema: &'a Schema,
+        writer: W,
+        codec: Codec,
+        marker: [u8; 16],
+    ) -> Self {
+        let mut w = Self::builder()
+            .schema(schema)
+            .writer(writer)
+            .codec(codec)
+            .marker(marker)
+            .build();
+        w.has_header = true;
+        w.resolved_schema = ResolvedSchema::try_from(schema).ok();
+        w
+    }
+
     /// Get a reference to the `Schema` associated to a `Writer`.
     pub fn schema(&self) -> &'a Schema {
         self.schema
@@ -514,16 +538,24 @@ pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Ve
 }
 
 #[cfg(not(target_arch = "wasm32"))]
-fn generate_sync_marker() -> Vec<u8> {
-    std::iter::repeat_with(rand::random).take(16).collect()
+fn generate_sync_marker() -> [u8; 16] {
+    let mut marker = [0_u8; 16];
+    std::iter::repeat_with(rand::random)
+        .take(16)
+        .enumerate()
+        .for_each(|(i, n)| marker[i] = n);
+    marker
 }
 
 #[cfg(target_arch = "wasm32")]
-fn generate_sync_marker() -> Vec<u8> {
+fn generate_sync_marker() -> [u8; 16] {
+    let mut marker = [0_u8; 16];
     std::iter::repeat_with(quad_rand::rand)
         .take(4)
         .flat_map(|i| i.to_be_bytes())
-        .collect()
+        .enumerate()
+        .for_each(|(i, n)| marker[i] = n);
+    marker
 }
 
 #[cfg(test)]
diff --git a/lang/rust/avro/tests/append_to_existing.rs b/lang/rust/avro/tests/append_to_existing.rs
new file mode 100644
index 000000000..4f92f45e9
--- /dev/null
+++ b/lang/rust/avro/tests/append_to_existing.rs
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use apache_avro::{
+    read_marker,
+    types::{Record, Value},
+    AvroResult, Reader, Schema, Writer,
+};
+
+#[test]
+fn avro_3630_append_to_an_existing_file() {
+    let schema_str = r#"
+            {
+                "type": "record",
+                "name": "append_to_existing_file",
+                "fields": [
+                    {"name": "a", "type": "int"}
+                ]
+            }
+        "#;
+
+    let schema = Schema::parse_str(schema_str).expect("Cannot parse the schema");
+
+    let bytes = get_avro_bytes(&schema);
+
+    let marker = read_marker(&bytes[..]);
+
+    let mut writer = Writer::append_to(&schema, bytes, marker);
+
+    writer
+        .append(create_datum(&schema, 2))
+        .expect("An error occurred while appending more data");
+
+    let new_bytes = writer.into_inner().expect("Cannot get the new bytes");
+
+    let reader = Reader::new(&*new_bytes).expect("Cannot read the new bytes");
+    let mut i = 1;
+    for value in reader {
+        check(value, i);
+        i += 1
+    }
+}
+
+/// Simulates reading from a pre-existing .avro file and returns its bytes
+fn get_avro_bytes(schema: &Schema) -> Vec<u8> {
+    let mut writer = Writer::new(schema, Vec::new());
+    writer
+        .append(create_datum(schema, 1))
+        .expect("An error while appending data");
+    writer.into_inner().expect("Cannot get the Avro bytes")
+}
+
+/// Creates a new datum to write
+fn create_datum(schema: &Schema, value: i32) -> Record {
+    let mut datum = Record::new(schema).unwrap();
+    datum.put("a", value);
+    datum
+}
+
+/// Checks the read values
+fn check(value: AvroResult<Value>, expected: i32) {
+    match value {
+        Ok(value) => match value {
+            Value::Record(fields) => match &fields[0] {
+                (_, Value::Int(actual)) => assert_eq!(&expected, actual),
+                _ => panic!("The field value type must be an Int: {:?}!", &fields[0]),
+            },
+            _ => panic!("The value type must be a Record: {:?}!", value),
+        },
+        Err(e) => panic!("Error while reading the data: {:?}", e),
+    }
+}