You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/04/03 11:55:54 UTC
[arrow-rs] branch master updated: Support to read/write customized metadata in ipc files (#4003)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 5a63a63f8 Support to read/write customized metadata in ipc files (#4003)
5a63a63f8 is described below
commit 5a63a63f8c8a9a46840a072d72786846e7b1bb89
Author: Huxley Hu <fr...@users.noreply.github.com>
AuthorDate: Mon Apr 3 19:55:49 2023 +0800
Support to read/write customized metadata in ipc files (#4003)
Test Plan: Pass CI
---
arrow-ipc/src/convert.rs | 52 +++++++++++++++++++++---------------------------
arrow-ipc/src/reader.rs | 38 +++++++++++++++++++++++++++++++++++
arrow-ipc/src/writer.rs | 12 +++++++++++
3 files changed, 73 insertions(+), 29 deletions(-)
diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index c7b5559fa..07f716dea 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -24,7 +24,7 @@ use flatbuffers::{
use std::collections::HashMap;
use std::sync::Arc;
-use crate::{size_prefixed_root_as_message, CONTINUATION_MARKER};
+use crate::{size_prefixed_root_as_message, KeyValue, CONTINUATION_MARKER};
use DataType::*;
/// Serialize a schema in IPC format
@@ -38,6 +38,25 @@ pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
fbb
}
+pub fn metadata_to_fb<'a>(
+ fbb: &mut FlatBufferBuilder<'a>,
+ metadata: &HashMap<String, String>,
+) -> WIPOffset<Vector<'a, ForwardsUOffset<KeyValue<'a>>>> {
+ let custom_metadata = metadata
+ .iter()
+ .map(|(k, v)| {
+ let fb_key_name = fbb.create_string(k);
+ let fb_val_name = fbb.create_string(v);
+
+ let mut kv_builder = crate::KeyValueBuilder::new(fbb);
+ kv_builder.add_key(fb_key_name);
+ kv_builder.add_value(fb_val_name);
+ kv_builder.finish()
+ })
+ .collect::<Vec<_>>();
+ fbb.create_vector(&custom_metadata)
+}
+
pub fn schema_to_fb_offset<'a>(
fbb: &mut FlatBufferBuilder<'a>,
schema: &Schema,
@@ -49,24 +68,8 @@ pub fn schema_to_fb_offset<'a>(
.collect::<Vec<_>>();
let fb_field_list = fbb.create_vector(&fields);
- let fb_metadata_list = if !schema.metadata().is_empty() {
- let custom_metadata = schema
- .metadata()
- .iter()
- .map(|(k, v)| {
- let fb_key_name = fbb.create_string(k);
- let fb_val_name = fbb.create_string(v);
-
- let mut kv_builder = crate::KeyValueBuilder::new(fbb);
- kv_builder.add_key(fb_key_name);
- kv_builder.add_value(fb_val_name);
- kv_builder.finish()
- })
- .collect::<Vec<_>>();
- Some(fbb.create_vector(&custom_metadata))
- } else {
- None
- };
+ let fb_metadata_list =
+ (!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata()));
let mut builder = crate::SchemaBuilder::new(fbb);
builder.add_fields(fb_field_list);
@@ -440,16 +443,7 @@ pub(crate) fn build_field<'a>(
// Optional custom metadata.
let mut fb_metadata = None;
if !field.metadata().is_empty() {
- let mut kv_vec = vec![];
- for (k, v) in field.metadata() {
- let kv_args = crate::KeyValueArgs {
- key: Some(fbb.create_string(k.as_str())),
- value: Some(fbb.create_string(v.as_str())),
- };
- let kv_offset = crate::KeyValue::create(fbb, &kv_args);
- kv_vec.push(kv_offset);
- }
- fb_metadata = Some(fbb.create_vector(&kv_vec));
+ fb_metadata = Some(metadata_to_fb(fbb, field.metadata()));
};
let fb_field_name = fbb.create_string(field.name().as_str());
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 75d078456..60633487a 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -792,6 +792,9 @@ pub struct FileReader<R: Read + Seek> {
/// Metadata version
metadata_version: crate::MetadataVersion,
+ /// User defined metadata
+ custom_metadata: HashMap<String, String>,
+
/// Optional projection and projected_schema
projection: Option<(Vec<usize>, Schema)>,
}
@@ -862,6 +865,16 @@ impl<R: Read + Seek> FileReader<R> {
let ipc_schema = footer.schema().unwrap();
let schema = crate::convert::fb_to_schema(ipc_schema);
+ let mut custom_metadata = HashMap::new();
+ if let Some(fb_custom_metadata) = footer.custom_metadata() {
+ for kv in fb_custom_metadata.into_iter() {
+ custom_metadata.insert(
+ kv.key().unwrap().to_string(),
+ kv.value().unwrap().to_string(),
+ );
+ }
+ }
+
// Create an array of optional dictionary value arrays, one per field.
let mut dictionaries_by_id = HashMap::new();
if let Some(dictionaries) = footer.dictionaries() {
@@ -926,10 +939,16 @@ impl<R: Read + Seek> FileReader<R> {
total_blocks,
dictionaries_by_id,
metadata_version: footer.version(),
+ custom_metadata,
projection,
})
}
+ /// Return user defined customized metadata
+ pub fn custom_metadata(&self) -> &HashMap<String, String> {
+ &self.custom_metadata
+ }
+
/// Return the number of batches in the file
pub fn num_batches(&self) -> usize {
self.total_blocks
@@ -1522,6 +1541,25 @@ mod tests {
reader.next().unwrap().unwrap()
}
+ #[test]
+ fn test_roundtrip_with_custom_metadata() {
+ let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
+ let mut buf = Vec::new();
+ let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
+ let mut test_metadata = HashMap::new();
+ test_metadata.insert("abc".to_string(), "abc".to_string());
+ test_metadata.insert("def".to_string(), "def".to_string());
+ for (k, v) in &test_metadata {
+ writer.write_metadata(k, v);
+ }
+ writer.finish().unwrap();
+ drop(writer);
+
+ let reader =
+ crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
+ assert_eq!(reader.custom_metadata(), &test_metadata);
+ }
+
#[test]
fn test_roundtrip_nested_dict() {
let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 12c173f64..7d29f048a 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -706,6 +706,8 @@ pub struct FileWriter<W: Write> {
finished: bool,
/// Keeps track of dictionaries that have been written
dictionary_tracker: DictionaryTracker,
+ /// User level customized metadata
+ custom_metadata: HashMap<String, String>,
data_gen: IpcDataGenerator,
}
@@ -742,10 +744,15 @@ impl<W: Write> FileWriter<W> {
record_blocks: vec![],
finished: false,
dictionary_tracker: DictionaryTracker::new(true),
+ custom_metadata: HashMap::new(),
data_gen,
})
}
+ pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
+ self.custom_metadata.insert(key.into(), value.into());
+ }
+
/// Write a record batch to the file
pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
if self.finished {
@@ -798,6 +805,8 @@ impl<W: Write> FileWriter<W> {
let dictionaries = fbb.create_vector(&self.dictionary_blocks);
let record_batches = fbb.create_vector(&self.record_blocks);
let schema = crate::convert::schema_to_fb_offset(&mut fbb, &self.schema);
+ let fb_custom_metadata = (!self.custom_metadata.is_empty())
+ .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
let root = {
let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
@@ -805,6 +814,9 @@ impl<W: Write> FileWriter<W> {
footer_builder.add_schema(schema);
footer_builder.add_dictionaries(dictionaries);
footer_builder.add_recordBatches(record_batches);
+ if let Some(fb_custom_metadata) = fb_custom_metadata {
+ footer_builder.add_custom_metadata(fb_custom_metadata);
+ }
footer_builder.finish()
};
fbb.finish(root, None);