You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/20 18:59:00 UTC

[arrow-rs] branch master updated: Read/write nested dictionary under map in ipc stream reader/writer (#1583)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new d6b396234 Read/write nested dictionary under map in ipc stream reader/writer (#1583)
d6b396234 is described below

commit d6b396234d2b099b8b8aab231c91ac6cbb48fdd7
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Wed Apr 20 11:58:55 2022 -0700

    Read/write nested dictionary under map in ipc stream reader/writer (#1583)
    
    * IPC read/write nested dict in map
    
    * For review comments
    
    * update comment
---
 arrow/src/datatypes/field.rs |  2 +-
 arrow/src/ipc/reader.rs      | 62 ++++++++++++++++++++++++++++++++++++++++++++
 arrow/src/ipc/writer.rs      | 34 +++++++++++++++++++++---
 3 files changed, 94 insertions(+), 4 deletions(-)

diff --git a/arrow/src/datatypes/field.rs b/arrow/src/datatypes/field.rs
index 6fccd2209..7ad790268 100644
--- a/arrow/src/datatypes/field.rs
+++ b/arrow/src/datatypes/field.rs
@@ -131,7 +131,7 @@ impl Field {
             DataType::List(field)
             | DataType::LargeList(field)
             | DataType::FixedSizeList(field, _)
-            | DataType::Map(field, _) => collected_fields.push(field),
+            | DataType::Map(field, _) => collected_fields.extend(field.fields()),
             DataType::Dictionary(_, value_field) => {
                 collected_fields.append(&mut self._fields(value_field.as_ref()))
             }
diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs
index 748d9db47..33d608576 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow/src/ipc/reader.rs
@@ -1444,6 +1444,68 @@ mod tests {
         assert_eq!(input_batch, output_batch);
     }
 
+    #[test]
+    fn test_roundtrip_stream_nested_dict_of_map_of_dict() { // IPC stream round-trip of a dictionary whose values are a map with dictionary-encoded keys and values
+        let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]); // dictionary values shared by the key and value dictionaries; index 1 is null
+        let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]); // six indices into `values`
+        let value_dict_array =
+            DictionaryArray::<Int8Type>::try_new(&value_dict_keys, &values).unwrap();
+
+        let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]); // six indices into the same `values`
+        let key_dict_array =
+            DictionaryArray::<Int8Type>::try_new(&key_dict_keys, &values).unwrap();
+
+        let keys_field = Field::new_dict(
+            "keys",
+            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
+            true,
+            1, // presumably the dictionary id; same literal as in `values_field` below — confirm against Field::new_dict
+            false,
+        );
+        let values_field = Field::new_dict(
+            "values",
+            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
+            true,
+            1, // same literal as in `keys_field`; both arrays are built over the same `values`
+            false,
+        );
+        let entry_struct = StructArray::from(vec![ // struct of (keys, values) used as the map's child data
+            (keys_field, make_array(key_dict_array.data().clone())),
+            (values_field, make_array(value_dict_array.data().clone())),
+        ]);
+        let map_data_type = DataType::Map(
+            Box::new(Field::new(
+                "entries",
+                entry_struct.data_type().clone(),
+                true,
+            )),
+            false,
+        );
+
+        let entry_offsets = Buffer::from_slice_ref(&[0, 2, 4, 6]); // four offsets for the 3 map slots declared by `.len(3)`, spanning the 6 entries
+        let map_data = ArrayData::builder(map_data_type)
+            .len(3)
+            .add_buffer(entry_offsets)
+            .add_child_data(entry_struct.data().clone())
+            .build()
+            .unwrap();
+        let map_array = MapArray::from(map_data);
+
+        let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]); // indices 0..=2 into the 3-slot `map_array`
+        let dict_dict_array =
+            DictionaryArray::<Int8Type>::try_new(&dict_keys, &map_array).unwrap(); // outer dictionary over the map
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "f1",
+            dict_dict_array.data_type().clone(),
+            false,
+        )]));
+        let input_batch =
+            RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
+        let output_batch = roundtrip_ipc_stream(&input_batch); // helper defined outside this hunk; presumably writes the batch to an IPC stream and reads it back
+        assert_eq!(input_batch, output_batch); // the batch must compare equal after the round trip
+    }
+
     fn test_roundtrip_stream_dict_of_list_of_dict_impl<
         OffsetSize: OffsetSizeTrait,
         U: ArrowNativeType,
diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs
index 86bddd118..1f73d16d2 100644
--- a/arrow/src/ipc/writer.rs
+++ b/arrow/src/ipc/writer.rs
@@ -26,8 +26,8 @@ use std::io::{BufWriter, Write};
 use flatbuffers::FlatBufferBuilder;
 
 use crate::array::{
-    as_large_list_array, as_list_array, as_struct_array, as_union_array, make_array,
-    ArrayData, ArrayRef,
+    as_large_list_array, as_list_array, as_map_array, as_struct_array, as_union_array,
+    make_array, Array, ArrayData, ArrayRef,
 };
 use crate::buffer::{Buffer, MutableBuffer};
 use crate::datatypes::*;
@@ -147,7 +147,7 @@ impl IpcDataGenerator {
         dictionary_tracker: &mut DictionaryTracker,
         write_options: &IpcWriteOptions,
     ) -> Result<()> {
-        // TODO: Handle other nested types (map, etc)
+        // TODO: Handle other nested types (FixedSizeList)
         match column.data_type() {
             DataType::Struct(fields) => {
                 let s = as_struct_array(column);
@@ -181,6 +181,34 @@ impl IpcDataGenerator {
                     write_options,
                 )?;
             }
+            DataType::Map(field, _) => {
+                let map_array = as_map_array(column);
+
+                let (keys, values) = match field.data_type() {
+                    DataType::Struct(fields) if fields.len() == 2 => {
+                        (&fields[0], &fields[1])
+                    }
+                    _ => panic!("Incorrect field data type {:?}", field.data_type()),
+                };
+
+                // keys
+                self.encode_dictionaries(
+                    keys,
+                    &map_array.keys(),
+                    encoded_dictionaries,
+                    dictionary_tracker,
+                    write_options,
+                )?;
+
+                // values
+                self.encode_dictionaries(
+                    values,
+                    &map_array.values(),
+                    encoded_dictionaries,
+                    dictionary_tracker,
+                    write_options,
+                )?;
+            }
             DataType::Union(fields, _) => {
                 let union = as_union_array(column);
                 for (field, ref column) in fields