You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/13 11:28:36 UTC

[arrow-rs] branch master updated: Fix reading dictionaries from nested structs in ipc `StreamReader` (#1550)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new ffb9b0b13 Fix reading dictionaries from nested structs in ipc `StreamReader` (#1550)
ffb9b0b13 is described below

commit ffb9b0b13840c6c06ca5488b2eb478923dd061ab
Author: Thomas Peiselt <gi...@kulturguerilla.org>
AuthorDate: Wed Apr 13 13:28:32 2022 +0200

    Fix reading dictionaries from nested structs in ipc `StreamReader` (#1550)
    
    * Fix reading dictionaries from nested structs in ipc `StreamReader`
    
    * Fix clippy error
    
    * Apply review comment about field naming in test
---
 arrow/src/ipc/reader.rs | 47 ++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 46 insertions(+), 1 deletion(-)

diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs
index 6a70768a2..19579968d 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow/src/ipc/reader.rs
@@ -881,7 +881,7 @@ impl<R: Read> StreamReader<R> {
         let schema = ipc::convert::fb_to_schema(ipc_schema);
 
         // Create an array of optional dictionary value arrays, one per field.
-        let dictionaries_by_field = vec![None; schema.fields().len()];
+        let dictionaries_by_field = vec![None; schema.all_fields().len()];
 
         let projection = match projection {
             Some(projection_indices) => {
@@ -1317,6 +1317,19 @@ mod tests {
         reader.next().unwrap().unwrap()
     }
 
+    fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
+        let mut buf = Vec::new();
+        let mut writer =
+            ipc::writer::StreamWriter::try_new(&mut buf, &rb.schema()).unwrap();
+        writer.write(rb).unwrap();
+        writer.finish().unwrap();
+        drop(writer);
+
+        let mut reader =
+            ipc::reader::StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
+        reader.next().unwrap().unwrap()
+    }
+
     #[test]
     fn test_roundtrip_nested_dict() {
         let inner: DictionaryArray<datatypes::Int32Type> =
@@ -1394,4 +1407,36 @@ mod tests {
         let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
         arrow_json
     }
+
+    #[test]
+    fn test_roundtrip_stream_nested_dict() {
+        let xs = vec!["AA", "BB", "AA", "CC", "BB"];
+        let dict = Arc::new(
+            xs.clone()
+                .into_iter()
+                .collect::<DictionaryArray<datatypes::Int8Type>>(),
+        );
+        let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
+        let struct_array = StructArray::from(vec![
+            (Field::new("f2.1", DataType::Utf8, false), string_array),
+            (
+                Field::new("f2.2_struct", dict.data_type().clone(), false),
+                dict.clone() as ArrayRef,
+            ),
+        ]);
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("f1_string", DataType::Utf8, false),
+            Field::new("f2_struct", struct_array.data_type().clone(), false),
+        ]));
+        let input_batch = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(StringArray::from(xs.clone())),
+                Arc::new(struct_array),
+            ],
+        )
+        .unwrap();
+        let output_batch = roundtrip_ipc_stream(&input_batch);
+        assert_eq!(input_batch, output_batch);
+    }
 }