You are viewing a plain text version of this content. The canonical link for it is on the original mailing-list archive page (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/04/03 11:55:54 UTC

[arrow-rs] branch master updated: Support to read/write customized metadata in ipc files (#4003)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a63a63f8 Support to read/write customized metadata in ipc files (#4003)
5a63a63f8 is described below

commit 5a63a63f8c8a9a46840a072d72786846e7b1bb89
Author: Huxley Hu <fr...@users.noreply.github.com>
AuthorDate: Mon Apr 3 19:55:49 2023 +0800

    Support to read/write customized metadata in ipc files (#4003)
    
    Test Plan: Pass CI
---
 arrow-ipc/src/convert.rs | 52 +++++++++++++++++++++---------------------------
 arrow-ipc/src/reader.rs  | 38 +++++++++++++++++++++++++++++++++++
 arrow-ipc/src/writer.rs  | 12 +++++++++++
 3 files changed, 73 insertions(+), 29 deletions(-)

diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index c7b5559fa..07f716dea 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -24,7 +24,7 @@ use flatbuffers::{
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use crate::{size_prefixed_root_as_message, CONTINUATION_MARKER};
+use crate::{size_prefixed_root_as_message, KeyValue, CONTINUATION_MARKER};
 use DataType::*;
 
 /// Serialize a schema in IPC format
@@ -38,6 +38,25 @@ pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
     fbb
 }
 
+pub fn metadata_to_fb<'a>(
+    fbb: &mut FlatBufferBuilder<'a>,
+    metadata: &HashMap<String, String>,
+) -> WIPOffset<Vector<'a, ForwardsUOffset<KeyValue<'a>>>> {
+    let custom_metadata = metadata
+        .iter()
+        .map(|(k, v)| {
+            let fb_key_name = fbb.create_string(k);
+            let fb_val_name = fbb.create_string(v);
+
+            let mut kv_builder = crate::KeyValueBuilder::new(fbb);
+            kv_builder.add_key(fb_key_name);
+            kv_builder.add_value(fb_val_name);
+            kv_builder.finish()
+        })
+        .collect::<Vec<_>>();
+    fbb.create_vector(&custom_metadata)
+}
+
 pub fn schema_to_fb_offset<'a>(
     fbb: &mut FlatBufferBuilder<'a>,
     schema: &Schema,
@@ -49,24 +68,8 @@ pub fn schema_to_fb_offset<'a>(
         .collect::<Vec<_>>();
     let fb_field_list = fbb.create_vector(&fields);
 
-    let fb_metadata_list = if !schema.metadata().is_empty() {
-        let custom_metadata = schema
-            .metadata()
-            .iter()
-            .map(|(k, v)| {
-                let fb_key_name = fbb.create_string(k);
-                let fb_val_name = fbb.create_string(v);
-
-                let mut kv_builder = crate::KeyValueBuilder::new(fbb);
-                kv_builder.add_key(fb_key_name);
-                kv_builder.add_value(fb_val_name);
-                kv_builder.finish()
-            })
-            .collect::<Vec<_>>();
-        Some(fbb.create_vector(&custom_metadata))
-    } else {
-        None
-    };
+    let fb_metadata_list =
+        (!schema.metadata().is_empty()).then(|| metadata_to_fb(fbb, schema.metadata()));
 
     let mut builder = crate::SchemaBuilder::new(fbb);
     builder.add_fields(fb_field_list);
@@ -440,16 +443,7 @@ pub(crate) fn build_field<'a>(
     // Optional custom metadata.
     let mut fb_metadata = None;
     if !field.metadata().is_empty() {
-        let mut kv_vec = vec![];
-        for (k, v) in field.metadata() {
-            let kv_args = crate::KeyValueArgs {
-                key: Some(fbb.create_string(k.as_str())),
-                value: Some(fbb.create_string(v.as_str())),
-            };
-            let kv_offset = crate::KeyValue::create(fbb, &kv_args);
-            kv_vec.push(kv_offset);
-        }
-        fb_metadata = Some(fbb.create_vector(&kv_vec));
+        fb_metadata = Some(metadata_to_fb(fbb, field.metadata()));
     };
 
     let fb_field_name = fbb.create_string(field.name().as_str());
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 75d078456..60633487a 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -792,6 +792,9 @@ pub struct FileReader<R: Read + Seek> {
     /// Metadata version
     metadata_version: crate::MetadataVersion,
 
+    /// User defined metadata
+    custom_metadata: HashMap<String, String>,
+
     /// Optional projection and projected_schema
     projection: Option<(Vec<usize>, Schema)>,
 }
@@ -862,6 +865,16 @@ impl<R: Read + Seek> FileReader<R> {
         let ipc_schema = footer.schema().unwrap();
         let schema = crate::convert::fb_to_schema(ipc_schema);
 
+        let mut custom_metadata = HashMap::new();
+        if let Some(fb_custom_metadata) = footer.custom_metadata() {
+            for kv in fb_custom_metadata.into_iter() {
+                custom_metadata.insert(
+                    kv.key().unwrap().to_string(),
+                    kv.value().unwrap().to_string(),
+                );
+            }
+        }
+
         // Create an array of optional dictionary value arrays, one per field.
         let mut dictionaries_by_id = HashMap::new();
         if let Some(dictionaries) = footer.dictionaries() {
@@ -926,10 +939,16 @@ impl<R: Read + Seek> FileReader<R> {
             total_blocks,
             dictionaries_by_id,
             metadata_version: footer.version(),
+            custom_metadata,
             projection,
         })
     }
 
+    /// Return user defined customized metadata
+    pub fn custom_metadata(&self) -> &HashMap<String, String> {
+        &self.custom_metadata
+    }
+
     /// Return the number of batches in the file
     pub fn num_batches(&self) -> usize {
         self.total_blocks
@@ -1522,6 +1541,25 @@ mod tests {
         reader.next().unwrap().unwrap()
     }
 
+    #[test]
+    fn test_roundtrip_with_custom_metadata() {
+        let schema = Schema::new(vec![Field::new("dummy", DataType::Float64, false)]);
+        let mut buf = Vec::new();
+        let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
+        let mut test_metadata = HashMap::new();
+        test_metadata.insert("abc".to_string(), "abc".to_string());
+        test_metadata.insert("def".to_string(), "def".to_string());
+        for (k, v) in &test_metadata {
+            writer.write_metadata(k, v);
+        }
+        writer.finish().unwrap();
+        drop(writer);
+
+        let reader =
+            crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
+        assert_eq!(reader.custom_metadata(), &test_metadata);
+    }
+
     #[test]
     fn test_roundtrip_nested_dict() {
         let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 12c173f64..7d29f048a 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -706,6 +706,8 @@ pub struct FileWriter<W: Write> {
     finished: bool,
     /// Keeps track of dictionaries that have been written
     dictionary_tracker: DictionaryTracker,
+    /// User level customized metadata
+    custom_metadata: HashMap<String, String>,
 
     data_gen: IpcDataGenerator,
 }
@@ -742,10 +744,15 @@ impl<W: Write> FileWriter<W> {
             record_blocks: vec![],
             finished: false,
             dictionary_tracker: DictionaryTracker::new(true),
+            custom_metadata: HashMap::new(),
             data_gen,
         })
     }
 
+    pub fn write_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
+        self.custom_metadata.insert(key.into(), value.into());
+    }
+
     /// Write a record batch to the file
     pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
         if self.finished {
@@ -798,6 +805,8 @@ impl<W: Write> FileWriter<W> {
         let dictionaries = fbb.create_vector(&self.dictionary_blocks);
         let record_batches = fbb.create_vector(&self.record_blocks);
         let schema = crate::convert::schema_to_fb_offset(&mut fbb, &self.schema);
+        let fb_custom_metadata = (!self.custom_metadata.is_empty())
+            .then(|| crate::convert::metadata_to_fb(&mut fbb, &self.custom_metadata));
 
         let root = {
             let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
@@ -805,6 +814,9 @@ impl<W: Write> FileWriter<W> {
             footer_builder.add_schema(schema);
             footer_builder.add_dictionaries(dictionaries);
             footer_builder.add_recordBatches(record_batches);
+            if let Some(fb_custom_metadata) = fb_custom_metadata {
+                footer_builder.add_custom_metadata(fb_custom_metadata);
+            }
             footer_builder.finish()
         };
         fbb.finish(root, None);