You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) was not preserved in this copy.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/20 18:59:00 UTC
[arrow-rs] branch master updated: Read/write nested dictionary under map in ipc stream reader/writer (#1583)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d6b396234 Read/write nested dictionary under map in ipc stream reader/writer (#1583)
d6b396234 is described below
commit d6b396234d2b099b8b8aab231c91ac6cbb48fdd7
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Wed Apr 20 11:58:55 2022 -0700
Read/write nested dictionary under map in ipc stream reader/writer (#1583)
* IPC read/write nested dict in map
* For review comments
* update comment
---
arrow/src/datatypes/field.rs | 2 +-
arrow/src/ipc/reader.rs | 62 ++++++++++++++++++++++++++++++++++++++++++++
arrow/src/ipc/writer.rs | 34 +++++++++++++++++++++---
3 files changed, 94 insertions(+), 4 deletions(-)
diff --git a/arrow/src/datatypes/field.rs b/arrow/src/datatypes/field.rs
index 6fccd2209..7ad790268 100644
--- a/arrow/src/datatypes/field.rs
+++ b/arrow/src/datatypes/field.rs
@@ -131,7 +131,7 @@ impl Field {
DataType::List(field)
| DataType::LargeList(field)
| DataType::FixedSizeList(field, _)
- | DataType::Map(field, _) => collected_fields.push(field),
+ | DataType::Map(field, _) => collected_fields.extend(field.fields()),
DataType::Dictionary(_, value_field) => {
collected_fields.append(&mut self._fields(value_field.as_ref()))
}
diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs
index 748d9db47..33d608576 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow/src/ipc/reader.rs
@@ -1444,6 +1444,68 @@ mod tests {
assert_eq!(input_batch, output_batch);
}
+ #[test]
+ fn test_roundtrip_stream_nested_dict_of_map_of_dict() { // IPC stream round-trip of Dictionary<Int8, Map<Dictionary<Int8, Utf8>, Dictionary<Int8, Utf8>>>
+ let values = StringArray::from(vec![Some("a"), None, Some("b"), Some("c")]); // dictionary values shared by the map's key and value dictionaries
+ let value_dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 3, 1]);
+ let value_dict_array =
+ DictionaryArray::<Int8Type>::try_new(&value_dict_keys, &values).unwrap();
+ // NOTE(review): index 1 of `values` is null, so some map keys below resolve to null; confirm this is intended for map keys
+ let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
+ let key_dict_array =
+ DictionaryArray::<Int8Type>::try_new(&key_dict_keys, &values).unwrap();
+ // Entry fields: the trailing `1, false` are presumably dict_id / dict_is_ordered; both fields pass the same id while also sharing `values` (TODO confirm)
+ let keys_field = Field::new_dict(
+ "keys",
+ DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
+ true,
+ 1,
+ false,
+ );
+ let values_field = Field::new_dict(
+ "values",
+ DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
+ true,
+ 1,
+ false,
+ );
+ let entry_struct = StructArray::from(vec![ // the map's "entries" struct: (keys, values)
+ (keys_field, make_array(key_dict_array.data().clone())),
+ (values_field, make_array(value_dict_array.data().clone())),
+ ]);
+ let map_data_type = DataType::Map(
+ Box::new(Field::new(
+ "entries",
+ entry_struct.data_type().clone(),
+ true,
+ )),
+ false, // presumably the keys-sorted flag; confirm against DataType::Map
+ );
+ // Three map rows of two entries each: offsets 0..2, 2..4, 4..6 over the six entries
+ let entry_offsets = Buffer::from_slice_ref(&[0, 2, 4, 6]);
+ let map_data = ArrayData::builder(map_data_type)
+ .len(3)
+ .add_buffer(entry_offsets)
+ .add_child_data(entry_struct.data().clone())
+ .build()
+ .unwrap();
+ let map_array = MapArray::from(map_data);
+ // Outer dictionary: six rows, each referencing one of the three map rows
+ let dict_keys = Int8Array::from_iter_values([0, 1, 1, 2, 2, 1]);
+ let dict_dict_array =
+ DictionaryArray::<Int8Type>::try_new(&dict_keys, &map_array).unwrap();
+ // Single non-nullable column; the batch must come back unchanged from the IPC stream round-trip
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "f1",
+ dict_dict_array.data_type().clone(),
+ false,
+ )]));
+ let input_batch =
+ RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
+ let output_batch = roundtrip_ipc_stream(&input_batch);
+ assert_eq!(input_batch, output_batch);
+ }
+
fn test_roundtrip_stream_dict_of_list_of_dict_impl<
OffsetSize: OffsetSizeTrait,
U: ArrowNativeType,
diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs
index 86bddd118..1f73d16d2 100644
--- a/arrow/src/ipc/writer.rs
+++ b/arrow/src/ipc/writer.rs
@@ -26,8 +26,8 @@ use std::io::{BufWriter, Write};
use flatbuffers::FlatBufferBuilder;
use crate::array::{
- as_large_list_array, as_list_array, as_struct_array, as_union_array, make_array,
- ArrayData, ArrayRef,
+ as_large_list_array, as_list_array, as_map_array, as_struct_array, as_union_array,
+ make_array, Array, ArrayData, ArrayRef,
};
use crate::buffer::{Buffer, MutableBuffer};
use crate::datatypes::*;
@@ -147,7 +147,7 @@ impl IpcDataGenerator {
dictionary_tracker: &mut DictionaryTracker,
write_options: &IpcWriteOptions,
) -> Result<()> {
- // TODO: Handle other nested types (map, etc)
+ // TODO: Handle other nested types (FixedSizeList)
match column.data_type() {
DataType::Struct(fields) => {
let s = as_struct_array(column);
@@ -181,6 +181,34 @@ impl IpcDataGenerator {
write_options,
)?;
}
+ DataType::Map(field, _) => {
+ let map_array = as_map_array(column);
+
+ let (keys, values) = match field.data_type() {
+ DataType::Struct(fields) if fields.len() == 2 => {
+ (&fields[0], &fields[1])
+ }
+ _ => panic!("Incorrect field data type {:?}", field.data_type()),
+ };
+
+ // keys
+ self.encode_dictionaries(
+ keys,
+ &map_array.keys(),
+ encoded_dictionaries,
+ dictionary_tracker,
+ write_options,
+ )?;
+
+ // values
+ self.encode_dictionaries(
+ values,
+ &map_array.values(),
+ encoded_dictionaries,
+ dictionary_tracker,
+ write_options,
+ )?;
+ }
DataType::Union(fields, _) => {
let union = as_union_array(column);
for (field, ref column) in fields