You are viewing a plain text version of this content; the canonical HTML version (with the original hyperlink) is available in the mailing-list archive.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/20 07:25:22 UTC

[arrow-rs] branch master updated: Read/write nested dictionary under large list in ipc stream reader/writer (#1585)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new b4642ecf9 Read/write nested dictionary under large list in ipc stream reader/writer (#1585)
b4642ecf9 is described below

commit b4642ecf9cb4a3d8b5741655fdc7d06b71bf41e6
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Wed Apr 20 00:25:18 2022 -0700

    Read/write nested dictionary under large list in ipc stream reader/writer (#1585)
    
    * Read/Write nested dictionaries under LargeList in IPC
    
    * For review comments
---
 arrow/src/ipc/reader.rs | 62 ++++++++++++++++++++++++++++++++++++-------------
 arrow/src/ipc/writer.rs | 13 ++++++++++-
 2 files changed, 58 insertions(+), 17 deletions(-)

diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs
index 143fa929d..748d9db47 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow/src/ipc/reader.rs
@@ -1019,7 +1019,7 @@ mod tests {
 
     use flate2::read::GzDecoder;
 
-    use crate::datatypes::Int8Type;
+    use crate::datatypes::{ArrowNativeType, Int8Type};
     use crate::{datatypes, util::integration_util::*};
 
     #[test]
@@ -1444,32 +1444,31 @@ mod tests {
         assert_eq!(input_batch, output_batch);
     }
 
-    #[test]
-    fn test_roundtrip_stream_nested_dict_dict() {
-        let values = StringArray::from_iter_values(["a", "b", "c"]);
-        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1]);
+    fn test_roundtrip_stream_dict_of_list_of_dict_impl<
+        OffsetSize: OffsetSizeTrait,
+        U: ArrowNativeType,
+    >(
+        list_data_type: DataType,
+        offsets: &[U; 5],
+    ) {
+        let values = StringArray::from(vec![Some("a"), None, Some("c"), None]);
+        let keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
         let dict_array = DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap();
         let dict_data = dict_array.data();
 
-        let value_offsets = Buffer::from_slice_ref(&[0, 2, 4, 6]);
+        let value_offsets = Buffer::from_slice_ref(offsets);
 
-        let list_data_type = DataType::List(Box::new(Field::new_dict(
-            "item",
-            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
-            false,
-            1,
-            false,
-        )));
         let list_data = ArrayData::builder(list_data_type)
-            .len(3)
+            .len(4)
             .add_buffer(value_offsets)
             .add_child_data(dict_data.clone())
             .build()
             .unwrap();
-        let list_array = ListArray::from(list_data);
+        let list_array = GenericListArray::<OffsetSize>::from(list_data);
 
+        let keys_for_dict = Int8Array::from_iter_values([0, 3, 0, 1, 1, 2, 0, 1, 3]);
         let dict_dict_array =
-            DictionaryArray::<Int8Type>::try_new(&keys, &list_array).unwrap();
+            DictionaryArray::<Int8Type>::try_new(&keys_for_dict, &list_array).unwrap();
 
         let schema = Arc::new(Schema::new(vec![Field::new(
             "f1",
@@ -1481,4 +1480,35 @@ mod tests {
         let output_batch = roundtrip_ipc_stream(&input_batch);
         assert_eq!(input_batch, output_batch);
     }
+
+    #[test]
+    fn test_roundtrip_stream_dict_of_list_of_dict() {
+        // list
+        let list_data_type = DataType::List(Box::new(Field::new_dict(
+            "item",
+            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
+            true,
+            1,
+            false,
+        )));
+        let offsets: &[i32; 5] = &[0, 2, 4, 4, 6];
+        test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(
+            list_data_type,
+            offsets,
+        );
+
+        // large list
+        let list_data_type = DataType::LargeList(Box::new(Field::new_dict(
+            "item",
+            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
+            true,
+            1,
+            false,
+        )));
+        let offsets: &[i64; 5] = &[0, 2, 4, 4, 7];
+        test_roundtrip_stream_dict_of_list_of_dict_impl::<i64, i64>(
+            list_data_type,
+            offsets,
+        );
+    }
 }
diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs
index a5b35f364..86bddd118 100644
--- a/arrow/src/ipc/writer.rs
+++ b/arrow/src/ipc/writer.rs
@@ -26,7 +26,8 @@ use std::io::{BufWriter, Write};
 use flatbuffers::FlatBufferBuilder;
 
 use crate::array::{
-    as_list_array, as_struct_array, as_union_array, make_array, ArrayData, ArrayRef,
+    as_large_list_array, as_list_array, as_struct_array, as_union_array, make_array,
+    ArrayData, ArrayRef,
 };
 use crate::buffer::{Buffer, MutableBuffer};
 use crate::datatypes::*;
@@ -170,6 +171,16 @@ impl IpcDataGenerator {
                     write_options,
                 )?;
             }
+            DataType::LargeList(field) => {
+                let list = as_large_list_array(column);
+                self.encode_dictionaries(
+                    field,
+                    &list.values(),
+                    encoded_dictionaries,
+                    dictionary_tracker,
+                    write_options,
+                )?;
+            }
             DataType::Union(fields, _) => {
                 let union = as_union_array(column);
                 for (field, ref column) in fields