You are viewing a plain-text version of this message; the hyperlink to its canonical archived version was lost in the plain-text conversion.
Posted to commits@avro.apache.org by mg...@apache.org on 2022/02/16 19:08:04 UTC
[avro] 02/02: AVRO-3405 Add API to read/write user metadata in .avro files
This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch avro-3405-metadata-in-file
in repository https://gitbox.apache.org/repos/asf/avro.git
commit dff38f4256ff4afa9a0d12c0d8f8a428d01eff39
Author: Martin Tzvetanov Grigorov <mg...@apache.org>
AuthorDate: Wed Feb 16 10:54:24 2022 +0200
AVRO-3405 Add API to read/write user metadata in .avro files
Signed-off-by: Martin Tzvetanov Grigorov <mg...@apache.org>
---
lang/rust/src/error.rs | 3 +++
lang/rust/src/reader.rs | 67 +++++++++++++++++++++++++++++++++++++++++++++++++
lang/rust/src/writer.rs | 66 ++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 136 insertions(+)
diff --git a/lang/rust/src/error.rs b/lang/rust/src/error.rs
index d687eea..14a0672 100644
--- a/lang/rust/src/error.rs
+++ b/lang/rust/src/error.rs
@@ -376,6 +376,9 @@ pub enum Error {
/// Error while resolving Schema::Ref
#[error("Unresolved schema reference: {0}")]
SchemaResolutionError(String),
+
+ #[error("The file metadata is already flushed.")]
+ FileHeaderAlreadyWritten,
}
impl serde::ser::Error for Error {
diff --git a/lang/rust/src/reader.rs b/lang/rust/src/reader.rs
index d46b3bd..76ded2e 100644
--- a/lang/rust/src/reader.rs
+++ b/lang/rust/src/reader.rs
@@ -19,6 +19,7 @@
use crate::{decode::decode, schema::Schema, types::Value, util, AvroResult, Codec, Error};
use serde_json::from_slice;
use std::{
+ collections::HashMap,
io::{ErrorKind, Read},
str::FromStr,
};
@@ -35,6 +36,7 @@ struct Block<R> {
marker: [u8; 16],
codec: Codec,
writer_schema: Schema,
+ metadata: HashMap<String, String>,
}
impl<R: Read> Block<R> {
@@ -47,6 +49,7 @@ impl<R: Read> Block<R> {
buf_idx: 0,
message_count: 0,
marker: [0; 16],
+ metadata: HashMap::default(),
};
block.read_header()?;
@@ -94,6 +97,38 @@ impl<R: Read> Block<R> {
{
self.codec = codec;
}
+
+ if let Some(metadata) =
+ meta.get("avro.user_metadata")
+ .and_then(|metadata| match *metadata {
+ Value::Bytes(ref bytes) => {
+ match decode(
+ &Schema::Map(Box::new(Schema::String)),
+ &mut bytes.as_slice(),
+ ) {
+ Ok(Value::Map(ref map)) => Some(map.clone()),
+ _ => {
+ warn!("Failed to parse user metadata");
+ None
+ }
+ }
+ }
+ _ => None,
+ })
+ {
+ self.metadata = metadata
+ .iter()
+ .map(|(k, v)| match v {
+ Value::String(s) => (k.clone(), s.clone()),
+ unknown => {
+ warn!("User metadata values must be strings, found {:?}", unknown);
+ (k.clone(), format!("{:?}", unknown))
+ }
+ })
+ .collect();
+ } else {
+ debug!("No user metadata found in the file.");
+ }
} else {
return Err(Error::GetHeaderMetadata);
}
@@ -251,6 +286,10 @@ impl<'a, R: Read> Reader<'a, R> {
self.reader_schema
}
+ pub fn user_metadata(&self) -> &HashMap<String, String> {
+ &self.block.metadata
+ }
+
#[inline]
fn read_next(&mut self) -> AvroResult<Option<Value>> {
let read_schema = if self.should_resolve_schema {
@@ -499,4 +538,32 @@ mod tests {
assert!(value.is_err());
}
}
+
+ #[test]
+ fn test_avro_3405_read_user_metadata_success() {
+ use crate::writer::Writer;
+
+ let schema = Schema::parse_str(SCHEMA).unwrap();
+ let mut writer = Writer::new(&schema, Vec::new());
+
+ let mut user_meta_data = HashMap::new();
+ user_meta_data.insert("key1".to_string(), "value1".to_string());
+ user_meta_data.insert("key2".to_string(), "value2".to_string());
+
+ for (k, v) in user_meta_data.iter() {
+ writer.add_meta_data(k.to_string(), v.to_string()).unwrap();
+ }
+
+ let mut record = Record::new(&schema).unwrap();
+ record.put("a", 27i64);
+ record.put("b", "foo");
+
+ writer.append(record.clone()).unwrap();
+ writer.append(record.clone()).unwrap();
+ writer.flush().unwrap();
+ let result = writer.into_inner().unwrap();
+
+ let reader = Reader::new(&result[..]).unwrap();
+ assert_eq!(reader.user_metadata(), &user_meta_data);
+ }
}
diff --git a/lang/rust/src/writer.rs b/lang/rust/src/writer.rs
index ca08fe0..fab725f 100644
--- a/lang/rust/src/writer.rs
+++ b/lang/rust/src/writer.rs
@@ -49,6 +49,8 @@ pub struct Writer<'a, W> {
marker: Vec<u8>,
#[builder(default = false, setter(skip))]
has_header: bool,
+ #[builder(default, setter(skip))]
+ metadata: HashMap<String, Value>,
}
impl<'a, W: Write> Writer<'a, W> {
@@ -272,6 +274,17 @@ impl<'a, W: Write> Writer<'a, W> {
self.writer.write(bytes).map_err(Error::WriteBytes)
}
+ /// Adds custom metadata to the file.
+ /// This method could be used only before adding the first record to the writer.
+ pub fn add_meta_data(&mut self, key: String, value: String) -> AvroResult<()> {
+ if !self.has_header {
+ self.metadata.insert(key, Value::String(value));
+ Ok(())
+ } else {
+ Err(Error::FileHeaderAlreadyWritten)
+ }
+ }
+
/// Create an Avro header based on schema, codec and sync marker.
fn header(&self) -> Result<Vec<u8>, Error> {
let schema_bytes = serde_json::to_string(self.schema)
@@ -282,6 +295,16 @@ impl<'a, W: Write> Writer<'a, W> {
metadata.insert("avro.schema", Value::Bytes(schema_bytes));
metadata.insert("avro.codec", self.codec.into());
+ if !self.metadata.is_empty() {
+ let mut buf = Vec::new();
+ encode(
+ &Value::Map(self.metadata.clone()),
+ &Schema::Map(Box::new(Schema::String)),
+ &mut buf,
+ );
+ metadata.insert("avro.user_metadata", Value::Bytes(buf));
+ }
+
let mut header = Vec::new();
header.extend_from_slice(AVRO_OBJECT_HEADER);
encode(
@@ -293,6 +316,7 @@ impl<'a, W: Write> Writer<'a, W> {
Ok(header)
}
+
fn maybe_write_header(&mut self) -> AvroResult<usize> {
if !self.has_header {
let header = self.header()?;
@@ -808,4 +832,46 @@ mod tests {
data.as_slice()
);
}
+
+ #[test]
+ fn test_avro_3405_writer_add_metadata_success() {
+ let schema = Schema::parse_str(SCHEMA).unwrap();
+ let mut writer = Writer::new(&schema, Vec::new());
+
+ writer
+ .add_meta_data("key1".to_string(), "value1".to_string())
+ .unwrap();
+ writer
+ .add_meta_data("key2".to_string(), "value2".to_string())
+ .unwrap();
+
+ let mut record = Record::new(&schema).unwrap();
+ record.put("a", 27i64);
+ record.put("b", "foo");
+
+ writer.append(record.clone()).unwrap();
+ writer.append(record.clone()).unwrap();
+ writer.flush().unwrap();
+ let result = writer.into_inner().unwrap();
+
+ assert_eq!(result.len(), 237);
+ }
+
+ #[test]
+ fn test_avro_3405_writer_add_metadata_failure() {
+ let schema = Schema::parse_str(SCHEMA).unwrap();
+ let mut writer = Writer::new(&schema, Vec::new());
+
+ let mut record = Record::new(&schema).unwrap();
+ record.put("a", 27i64);
+ record.put("b", "foo");
+ writer.append(record.clone()).unwrap();
+
+ if writer
+ .add_meta_data("stringKey".to_string(), "value2".into())
+ .is_ok()
+ {
+ panic!("Expected error that metadata cannot be added after adding data");
+ }
+ }
}