You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/02/16 19:08:04 UTC

[avro] 02/02: AVRO-3405 Add API to read/write user metadata in .avro files

This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch avro-3405-metadata-in-file
in repository https://gitbox.apache.org/repos/asf/avro.git

commit dff38f4256ff4afa9a0d12c0d8f8a428d01eff39
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Wed Feb 16 10:54:24 2022 +0200

    AVRO-3405 Add API to read/write user metadata in .avro files
    
    Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---
 lang/rust/src/error.rs  |  3 +++
 lang/rust/src/reader.rs | 67 +++++++++++++++++++++++++++++++++++++++++++++++++
 lang/rust/src/writer.rs | 66 ++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 136 insertions(+)

diff --git a/lang/rust/src/error.rs b/lang/rust/src/error.rs
index d687eea..14a0672 100644
--- a/lang/rust/src/error.rs
+++ b/lang/rust/src/error.rs
@@ -376,6 +376,9 @@ pub enum Error {
     /// Error while resolving Schema::Ref
     #[error("Unresolved schema reference: {0}")]
     SchemaResolutionError(String),
+
+    #[error("The file metadata is already flushed.")]
+    FileHeaderAlreadyWritten,
 }
 
 impl serde::ser::Error for Error {
diff --git a/lang/rust/src/reader.rs b/lang/rust/src/reader.rs
index d46b3bd..76ded2e 100644
--- a/lang/rust/src/reader.rs
+++ b/lang/rust/src/reader.rs
@@ -19,6 +19,7 @@
 use crate::{decode::decode, schema::Schema, types::Value, util, AvroResult, Codec, Error};
 use serde_json::from_slice;
 use std::{
+    collections::HashMap,
     io::{ErrorKind, Read},
     str::FromStr,
 };
@@ -35,6 +36,7 @@ struct Block<R> {
     marker: [u8; 16],
     codec: Codec,
     writer_schema: Schema,
+    metadata: HashMap<String, String>,
 }
 
 impl<R: Read> Block<R> {
@@ -47,6 +49,7 @@ impl<R: Read> Block<R> {
             buf_idx: 0,
             message_count: 0,
             marker: [0; 16],
+            metadata: HashMap::default(),
         };
 
         block.read_header()?;
@@ -94,6 +97,38 @@ impl<R: Read> Block<R> {
             {
                 self.codec = codec;
             }
+
+            if let Some(metadata) =
+                meta.get("avro.user_metadata")
+                    .and_then(|metadata| match *metadata {
+                        Value::Bytes(ref bytes) => {
+                            match decode(
+                                &Schema::Map(Box::new(Schema::String)),
+                                &mut bytes.as_slice(),
+                            ) {
+                                Ok(Value::Map(ref map)) => Some(map.clone()),
+                                _ => {
+                                    warn!("Failed to parse user metadata");
+                                    None
+                                }
+                            }
+                        }
+                        _ => None,
+                    })
+            {
+                self.metadata = metadata
+                    .iter()
+                    .map(|(k, v)| match v {
+                        Value::String(s) => (k.clone(), s.clone()),
+                        unknown => {
+                            warn!("User metadata values must be strings, found {:?}", unknown);
+                            (k.clone(), format!("{:?}", unknown))
+                        }
+                    })
+                    .collect();
+            } else {
+                debug!("No user metadata found in the file.");
+            }
         } else {
             return Err(Error::GetHeaderMetadata);
         }
@@ -251,6 +286,10 @@ impl<'a, R: Read> Reader<'a, R> {
         self.reader_schema
     }
 
+    /// Returns the user-defined metadata held by the underlying block.
+    ///
+    /// The map is filled while the file header is read, from the
+    /// `avro.user_metadata` header entry. It is empty when that entry is
+    /// missing or cannot be decoded as a map. Values that are not strings
+    /// are stored using their `Debug` representation.
+    pub fn user_metadata(&self) -> &HashMap<String, String> {
+        &self.block.metadata
+    }
+
     #[inline]
     fn read_next(&mut self) -> AvroResult<Option<Value>> {
         let read_schema = if self.should_resolve_schema {
@@ -499,4 +538,32 @@ mod tests {
             assert!(value.is_err());
         }
     }
+
+    /// Round-trips user metadata: whatever is registered on the writer must be
+    /// returned unchanged by `Reader::user_metadata`.
+    #[test]
+    fn test_avro_3405_read_user_metadata_success() {
+        use crate::writer::Writer;
+
+        let schema = Schema::parse_str(SCHEMA).unwrap();
+        let mut writer = Writer::new(&schema, Vec::new());
+
+        // Expected metadata, built once and reused for both writing and asserting.
+        let user_meta_data: HashMap<String, String> = [("key1", "value1"), ("key2", "value2")]
+            .iter()
+            .map(|&(key, value)| (key.to_string(), value.to_string()))
+            .collect();
+
+        // Metadata has to be registered before the first record is appended.
+        for (key, value) in &user_meta_data {
+            writer.add_meta_data(key.clone(), value.clone()).unwrap();
+        }
+
+        let mut record = Record::new(&schema).unwrap();
+        record.put("a", 27i64);
+        record.put("b", "foo");
+
+        // Write the same record twice, as the payload of the file.
+        for _ in 0..2 {
+            writer.append(record.clone()).unwrap();
+        }
+        writer.flush().unwrap();
+        let encoded = writer.into_inner().unwrap();
+
+        let reader = Reader::new(encoded.as_slice()).unwrap();
+        assert_eq!(reader.user_metadata(), &user_meta_data);
+    }
 }
diff --git a/lang/rust/src/writer.rs b/lang/rust/src/writer.rs
index ca08fe0..fab725f 100644
--- a/lang/rust/src/writer.rs
+++ b/lang/rust/src/writer.rs
@@ -49,6 +49,8 @@ pub struct Writer<'a, W> {
     marker: Vec<u8>,
     #[builder(default = false, setter(skip))]
     has_header: bool,
+    #[builder(default, setter(skip))]
+    metadata: HashMap<String, Value>,
 }
 
 impl<'a, W: Write> Writer<'a, W> {
@@ -272,6 +274,17 @@ impl<'a, W: Write> Writer<'a, W> {
         self.writer.write(bytes).map_err(Error::WriteBytes)
     }
 
+    /// Registers a custom key/value pair to be stored in the file's metadata.
+    ///
+    /// Metadata can only be added while the file header has not been written
+    /// yet, i.e. before the first record is added to the writer.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`Error::FileHeaderAlreadyWritten`] when the header has already
+    /// been written; in that case the pair is not stored.
+    pub fn add_meta_data(&mut self, key: String, value: String) -> AvroResult<()> {
+        // Once the header is out, additional metadata could never reach the file.
+        if self.has_header {
+            return Err(Error::FileHeaderAlreadyWritten);
+        }
+
+        self.metadata.insert(key, Value::String(value));
+        Ok(())
+    }
+
     /// Create an Avro header based on schema, codec and sync marker.
     fn header(&self) -> Result<Vec<u8>, Error> {
         let schema_bytes = serde_json::to_string(self.schema)
@@ -282,6 +295,16 @@ impl<'a, W: Write> Writer<'a, W> {
         metadata.insert("avro.schema", Value::Bytes(schema_bytes));
         metadata.insert("avro.codec", self.codec.into());
 
+        if !self.metadata.is_empty() {
+            let mut buf = Vec::new();
+            encode(
+                &Value::Map(self.metadata.clone()),
+                &Schema::Map(Box::new(Schema::String)),
+                &mut buf,
+            );
+            metadata.insert("avro.user_metadata", Value::Bytes(buf));
+        }
+
         let mut header = Vec::new();
         header.extend_from_slice(AVRO_OBJECT_HEADER);
         encode(
@@ -293,6 +316,7 @@ impl<'a, W: Write> Writer<'a, W> {
 
         Ok(header)
     }
+
     fn maybe_write_header(&mut self) -> AvroResult<usize> {
         if !self.has_header {
             let header = self.header()?;
@@ -808,4 +832,46 @@ mod tests {
             data.as_slice()
         );
     }
+
+    /// Adding metadata before any record is appended must succeed and the
+    /// metadata must end up in the encoded output (checked via its length).
+    #[test]
+    fn test_avro_3405_writer_add_metadata_success() {
+        let schema = Schema::parse_str(SCHEMA).unwrap();
+        let mut writer = Writer::new(&schema, Vec::new());
+
+        // Register every pair while the header is still unwritten.
+        for &(key, value) in &[("key1", "value1"), ("key2", "value2")] {
+            writer
+                .add_meta_data(key.to_string(), value.to_string())
+                .unwrap();
+        }
+
+        let mut record = Record::new(&schema).unwrap();
+        record.put("a", 27i64);
+        record.put("b", "foo");
+
+        // Two identical records make up the payload.
+        for _ in 0..2 {
+            writer.append(record.clone()).unwrap();
+        }
+        writer.flush().unwrap();
+        let encoded = writer.into_inner().unwrap();
+
+        assert_eq!(encoded.len(), 237);
+    }
+
+    /// Adding metadata after a record has been appended must be rejected with
+    /// `Error::FileHeaderAlreadyWritten`.
+    #[test]
+    fn test_avro_3405_writer_add_metadata_failure() {
+        let schema = Schema::parse_str(SCHEMA).unwrap();
+        let mut writer = Writer::new(&schema, Vec::new());
+
+        let mut record = Record::new(&schema).unwrap();
+        record.put("a", 27i64);
+        record.put("b", "foo");
+        // After the first append the writer is expected to refuse new metadata.
+        writer.append(record).unwrap();
+
+        // Pin the exact variant: an unrelated error must not make this test pass,
+        // and a failure message should show what was actually returned.
+        match writer.add_meta_data("stringKey".to_string(), "value2".into()) {
+            Err(crate::Error::FileHeaderAlreadyWritten) => {}
+            other => panic!(
+                "Expected error that metadata cannot be added after adding data, got {:?}",
+                other
+            ),
+        }
+    }
 }