You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/12/08 12:06:10 UTC

[arrow-rs] branch master updated: Fix ipc schema custom_metadata serialization (#3282)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b717139d Fix ipc schema custom_metadata serialization (#3282)
7b717139d is described below

commit 7b717139d52010e6754e21eb248f036ea9c4361e
Author: Jeffrey <22...@users.noreply.github.com>
AuthorDate: Thu Dec 8 23:06:04 2022 +1100

    Fix ipc schema custom_metadata serialization (#3282)
    
    * Fix ipc schema custom_metadata serialization
    
    * Fix ipc doc test
    
    * PR comments
---
 arrow-ipc/src/convert.rs | 95 +++++++++++++++++++++++++++++-------------------
 arrow-ipc/src/writer.rs  | 14 +++----
 2 files changed, 64 insertions(+), 45 deletions(-)

diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index e11d64a47..e5522303d 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -41,29 +41,37 @@ pub fn schema_to_fb_offset<'a>(
     fbb: &mut FlatBufferBuilder<'a>,
     schema: &Schema,
 ) -> WIPOffset<crate::Schema<'a>> {
-    let mut fields = vec![];
-    for field in schema.fields() {
-        let fb_field = build_field(fbb, field);
-        fields.push(fb_field);
-    }
-
-    let mut custom_metadata = vec![];
-    for (k, v) in schema.metadata() {
-        let fb_key_name = fbb.create_string(k.as_str());
-        let fb_val_name = fbb.create_string(v.as_str());
+    let fields = schema
+        .fields()
+        .iter()
+        .map(|field| build_field(fbb, field))
+        .collect::<Vec<_>>();
+    let fb_field_list = fbb.create_vector(&fields);
 
-        let mut kv_builder = crate::KeyValueBuilder::new(fbb);
-        kv_builder.add_key(fb_key_name);
-        kv_builder.add_value(fb_val_name);
-        custom_metadata.push(kv_builder.finish());
-    }
+    let fb_metadata_list = if !schema.metadata().is_empty() {
+        let custom_metadata = schema
+            .metadata()
+            .iter()
+            .map(|(k, v)| {
+                let fb_key_name = fbb.create_string(k);
+                let fb_val_name = fbb.create_string(v);
 
-    let fb_field_list = fbb.create_vector(&fields);
-    let fb_metadata_list = fbb.create_vector(&custom_metadata);
+                let mut kv_builder = crate::KeyValueBuilder::new(fbb);
+                kv_builder.add_key(fb_key_name);
+                kv_builder.add_value(fb_val_name);
+                kv_builder.finish()
+            })
+            .collect::<Vec<_>>();
+        Some(fbb.create_vector(&custom_metadata))
+    } else {
+        None
+    };
 
     let mut builder = crate::SchemaBuilder::new(fbb);
     builder.add_fields(fb_field_list);
-    builder.add_custom_metadata(fb_metadata_list);
+    if let Some(fb_metadata_list) = fb_metadata_list {
+        builder.add_custom_metadata(fb_metadata_list);
+    }
     builder.finish()
 }
 
@@ -1031,32 +1039,45 @@ mod tests {
 
     #[test]
     fn schema_from_bytes() {
-        // bytes of a schema generated from python (0.14.0), saved as an `crate::Message`.
-        // the schema is: Field("field1", DataType::UInt32, false)
+        // Bytes of a schema generated via the following Python code, using pyarrow 10.0.1:
+        //
+        // import pyarrow as pa
+        // schema = pa.schema([pa.field('field1', pa.uint32(), nullable=False)])
+        // sink = pa.BufferOutputStream()
+        // with pa.ipc.new_stream(sink, schema) as writer:
+        //     pass
+        // # stripping continuation & length prefix & suffix bytes to get only schema bytes
+        // [x for x in sink.getvalue().to_pybytes()][8:-8]
         let bytes: Vec<u8> = vec![
-            16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 3, 0,
+            16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 4, 0,
             12, 0, 0, 0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20,
             0, 0, 0, 16, 0, 20, 0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0,
-            0, 0, 2, 32, 0, 0, 0, 20, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0, 8, 0,
-            4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0,
-            0, 0, 0, 0,
+            0, 0, 2, 16, 0, 0, 0, 32, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 102,
+            105, 101, 108, 100, 49, 0, 0, 0, 0, 6, 0, 8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0,
+            0,
         ];
-        let ipc = crate::root_as_message(&bytes[..]).unwrap();
+        let ipc = crate::root_as_message(&bytes).unwrap();
         let schema = ipc.header_as_schema().unwrap();
 
-        // a message generated from Rust, same as the Python one
-        let bytes: Vec<u8> = vec![
-            16, 0, 0, 0, 0, 0, 10, 0, 14, 0, 12, 0, 11, 0, 4, 0, 10, 0, 0, 0, 20, 0, 0,
-            0, 0, 0, 0, 1, 3, 0, 10, 0, 12, 0, 0, 0, 8, 0, 4, 0, 10, 0, 0, 0, 8, 0, 0, 0,
-            8, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 16, 0, 0, 0, 12, 0, 18, 0, 12, 0, 0, 0,
-            11, 0, 4, 0, 12, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 2, 20, 0, 0, 0, 0, 0, 6, 0,
-            8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49,
-            0, 0,
-        ];
-        let ipc2 = crate::root_as_message(&bytes[..]).unwrap();
-        let schema2 = ipc.header_as_schema().unwrap();
+        // generate same message with Rust
+        let data_gen = crate::writer::IpcDataGenerator::default();
+        let arrow_schema =
+            Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
+        let bytes = data_gen
+            .schema_to_bytes(&arrow_schema, &crate::writer::IpcWriteOptions::default())
+            .ipc_message;
+
+        let ipc2 = crate::root_as_message(&bytes).unwrap();
+        let schema2 = ipc2.header_as_schema().unwrap();
+
+        // can't compare schema directly as it compares the underlying bytes, which can differ
+        assert!(schema.custom_metadata().is_none());
+        assert!(schema2.custom_metadata().is_none());
+        assert_eq!(schema.endianness(), schema2.endianness());
+        assert!(schema.features().is_none());
+        assert!(schema2.features().is_none());
+        assert_eq!(fb_to_schema(schema), fb_to_schema(schema2));
 
-        assert_eq!(schema, schema2);
         assert_eq!(ipc.version(), ipc2.version());
         assert_eq!(ipc.header_type(), ipc2.header_type());
         assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 5f188fe1a..c407cd12c 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -793,15 +793,13 @@ impl<W: Write> StreamWriter<W> {
     /// # fn main() -> Result<(), ArrowError> {
     /// // The result we expect from an empty schema
     /// let expected = vec![
-    ///     255, 255, 255, 255,  64,   0,   0,   0,
+    ///     255, 255, 255, 255,  48,   0,   0,   0,
     ///      16,   0,   0,   0,   0,   0,  10,   0,
-    ///      14,   0,  12,   0,  11,   0,   4,   0,
-    ///      10,   0,   0,   0,  20,   0,   0,   0,
-    ///       0,   0,   0,   1,   4,   0,  10,   0,
-    ///      12,   0,   0,   0,   8,   0,   4,   0,
-    ///      10,   0,   0,   0,   8,   0,   0,   0,
-    ///       8,   0,   0,   0,   0,   0,   0,   0,
-    ///       0,   0,   0,   0,   0,   0,   0,   0,
+    ///      12,   0,  10,   0,   9,   0,   4,   0,
+    ///      10,   0,   0,   0,  16,   0,   0,   0,
+    ///       0,   1,   4,   0,   8,   0,   8,   0,
+    ///       0,   0,   4,   0,   8,   0,   0,   0,
+    ///       4,   0,   0,   0,   0,   0,   0,   0,
     ///     255, 255, 255, 255,   0,   0,   0,   0
     /// ];
     ///