You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ne...@apache.org on 2021/07/31 05:21:01 UTC
[arrow-rs] branch master updated: Minimal MapArray support (#491)
This is an automated email from the ASF dual-hosted git repository.
nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 9be938e Minimal MapArray support (#491)
9be938e is described below
commit 9be938e8d2847cf8d41bc59f0c907f23ff61cc3c
Author: Wakahisa <ne...@gmail.com>
AuthorDate: Sat Jul 31 07:20:56 2021 +0200
Minimal MapArray support (#491)
* add DataType::Map to datatypes
* barebones MapArray and MapBuilder
This commit adds the MapArray and MapBuilder.
The interfaces are however incomplete at this stage.
* minimal IPC read and write
* barebones MapArray (missed)
* add equality for map, relying on list
A map is a list with some specific rules, so for equality it is the same as a list
* json reader for MapArray
* add schema roundtrip
* read and write maps from/to arrow map
* clippy
* Calculate map levels separately
Avoids the generic case of list > struct > [ley, value], which adds overhead
* Fix map reader context and path
* Map array tests
* add doc comments and clean up code
* wip: review feedback
* add test for map
* fix clippy 1.54 lints
---
arrow/src/array/array.rs | 26 +++
arrow/src/array/array_map.rs | 421 +++++++++++++++++++++++++++++++++++++
arrow/src/array/builder.rs | 211 +++++++++++++++++++
arrow/src/array/data.rs | 5 +-
arrow/src/array/equal/mod.rs | 11 +-
arrow/src/array/equal/utils.rs | 2 +-
arrow/src/array/equal_json.rs | 32 +++
arrow/src/array/mod.rs | 2 +
arrow/src/datatypes/datatype.rs | 31 +++
arrow/src/datatypes/field.rs | 33 +++
arrow/src/datatypes/mod.rs | 175 +++++++++++++++
arrow/src/ipc/convert.rs | 18 ++
arrow/src/ipc/reader.rs | 22 +-
arrow/src/ipc/writer.rs | 4 +
arrow/src/json/reader.rs | 177 ++++++++++++++++
arrow/src/util/integration_util.rs | 50 +++++
parquet/src/arrow/array_reader.rs | 235 +++++++++++++++++++--
parquet/src/arrow/arrow_reader.rs | 16 ++
parquet/src/arrow/arrow_writer.rs | 39 ++++
parquet/src/arrow/levels.rs | 132 +++++++++++-
parquet/src/arrow/schema.rs | 312 +++++++++++++++++++++++++--
21 files changed, 1914 insertions(+), 40 deletions(-)
diff --git a/arrow/src/array/array.rs b/arrow/src/array/array.rs
index d715bc4..4702179 100644
--- a/arrow/src/array/array.rs
+++ b/arrow/src/array/array.rs
@@ -296,6 +296,7 @@ pub fn make_array(data: ArrayData) -> ArrayRef {
DataType::List(_) => Arc::new(ListArray::from(data)) as ArrayRef,
DataType::LargeList(_) => Arc::new(LargeListArray::from(data)) as ArrayRef,
DataType::Struct(_) => Arc::new(StructArray::from(data)) as ArrayRef,
+ DataType::Map(_, _) => Arc::new(MapArray::from(data)) as ArrayRef,
DataType::Union(_) => Arc::new(UnionArray::from(data)) as ArrayRef,
DataType::FixedSizeList(_, _) => {
Arc::new(FixedSizeListArray::from(data)) as ArrayRef
@@ -452,6 +453,9 @@ pub fn new_null_array(data_type: &DataType, length: usize) -> ArrayRef {
.map(|field| ArrayData::new_empty(field.data_type()))
.collect(),
)),
+ DataType::Map(field, _keys_sorted) => {
+ new_null_list_array::<i32>(data_type, field.data_type(), length)
+ }
DataType::Union(_) => {
unimplemented!("Creating null Union array not yet supported")
}
@@ -658,6 +662,28 @@ mod tests {
}
#[test]
+ fn test_null_map() {
+ let data_type = DataType::Map(
+ Box::new(Field::new(
+ "entry",
+ DataType::Struct(vec![
+ Field::new("key", DataType::Utf8, false),
+ Field::new("key", DataType::Int32, true),
+ ]),
+ false,
+ )),
+ false,
+ );
+ let array = new_null_array(&data_type, 9);
+ let a = array.as_any().downcast_ref::<MapArray>().unwrap();
+ assert_eq!(a.len(), 9);
+ assert_eq!(a.value_offsets()[9], 0i32);
+ for i in 0..9 {
+ assert!(a.is_null(i));
+ }
+ }
+
+ #[test]
fn test_null_dictionary() {
let values = vec![None, None, None, None, None, None, None, None, None]
as Vec<Option<&str>>;
diff --git a/arrow/src/array/array_map.rs b/arrow/src/array/array_map.rs
new file mode 100644
index 0000000..b10c39e
--- /dev/null
+++ b/arrow/src/array/array_map.rs
@@ -0,0 +1,421 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt;
+use std::mem;
+
+use super::make_array;
+use super::{
+ array::print_long_array, raw_pointer::RawPtrBox, Array, ArrayData, ArrayRef,
+};
+use crate::datatypes::{ArrowNativeType, DataType};
+use crate::error::ArrowError;
+
+/// A nested array type where each record is a key-value map.
+/// Keys should always be non-null, but values can be null.
+///
+/// [MapArray] is physically a [ListArray] that has a [StructArray]
+/// with 2 child fields.
+pub struct MapArray {
+ data: ArrayData,
+ values: ArrayRef,
+ value_offsets: RawPtrBox<i32>,
+}
+
+impl MapArray {
+ /// Returns a reference to the keys of this map.
+ pub fn keys(&self) -> ArrayRef {
+ make_array(self.values.data().child_data()[0].clone())
+ }
+
+ /// Returns a reference to the values of this map.
+ pub fn values(&self) -> ArrayRef {
+ make_array(self.values.data().child_data()[1].clone())
+ }
+
+ /// Returns the data type of the map's keys.
+ pub fn key_type(&self) -> DataType {
+ self.values.data().child_data()[0].data_type().clone()
+ }
+
+ /// Returns the data type of the map's values.
+ pub fn value_type(&self) -> DataType {
+ self.values.data().child_data()[1].data_type().clone()
+ }
+
+ /// Returns ith value of this map array.
+ /// # Safety
+ /// Caller must ensure that the index is within the array bounds
+ pub unsafe fn value_unchecked(&self, i: usize) -> ArrayRef {
+ let end = *self.value_offsets().get_unchecked(i + 1);
+ let start = *self.value_offsets().get_unchecked(i);
+ self.values
+ .slice(start.to_usize().unwrap(), (end - start).to_usize().unwrap())
+ }
+
+ /// Returns ith value of this map array.
+ pub fn value(&self, i: usize) -> ArrayRef {
+ let end = self.value_offsets()[i + 1] as usize;
+ let start = self.value_offsets()[i] as usize;
+ self.values.slice(start, end - start)
+ }
+
+ /// Returns the offset values in the offsets buffer
+ #[inline]
+ pub fn value_offsets(&self) -> &[i32] {
+ // Soundness
+ // pointer alignment & location is ensured by RawPtrBox
+ // buffer bounds/offset is ensured by the ArrayData instance.
+ unsafe {
+ std::slice::from_raw_parts(
+ self.value_offsets.as_ptr().add(self.data.offset()),
+ self.len() + 1,
+ )
+ }
+ }
+
+ /// Returns the length for value at index `i`.
+ #[inline]
+ pub fn value_length(&self, i: usize) -> i32 {
+ let offsets = self.value_offsets();
+ offsets[i + 1] - offsets[i]
+ }
+}
+
+impl From<ArrayData> for MapArray {
+ fn from(data: ArrayData) -> Self {
+ Self::try_new_from_array_data(data)
+ .expect("Expected infallable creation of MapArray from ArrayData failed")
+ }
+}
+
+impl MapArray {
+ fn try_new_from_array_data(data: ArrayData) -> Result<Self, ArrowError> {
+ if data.buffers().len() != 1 {
+ return Err(ArrowError::InvalidArgumentError(
+ format!("MapArray data should contain a single buffer only (value offsets), had {}",
+ data.len())));
+ }
+
+ if data.child_data().len() != 1 {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "MapArray should contain a single child array (values array), had {}",
+ data.child_data().len()
+ )));
+ }
+
+ let entries = data.child_data()[0].clone();
+
+ if let DataType::Struct(fields) = entries.data_type() {
+ if fields.len() != 2 {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "MapArray should contain a struct array with 2 fields, have {} fields",
+ fields.len()
+ )));
+ }
+ } else {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "MapArray should contain a struct array child, found {:?}",
+ entries.data_type()
+ )));
+ }
+
+ let values = make_array(entries);
+ let value_offsets = data.buffers()[0].as_ptr();
+
+ let value_offsets = unsafe { RawPtrBox::<i32>::new(value_offsets) };
+ unsafe {
+ if (*value_offsets.as_ptr().offset(0)) != 0 {
+ return Err(ArrowError::InvalidArgumentError(String::from(
+ "offsets do not start at zero",
+ )));
+ }
+ }
+ Ok(Self {
+ data,
+ values,
+ value_offsets,
+ })
+ }
+}
+
+impl Array for MapArray {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn data(&self) -> &ArrayData {
+ &self.data
+ }
+
+ /// Returns the total number of bytes of memory occupied by the buffers owned by this [MapArray].
+ fn get_buffer_memory_size(&self) -> usize {
+ self.data.get_buffer_memory_size()
+ }
+
+ /// Returns the total number of bytes of memory occupied physically by this [MapArray].
+ fn get_array_memory_size(&self) -> usize {
+ self.data.get_array_memory_size() + mem::size_of_val(self)
+ }
+}
+
+impl fmt::Debug for MapArray {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "MapArray\n[\n")?;
+ print_long_array(self, f, |array, index, f| {
+ fmt::Debug::fmt(&array.value(index), f)
+ })?;
+ write!(f, "]")
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use crate::{
+ array::ArrayData,
+ array::{Int32Array, StructArray, UInt32Array},
+ buffer::Buffer,
+ datatypes::Field,
+ datatypes::ToByteSlice,
+ };
+
+ use super::*;
+
+ fn create_from_buffers() -> MapArray {
+ // Construct key and values
+ let keys_data = ArrayData::builder(DataType::Int32)
+ .len(8)
+ .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
+ .build();
+ let values_data = ArrayData::builder(DataType::UInt32)
+ .len(8)
+ .add_buffer(Buffer::from(
+ &[0u32, 10, 20, 30, 40, 50, 60, 70].to_byte_slice(),
+ ))
+ .build();
+
+ // Construct a buffer for value offsets, for the nested array:
+ // [[0, 1, 2], [3, 4, 5], [6, 7]]
+ let entry_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice());
+
+ let keys = Field::new("keys", DataType::Int32, false);
+ let values = Field::new("values", DataType::UInt32, false);
+ let entry_struct = StructArray::from(vec![
+ (keys, make_array(keys_data)),
+ (values, make_array(values_data)),
+ ]);
+
+ // Construct a map array from the above two
+ let map_data_type = DataType::Map(
+ Box::new(Field::new(
+ "entries",
+ entry_struct.data_type().clone(),
+ true,
+ )),
+ false,
+ );
+ let map_data = ArrayData::builder(map_data_type)
+ .len(3)
+ .add_buffer(entry_offsets)
+ .add_child_data(entry_struct.data().clone())
+ .build();
+ MapArray::from(map_data)
+ }
+
+ #[test]
+ fn test_map_array() {
+ // Construct key and values
+ let key_data = ArrayData::builder(DataType::Int32)
+ .len(8)
+ .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
+ .build();
+ let value_data = ArrayData::builder(DataType::UInt32)
+ .len(8)
+ .add_buffer(Buffer::from(
+ &[0u32, 10, 20, 0, 40, 0, 60, 70].to_byte_slice(),
+ ))
+ .null_bit_buffer(Buffer::from(&[0b11010110]))
+ .build();
+
+ // Construct a buffer for value offsets, for the nested array:
+ // [[0, 1, 2], [3, 4, 5], [6, 7]]
+ let entry_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice());
+
+ let keys_field = Field::new("keys", DataType::Int32, false);
+ let values_field = Field::new("values", DataType::UInt32, true);
+ let entry_struct = StructArray::from(vec![
+ (keys_field.clone(), make_array(key_data)),
+ (values_field.clone(), make_array(value_data.clone())),
+ ]);
+
+ // Construct a map array from the above two
+ let map_data_type = DataType::Map(
+ Box::new(Field::new(
+ "entries",
+ entry_struct.data_type().clone(),
+ true,
+ )),
+ false,
+ );
+ let map_data = ArrayData::builder(map_data_type)
+ .len(3)
+ .add_buffer(entry_offsets)
+ .add_child_data(entry_struct.data().clone())
+ .build();
+ let map_array = MapArray::from(map_data);
+
+ let values = map_array.values();
+ assert_eq!(&value_data, values.data());
+ assert_eq!(DataType::UInt32, map_array.value_type());
+ assert_eq!(3, map_array.len());
+ assert_eq!(0, map_array.null_count());
+ assert_eq!(6, map_array.value_offsets()[2]);
+ assert_eq!(2, map_array.value_length(2));
+
+ let key_array = Arc::new(Int32Array::from(vec![0, 1, 2])) as ArrayRef;
+ let value_array =
+ Arc::new(UInt32Array::from(vec![None, Some(10u32), Some(20)])) as ArrayRef;
+ let struct_array = StructArray::from(vec![
+ (keys_field.clone(), key_array),
+ (values_field.clone(), value_array),
+ ]);
+ assert_eq!(
+ struct_array,
+ StructArray::from(map_array.value(0).data().clone())
+ );
+ assert_eq!(
+ &struct_array,
+ unsafe { map_array.value_unchecked(0) }
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .unwrap()
+ );
+ for i in 0..3 {
+ assert!(map_array.is_valid(i));
+ assert!(!map_array.is_null(i));
+ }
+
+ // Now test with a non-zero offset
+ let map_data = ArrayData::builder(map_array.data_type().clone())
+ .len(3)
+ .offset(1)
+ .add_buffer(map_array.data().buffers()[0].clone())
+ .add_child_data(map_array.data().child_data()[0].clone())
+ .build();
+ let map_array = MapArray::from(map_data);
+
+ let values = map_array.values();
+ assert_eq!(&value_data, values.data());
+ assert_eq!(DataType::UInt32, map_array.value_type());
+ assert_eq!(3, map_array.len());
+ assert_eq!(0, map_array.null_count());
+ assert_eq!(6, map_array.value_offsets()[1]);
+ assert_eq!(2, map_array.value_length(1));
+
+ let key_array = Arc::new(Int32Array::from(vec![3, 4, 5])) as ArrayRef;
+ let value_array =
+ Arc::new(UInt32Array::from(vec![None, Some(40), None])) as ArrayRef;
+ let struct_array =
+ StructArray::from(vec![(keys_field, key_array), (values_field, value_array)]);
+ assert_eq!(
+ &struct_array,
+ map_array
+ .value(0)
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .unwrap()
+ );
+ assert_eq!(
+ &struct_array,
+ unsafe { map_array.value_unchecked(0) }
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .unwrap()
+ );
+ }
+
+ #[test]
+ #[ignore = "Test fails because slice of <list<struct>> is still buggy"]
+ fn test_map_array_slice() {
+ let map_array = create_from_buffers();
+
+ let sliced_array = map_array.slice(1, 2);
+ assert_eq!(2, sliced_array.len());
+ assert_eq!(1, sliced_array.offset());
+ let sliced_array_data = sliced_array.data();
+ for array_data in sliced_array_data.child_data() {
+ assert_eq!(array_data.offset(), 1);
+ }
+
+ // Check offset and length for each non-null value.
+ let sliced_map_array = sliced_array.as_any().downcast_ref::<MapArray>().unwrap();
+ assert_eq!(3, sliced_map_array.value_offsets()[0]);
+ assert_eq!(3, sliced_map_array.value_length(0));
+ assert_eq!(6, sliced_map_array.value_offsets()[1]);
+ assert_eq!(2, sliced_map_array.value_length(1));
+
+ // Construct key and values
+ let keys_data = ArrayData::builder(DataType::Int32)
+ .len(5)
+ .add_buffer(Buffer::from(&[3, 4, 5, 6, 7].to_byte_slice()))
+ .build();
+ let values_data = ArrayData::builder(DataType::UInt32)
+ .len(5)
+ .add_buffer(Buffer::from(&[30u32, 40, 50, 60, 70].to_byte_slice()))
+ .build();
+
+ // Construct a buffer for value offsets, for the nested array:
+ // [[3, 4, 5], [6, 7]]
+ let entry_offsets = Buffer::from(&[0, 3, 5].to_byte_slice());
+
+ let keys = Field::new("keys", DataType::Int32, false);
+ let values = Field::new("values", DataType::UInt32, false);
+ let entry_struct = StructArray::from(vec![
+ (keys, make_array(keys_data)),
+ (values, make_array(values_data)),
+ ]);
+
+ // Construct a map array from the above two
+ let map_data_type = DataType::Map(
+ Box::new(Field::new(
+ "entries",
+ entry_struct.data_type().clone(),
+ true,
+ )),
+ false,
+ );
+ let expected_map_data = ArrayData::builder(map_data_type)
+ .len(2)
+ .add_buffer(entry_offsets)
+ .add_child_data(entry_struct.data().clone())
+ .build();
+ let expected_map_array = MapArray::from(expected_map_data);
+
+ assert_eq!(&expected_map_array, sliced_map_array)
+ }
+
+ #[test]
+ #[should_panic(expected = "index out of bounds: the len is ")]
+ fn test_map_array_index_out_of_bound() {
+ let map_array = create_from_buffers();
+
+ map_array.value(map_array.len());
+ }
+}
diff --git a/arrow/src/array/builder.rs b/arrow/src/array/builder.rs
index 8f3f730..fc0a5c8 100644
--- a/arrow/src/array/builder.rs
+++ b/arrow/src/array/builder.rs
@@ -1594,6 +1594,163 @@ impl StructBuilder {
}
}
+#[derive(Debug)]
+pub struct MapBuilder<K: ArrayBuilder, V: ArrayBuilder> {
+ offsets_builder: BufferBuilder<i32>,
+ bitmap_builder: BooleanBufferBuilder,
+ field_names: MapFieldNames,
+ key_builder: K,
+ value_builder: V,
+ len: i32,
+}
+
+#[derive(Debug, Clone)]
+pub struct MapFieldNames {
+ pub entry: String,
+ pub key: String,
+ pub value: String,
+}
+
+impl Default for MapFieldNames {
+ fn default() -> Self {
+ Self {
+ entry: "entries".to_string(),
+ key: "keys".to_string(),
+ value: "values".to_string(),
+ }
+ }
+}
+
+impl<K: ArrayBuilder, V: ArrayBuilder> MapBuilder<K, V> {
+ pub fn new(
+ field_names: Option<MapFieldNames>,
+ key_builder: K,
+ value_builder: V,
+ ) -> Self {
+ let capacity = key_builder.len();
+ Self::with_capacity(field_names, key_builder, value_builder, capacity)
+ }
+
+ pub fn with_capacity(
+ field_names: Option<MapFieldNames>,
+ key_builder: K,
+ value_builder: V,
+ capacity: usize,
+ ) -> Self {
+ let mut offsets_builder = BufferBuilder::<i32>::new(capacity + 1);
+ let len = 0;
+ offsets_builder.append(len);
+ Self {
+ offsets_builder,
+ bitmap_builder: BooleanBufferBuilder::new(capacity),
+ field_names: field_names.unwrap_or_default(),
+ key_builder,
+ value_builder,
+ len,
+ }
+ }
+
+ pub fn keys(&mut self) -> &mut K {
+ &mut self.key_builder
+ }
+
+ pub fn values(&mut self) -> &mut V {
+ &mut self.value_builder
+ }
+
+ /// Finish the current map array slot
+ #[inline]
+ pub fn append(&mut self, is_valid: bool) -> Result<()> {
+ if self.key_builder.len() != self.value_builder.len() {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Cannot append to a map builder when its keys and values have unequal lengths of {} and {}",
+ self.key_builder.len(),
+ self.value_builder.len()
+ )));
+ }
+ self.offsets_builder.append(self.key_builder.len() as i32);
+ self.bitmap_builder.append(is_valid);
+ self.len += 1;
+ Ok(())
+ }
+
+ pub fn finish(&mut self) -> MapArray {
+ let len = self.len();
+ self.len = 0;
+
+ // Build the keys
+ let keys_arr = self
+ .key_builder
+ .as_any_mut()
+ .downcast_mut::<K>()
+ .unwrap()
+ .finish();
+ let values_arr = self
+ .value_builder
+ .as_any_mut()
+ .downcast_mut::<V>()
+ .unwrap()
+ .finish();
+
+ let keys_field = Field::new(
+ self.field_names.key.as_str(),
+ keys_arr.data_type().clone(),
+ false, // always nullable
+ );
+ let values_field = Field::new(
+ self.field_names.value.as_str(),
+ values_arr.data_type().clone(),
+ true,
+ );
+
+ let struct_array =
+ StructArray::from(vec![(keys_field, keys_arr), (values_field, values_arr)]);
+
+ let offset_buffer = self.offsets_builder.finish();
+ let null_bit_buffer = self.bitmap_builder.finish();
+ self.offsets_builder.append(self.len);
+ let map_field = Box::new(Field::new(
+ self.field_names.entry.as_str(),
+ struct_array.data_type().clone(),
+ false, // always non-nullable
+ ));
+ let data = ArrayData::builder(DataType::Map(map_field, false)) // TODO: support sorted keys
+ .len(len)
+ .add_buffer(offset_buffer)
+ .add_child_data(struct_array.data().clone())
+ .null_bit_buffer(null_bit_buffer)
+ .build();
+
+ MapArray::from(data)
+ }
+}
+
+impl<K: ArrayBuilder, V: ArrayBuilder> ArrayBuilder for MapBuilder<K, V> {
+ fn len(&self) -> usize {
+ self.len as usize
+ }
+
+ fn is_empty(&self) -> bool {
+ self.len == 0
+ }
+
+ fn finish(&mut self) -> ArrayRef {
+ Arc::new(self.finish())
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn as_any_mut(&mut self) -> &mut dyn Any {
+ self
+ }
+
+ fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
+ self
+ }
+}
+
/// `FieldData` is a helper struct to track the state of the fields in the `UnionBuilder`.
#[derive(Debug)]
struct FieldData {
@@ -3185,6 +3342,60 @@ mod tests {
}
#[test]
+ fn test_map_array_builder() {
+ let string_builder = StringBuilder::new(4);
+ let int_builder = Int32Builder::new(4);
+
+ let mut builder = MapBuilder::new(None, string_builder, int_builder);
+
+ let string_builder = builder.keys();
+ string_builder.append_value("joe").unwrap();
+ string_builder.append_null().unwrap();
+ string_builder.append_null().unwrap();
+ string_builder.append_value("mark").unwrap();
+
+ let int_builder = builder.values();
+ int_builder.append_value(1).unwrap();
+ int_builder.append_value(2).unwrap();
+ int_builder.append_null().unwrap();
+ int_builder.append_value(4).unwrap();
+
+ builder.append(true).unwrap();
+ builder.append(false).unwrap();
+ builder.append(true).unwrap();
+
+ let arr = builder.finish();
+
+ let map_data = arr.data();
+ assert_eq!(3, map_data.len());
+ assert_eq!(1, map_data.null_count());
+ assert_eq!(
+ &Some(Bitmap::from(Buffer::from(&[5_u8]))),
+ map_data.null_bitmap()
+ );
+
+ let expected_string_data = ArrayData::builder(DataType::Utf8)
+ .len(4)
+ .null_bit_buffer(Buffer::from(&[9_u8]))
+ .add_buffer(Buffer::from_slice_ref(&[0, 3, 3, 3, 7]))
+ .add_buffer(Buffer::from_slice_ref(b"joemark"))
+ .build();
+
+ let expected_int_data = ArrayData::builder(DataType::Int32)
+ .len(4)
+ .null_bit_buffer(Buffer::from_slice_ref(&[11_u8]))
+ .add_buffer(Buffer::from_slice_ref(&[1, 2, 0, 4]))
+ .build();
+
+ assert_eq!(&expected_string_data, arr.keys().data());
+ assert_eq!(&expected_int_data, arr.values().data());
+ }
+
+ // TODO: add a test that finishes building, after designing a spec-compliant
+ // way of inserting values to the map.
+ // A map's values shouldn't be repeated within a slot
+
+ #[test]
fn test_struct_array_builder_from_schema() {
let mut fields = Vec::new();
fields.push(Field::new("f1", DataType::Float32, false));
diff --git a/arrow/src/array/data.rs b/arrow/src/array/data.rs
index 228f022..cb389ca 100644
--- a/arrow/src/array/data.rs
+++ b/arrow/src/array/data.rs
@@ -126,7 +126,7 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff
buffer.push(0i64);
[buffer, MutableBuffer::new(capacity * mem::size_of::<u8>())]
}
- DataType::List(_) => {
+ DataType::List(_) | DataType::Map(_, _) => {
// offset buffer always starts with a zero
let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::<i32>());
buffer.push(0i32);
@@ -475,6 +475,9 @@ impl ArrayData {
.iter()
.map(|field| Self::new_empty(field.data_type()))
.collect(),
+ DataType::Map(field, _) => {
+ vec![Self::new_empty(field.data_type())]
+ }
DataType::Union(_) => unimplemented!(),
DataType::Dictionary(_, data_type) => {
vec![Self::new_empty(data_type)]
diff --git a/arrow/src/array/equal/mod.rs b/arrow/src/array/equal/mod.rs
index 4ddf4e4..8368717 100644
--- a/arrow/src/array/equal/mod.rs
+++ b/arrow/src/array/equal/mod.rs
@@ -22,7 +22,7 @@
use super::{
Array, ArrayData, BinaryOffsetSizeTrait, BooleanArray, DecimalArray,
FixedSizeBinaryArray, FixedSizeListArray, GenericBinaryArray, GenericListArray,
- GenericStringArray, NullArray, OffsetSizeTrait, PrimitiveArray,
+ GenericStringArray, MapArray, NullArray, OffsetSizeTrait, PrimitiveArray,
StringOffsetSizeTrait, StructArray,
};
@@ -117,6 +117,12 @@ impl<OffsetSize: OffsetSizeTrait> PartialEq for GenericListArray<OffsetSize> {
}
}
+impl PartialEq for MapArray {
+ fn eq(&self, other: &Self) -> bool {
+ equal(self.data(), other.data())
+ }
+}
+
impl PartialEq for FixedSizeListArray {
fn eq(&self, other: &Self) -> bool {
equal(self.data(), other.data())
@@ -246,6 +252,9 @@ fn equal_values(
_ => unreachable!(),
},
DataType::Float16 => unreachable!(),
+ DataType::Map(_, _) => {
+ list_equal::<i32>(lhs, rhs, lhs_nulls, rhs_nulls, lhs_start, rhs_start, len)
+ }
}
}
diff --git a/arrow/src/array/equal/utils.rs b/arrow/src/array/equal/utils.rs
index 2a1ce88..1e33a86 100644
--- a/arrow/src/array/equal/utils.rs
+++ b/arrow/src/array/equal/utils.rs
@@ -106,7 +106,7 @@ pub(super) fn child_logical_null_buffer(
Bitmap::from(Buffer::from(vec![0b11111111; ceil]))
});
match parent_data.data_type() {
- DataType::List(_) => Some(logical_list_bitmap::<i32>(
+ DataType::List(_) | DataType::Map(_, _) => Some(logical_list_bitmap::<i32>(
parent_data,
parent_bitmap,
self_null_bitmap,
diff --git a/arrow/src/array/equal_json.rs b/arrow/src/array/equal_json.rs
index 7120e6c..adc33a7 100644
--- a/arrow/src/array/equal_json.rs
+++ b/arrow/src/array/equal_json.rs
@@ -219,6 +219,38 @@ impl PartialEq<StructArray> for Value {
}
}
+impl JsonEqual for MapArray {
+ fn equals_json(&self, json: &[&Value]) -> bool {
+ if self.len() != json.len() {
+ return false;
+ }
+
+ (0..self.len()).all(|i| match json[i] {
+ Value::Array(v) => self.is_valid(i) && self.value(i).equals_json_values(v),
+ Value::Null => self.is_null(i) || self.value_length(i).eq(&0),
+ _ => false,
+ })
+ }
+}
+
+impl PartialEq<Value> for MapArray {
+ fn eq(&self, json: &Value) -> bool {
+ match json {
+ Value::Array(json_array) => self.equals_json_values(json_array),
+ _ => false,
+ }
+ }
+}
+
+impl PartialEq<MapArray> for Value {
+ fn eq(&self, arrow: &MapArray) -> bool {
+ match self {
+ Value::Array(json_array) => arrow.equals_json_values(json_array),
+ _ => false,
+ }
+ }
+}
+
impl<OffsetSize: BinaryOffsetSizeTrait> JsonEqual for GenericBinaryArray<OffsetSize> {
fn equals_json(&self, json: &[&Value]) -> bool {
if self.len() != json.len() {
diff --git a/arrow/src/array/mod.rs b/arrow/src/array/mod.rs
index 69b65f4..bd791f9 100644
--- a/arrow/src/array/mod.rs
+++ b/arrow/src/array/mod.rs
@@ -87,6 +87,7 @@ mod array_binary;
mod array_boolean;
mod array_dictionary;
mod array_list;
+mod array_map;
mod array_primitive;
mod array_string;
mod array_struct;
@@ -122,6 +123,7 @@ pub use self::array_dictionary::DictionaryArray;
pub use self::array_list::FixedSizeListArray;
pub use self::array_list::LargeListArray;
pub use self::array_list::ListArray;
+pub use self::array_map::MapArray;
pub use self::array_primitive::PrimitiveArray;
pub use self::array_string::LargeStringArray;
pub use self::array_string::StringArray;
diff --git a/arrow/src/datatypes/datatype.rs b/arrow/src/datatypes/datatype.rs
index 122cbdd..1cbec34 100644
--- a/arrow/src/datatypes/datatype.rs
+++ b/arrow/src/datatypes/datatype.rs
@@ -129,6 +129,20 @@ pub enum DataType {
Dictionary(Box<DataType>, Box<DataType>),
/// Decimal value with precision and scale
Decimal(usize, usize),
+ /// A Map is a logical nested type that is represented as
+ ///
+ /// `List<entries: Struct<key: K, value: V>>`
+ ///
+ /// The keys and values are each respectively contiguous.
+ /// The key and value types are not constrained, but keys should be
+ /// hashable and unique.
+ /// Whether the keys are sorted can be set in the `bool` after the `Field`.
+ ///
+ /// In a field with Map type, the field has a child Struct field, which then
+ /// has two children: key type and the second the value type. The names of the
+ /// child fields may be respectively "entries", "key", and "value", but this is
+ /// not enforced.
+ Map(Box<Field>, bool),
}
/// An absolute length of time in seconds, milliseconds, microseconds or nanoseconds.
@@ -335,6 +349,16 @@ impl DataType {
// return an empty `struct` type as its children aren't defined in the map
Ok(DataType::Struct(vec![]))
}
+ Some(s) if s == "map" => {
+ if let Some(Value::Bool(keys_sorted)) = map.get("keysSorted") {
+ // Return a map with an empty type as its children aren't defined in the map
+ Ok(DataType::Map(Box::new(default_field), *keys_sorted))
+ } else {
+ Err(ArrowError::ParseError(
+ "Expecting a keysSorted for map".to_string(),
+ ))
+ }
+ }
Some(other) => Err(ArrowError::ParseError(format!(
"invalid or unsupported type name: {} in {:?}",
other, json
@@ -429,6 +453,9 @@ impl DataType {
DataType::Decimal(precision, scale) => {
json!({"name": "decimal", "precision": precision, "scale": scale})
}
+ DataType::Map(_, keys_sorted) => {
+ json!({"name": "map", "keysSorted": keys_sorted})
+ }
}
}
@@ -471,6 +498,10 @@ impl DataType {
&& a.data_type().equals_datatype(b.data_type())
})
}
+ (
+ DataType::Map(a_field, a_is_sorted),
+ DataType::Map(b_field, b_is_sorted),
+ ) => a_field == b_field && a_is_sorted == b_is_sorted,
_ => self == other,
}
}
diff --git a/arrow/src/datatypes/field.rs b/arrow/src/datatypes/field.rs
index 1cb8eb8..497dbb3 100644
--- a/arrow/src/datatypes/field.rs
+++ b/arrow/src/datatypes/field.rs
@@ -271,6 +271,35 @@ impl Field {
));
}
},
+ DataType::Map(_, keys_sorted) => {
+ match map.get("children") {
+ Some(Value::Array(values)) if values.len() == 1 => {
+ let child = Self::from(&values[0])?;
+ // child must be a struct
+ match child.data_type() {
+ DataType::Struct(map_fields) if map_fields.len() == 2 => {
+ DataType::Map(Box::new(child), keys_sorted)
+ }
+ t => {
+ return Err(ArrowError::ParseError(
+ format!("Map children should be a struct with 2 fields, found {:?}", t)
+ ))
+ }
+ }
+ }
+ Some(_) => {
+ return Err(ArrowError::ParseError(
+ "Field 'children' must be an array with 1 element"
+ .to_string(),
+ ))
+ }
+ None => {
+ return Err(ArrowError::ParseError(
+ "Field missing 'children' attribute".to_string(),
+ ));
+ }
+ }
+ }
_ => data_type,
};
@@ -329,6 +358,9 @@ impl Field {
DataType::List(field) => vec![field.to_json()],
DataType::LargeList(field) => vec![field.to_json()],
DataType::FixedSizeList(field, _) => vec![field.to_json()],
+ DataType::Map(field, _) => {
+ vec![field.to_json()]
+ }
_ => vec![],
};
match self.data_type() {
@@ -468,6 +500,7 @@ impl Field {
| DataType::Interval(_)
| DataType::LargeList(_)
| DataType::List(_)
+ | DataType::Map(_, _)
| DataType::Dictionary(_, _)
| DataType::FixedSizeList(_, _)
| DataType::FixedSizeBinary(_)
diff --git a/arrow/src/datatypes/mod.rs b/arrow/src/datatypes/mod.rs
index 5da7126..9920cf9 100644
--- a/arrow/src/datatypes/mod.rs
+++ b/arrow/src/datatypes/mod.rs
@@ -208,6 +208,66 @@ mod tests {
}
#[test]
+ fn map_field_to_json() {
+ let f = Field::new(
+ "my_map",
+ DataType::Map(
+ Box::new(Field::new(
+ "my_entries",
+ DataType::Struct(vec![
+ Field::new("my_keys", DataType::Utf8, false),
+ Field::new("my_values", DataType::UInt16, true),
+ ]),
+ false,
+ )),
+ true,
+ ),
+ false,
+ );
+ let value: Value = serde_json::from_str(
+ r#"{
+ "name": "my_map",
+ "nullable": false,
+ "type": {
+ "name": "map",
+ "keysSorted": true
+ },
+ "children": [
+ {
+ "name": "my_entries",
+ "nullable": false,
+ "type": {
+ "name": "struct"
+ },
+ "children": [
+ {
+ "name": "my_keys",
+ "nullable": false,
+ "type": {
+ "name": "utf8"
+ },
+ "children": []
+ },
+ {
+ "name": "my_values",
+ "nullable": true,
+ "type": {
+ "name": "int",
+ "bitWidth": 16,
+ "isSigned": false
+ },
+ "children": []
+ }
+ ]
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+ assert_eq!(value, f.to_json());
+ }
+
+ #[test]
fn primitive_field_to_json() {
let f = Field::new("first_name", DataType::Utf8, false);
let value: Value = serde_json::from_str(
@@ -270,6 +330,69 @@ mod tests {
}
#[test]
+ fn parse_map_from_json() {
+ let json = r#"
+ {
+ "name": "my_map",
+ "nullable": false,
+ "type": {
+ "name": "map",
+ "keysSorted": true
+ },
+ "children": [
+ {
+ "name": "my_entries",
+ "nullable": false,
+ "type": {
+ "name": "struct"
+ },
+ "children": [
+ {
+ "name": "my_keys",
+ "nullable": false,
+ "type": {
+ "name": "utf8"
+ },
+ "children": []
+ },
+ {
+ "name": "my_values",
+ "nullable": true,
+ "type": {
+ "name": "int",
+ "bitWidth": 16,
+ "isSigned": false
+ },
+ "children": []
+ }
+ ]
+ }
+ ]
+ }
+ "#;
+ let value: Value = serde_json::from_str(json).unwrap();
+ let dt = Field::from(&value).unwrap();
+
+ let expected = Field::new(
+ "my_map",
+ DataType::Map(
+ Box::new(Field::new(
+ "my_entries",
+ DataType::Struct(vec![
+ Field::new("my_keys", DataType::Utf8, false),
+ Field::new("my_values", DataType::UInt16, true),
+ ]),
+ false,
+ )),
+ true,
+ ),
+ false,
+ );
+
+ assert_eq!(expected, dt);
+ }
+
+ #[test]
fn parse_utf8_from_json() {
let json = "{\"name\":\"utf8\"}";
let value: Value = serde_json::from_str(json).unwrap();
@@ -396,6 +519,21 @@ mod tests {
))),
true,
),
+ Field::new(
+ "c35",
+ DataType::Map(
+ Box::new(Field::new(
+ "my_entries",
+ DataType::Struct(vec![
+ Field::new("my_keys", DataType::Utf8, false),
+ Field::new("my_values", DataType::UInt16, true),
+ ]),
+ false,
+ )),
+ true,
+ ),
+ false,
+ ),
],
metadata,
);
@@ -790,6 +928,43 @@ mod tests {
]
}
]
+ },
+ {
+ "name": "c35",
+ "nullable": false,
+ "type": {
+ "name": "map",
+ "keysSorted": true
+ },
+ "children": [
+ {
+ "name": "my_entries",
+ "nullable": false,
+ "type": {
+ "name": "struct"
+ },
+ "children": [
+ {
+ "name": "my_keys",
+ "nullable": false,
+ "type": {
+ "name": "utf8"
+ },
+ "children": []
+ },
+ {
+ "name": "my_values",
+ "nullable": true,
+ "type": {
+ "name": "int",
+ "bitWidth": 16,
+ "isSigned": false
+ },
+ "children": []
+ }
+ ]
+ }
+ ]
}
],
"metadata" : {
diff --git a/arrow/src/ipc/convert.rs b/arrow/src/ipc/convert.rs
index 59d4d0b..5244a38 100644
--- a/arrow/src/ipc/convert.rs
+++ b/arrow/src/ipc/convert.rs
@@ -308,6 +308,14 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
DataType::Struct(fields)
}
+ ipc::Type::Map => {
+ let map = field.type_as_map().unwrap();
+ let children = field.children().unwrap();
+ if children.len() != 1 {
+ panic!("expect a map to have one child")
+ }
+ DataType::Map(Box::new(children.get(0).into()), map.keysSorted())
+ }
ipc::Type::Decimal => {
let fsb = field.type_as_decimal().unwrap();
DataType::Decimal(fsb.precision() as usize, fsb.scale() as usize)
@@ -624,6 +632,16 @@ pub(crate) fn get_fb_field_type<'a>(
children: Some(fbb.create_vector(&children[..])),
}
}
+ Map(map_field, keys_sorted) => {
+ let child = build_field(fbb, map_field);
+ let mut field_type = ipc::MapBuilder::new(fbb);
+ field_type.add_keysSorted(*keys_sorted);
+ FBFieldType {
+ type_type: ipc::Type::Map,
+ type_: field_type.finish().as_union_value(),
+ children: Some(fbb.create_vector(&[child])),
+ }
+ }
Dictionary(_, value_type) => {
// In this library, the dictionary "type" is a logical construct. Here we
// pass through to the value type, as we've already captured the index
diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs
index 7bba311..50e858f 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow/src/ipc/reader.rs
@@ -89,7 +89,7 @@ fn create_array(
buffer_index += 2;
array
}
- List(ref list_field) | LargeList(ref list_field) => {
+ List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
let list_node = &nodes[node_index];
let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
.iter()
@@ -377,8 +377,19 @@ fn create_list_array(
builder = builder.null_bit_buffer(buffers[0].clone())
}
make_array(builder.build())
+ } else if let DataType::Map(_, _) = *data_type {
+ let null_count = field_node.null_count() as usize;
+ let mut builder = ArrayData::builder(data_type.clone())
+ .len(field_node.length() as usize)
+ .buffers(buffers[1..2].to_vec())
+ .offset(0)
+ .child_data(vec![child_array.data().clone()]);
+ if null_count > 0 {
+ builder = builder.null_bit_buffer(buffers[0].clone())
+ }
+ make_array(builder.build())
} else {
- panic!("Cannot create list array from {:?}", data_type)
+ panic!("Cannot create list or map array from {:?}", data_type)
}
}
@@ -931,6 +942,7 @@ mod tests {
"generated_interval",
"generated_datetime",
"generated_dictionary",
+ "generated_map",
"generated_nested",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
@@ -972,6 +984,7 @@ mod tests {
"generated_interval",
"generated_datetime",
"generated_dictionary",
+ "generated_map",
"generated_nested",
"generated_null_trivial",
"generated_null",
@@ -999,6 +1012,7 @@ mod tests {
"generated_interval",
"generated_datetime",
"generated_dictionary",
+ "generated_map",
"generated_nested",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
@@ -1033,6 +1047,8 @@ mod tests {
"generated_interval",
"generated_datetime",
"generated_dictionary",
+ "generated_map",
+ // "generated_map_non_canonical",
"generated_nested",
"generated_null_trivial",
"generated_null",
@@ -1064,6 +1080,8 @@ mod tests {
"generated_interval",
"generated_datetime",
"generated_dictionary",
+ "generated_map",
+ // "generated_map_non_canonical",
"generated_nested",
"generated_null_trivial",
"generated_null",
diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs
index f342d67..0376265 100644
--- a/arrow/src/ipc/writer.rs
+++ b/arrow/src/ipc/writer.rs
@@ -928,6 +928,7 @@ mod tests {
"generated_interval",
"generated_datetime",
"generated_dictionary",
+ "generated_map",
"generated_nested",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
@@ -979,6 +980,7 @@ mod tests {
"generated_interval",
"generated_datetime",
"generated_dictionary",
+ "generated_map",
"generated_nested",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
@@ -1031,6 +1033,7 @@ mod tests {
"generated_dictionary",
// "generated_duplicate_fieldnames",
"generated_interval",
+ "generated_map",
"generated_nested",
// "generated_nested_large_offsets",
"generated_null_trivial",
@@ -1094,6 +1097,7 @@ mod tests {
"generated_dictionary",
// "generated_duplicate_fieldnames",
"generated_interval",
+ "generated_map",
"generated_nested",
// "generated_nested_large_offsets",
"generated_null_trivial",
diff --git a/arrow/src/json/reader.rs b/arrow/src/json/reader.rs
index 290ad4f..c4e8470 100644
--- a/arrow/src/json/reader.rs
+++ b/arrow/src/json/reader.rs
@@ -47,6 +47,7 @@ use std::sync::Arc;
use indexmap::map::IndexMap as HashMap;
use indexmap::set::IndexSet as HashSet;
+use serde_json::json;
use serde_json::{map::Map as JsonMap, Value};
use crate::buffer::MutableBuffer;
@@ -1282,6 +1283,12 @@ impl Decoder {
.build();
Ok(make_array(data))
}
+ DataType::Map(map_field, _) => self.build_map_array(
+ rows,
+ field.name(),
+ field.data_type(),
+ map_field,
+ ),
_ => Err(ArrowError::JsonError(format!(
"{:?} type is not supported",
field.data_type()
@@ -1292,6 +1299,101 @@ impl Decoder {
arrays
}
+ fn build_map_array(
+ &self,
+ rows: &[Value],
+ field_name: &str,
+ map_type: &DataType,
+ struct_field: &Field,
+ ) -> Result<ArrayRef> {
+ // A map has the format {"key": "value"} where key is most commonly a string,
+ // but could be a string, number or boolean (🤷🏾♂️) (e.g. {1: "value"}).
+ // A map is also represented as a flattened contiguous array, with the number
+ // of key-value pairs being separated by a list offset.
+ // If row 1 has 2 key-value pairs, and row 2 has 3, the offsets would be
+ // [0, 2, 5].
+ //
+ // Thus we try to read a map by iterating through the keys and values
+
+ let (key_field, value_field) =
+ if let DataType::Struct(fields) = struct_field.data_type() {
+ if fields.len() != 2 {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "DataType::Map expects a struct with 2 fields, found {} fields",
+ fields.len()
+ )));
+ }
+ (&fields[0], &fields[1])
+ } else {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "JSON map array builder expects a DataType::Map, found {:?}",
+ struct_field.data_type()
+ )));
+ };
+ let value_map_iter = rows.iter().map(|value| {
+ value
+ .get(field_name)
+ .map(|v| v.as_object().map(|map| (map, map.len() as i32)))
+ .flatten()
+ });
+ let rows_len = rows.len();
+ let mut list_offsets = Vec::with_capacity(rows_len + 1);
+ list_offsets.push(0i32);
+ let mut last_offset = 0;
+ let num_bytes = bit_util::ceil(rows_len, 8);
+ let mut list_bitmap = MutableBuffer::from_len_zeroed(num_bytes);
+ let null_data = list_bitmap.as_slice_mut();
+
+ let struct_rows = value_map_iter
+ .enumerate()
+ .filter_map(|(i, v)| match v {
+ Some((map, len)) => {
+ list_offsets.push(last_offset + len);
+ last_offset += len;
+ bit_util::set_bit(null_data, i);
+ Some(map.iter().map(|(k, v)| {
+ json!({
+ key_field.name(): k,
+ value_field.name(): v
+ })
+ }))
+ }
+ None => {
+ list_offsets.push(last_offset);
+ None
+ }
+ })
+ .flatten()
+ .collect::<Vec<Value>>();
+
+ let struct_children = self.build_struct_array(
+ struct_rows.as_slice(),
+ &[key_field.clone(), value_field.clone()],
+ &[],
+ )?;
+
+ Ok(make_array(ArrayData::new(
+ map_type.clone(),
+ rows_len,
+ None,
+ Some(list_bitmap.into()),
+ 0,
+ vec![Buffer::from_slice_ref(&list_offsets)],
+ vec![ArrayData::new(
+ struct_field.data_type().clone(),
+ struct_children[0].len(),
+ None,
+ None,
+ 0,
+ vec![],
+ struct_children
+ .into_iter()
+ .map(|array| array.data().clone())
+ .collect(),
+ )],
+ )))
+ }
+
#[inline(always)]
fn build_dictionary_array<T>(
&self,
@@ -2178,6 +2280,81 @@ mod tests {
}
#[test]
+ fn test_map_json_arrays() {
+ let account_field = Field::new("account", DataType::UInt16, false);
+ let value_list_type =
+ DataType::List(Box::new(Field::new("item", DataType::Utf8, false)));
+ let entries_struct_type = DataType::Struct(vec![
+ Field::new("key", DataType::Utf8, false),
+ Field::new("value", value_list_type.clone(), true),
+ ]);
+ let stocks_field = Field::new(
+ "stocks",
+ DataType::Map(
+ Box::new(Field::new("entries", entries_struct_type.clone(), false)),
+ false,
+ ),
+ true,
+ );
+ let schema = Arc::new(Schema::new(vec![account_field, stocks_field.clone()]));
+ let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
+ // Note: account 456 has 'long' twice, to show that the JSON reader will overwrite
+ // existing keys. This thus guarantees unique keys for the map
+ let json_content = r#"
+ {"account": 123, "stocks":{"long": ["$AAA", "$BBB"], "short": ["$CCC", "$D"]}}
+ {"account": 456, "stocks":{"long": null, "long": ["$AAA", "$CCC", "$D"], "short": null}}
+ {"account": 789, "stocks":{"hedged": ["$YYY"], "long": null, "short": ["$D"]}}
+ "#;
+ let mut reader = builder.build(Cursor::new(json_content)).unwrap();
+
+ // build expected output
+ let expected_accounts = UInt16Array::from(vec![123, 456, 789]);
+
+ let expected_keys = StringArray::from(vec![
+ "long", "short", "long", "short", "hedged", "long", "short",
+ ])
+ .data()
+ .clone();
+ let expected_value_array_data = StringArray::from(vec![
+ "$AAA", "$BBB", "$CCC", "$D", "$AAA", "$CCC", "$D", "$YYY", "$D",
+ ])
+ .data()
+ .clone();
+ // Create the list that holds ["$_", "$_"]
+ let expected_values = ArrayDataBuilder::new(value_list_type)
+ .len(7)
+ .add_buffer(Buffer::from(
+ vec![0i32, 2, 4, 7, 7, 8, 8, 9].to_byte_slice(),
+ ))
+ .add_child_data(expected_value_array_data)
+ .null_bit_buffer(Buffer::from(vec![0b01010111]))
+ .build();
+ let expected_stocks_entries_data = ArrayDataBuilder::new(entries_struct_type)
+ .len(7)
+ .add_child_data(expected_keys)
+ .add_child_data(expected_values)
+ .build();
+ let expected_stocks_data =
+ ArrayDataBuilder::new(stocks_field.data_type().clone())
+ .len(3)
+ .add_buffer(Buffer::from(vec![0i32, 2, 4, 7].to_byte_slice()))
+ .add_child_data(expected_stocks_entries_data)
+ .build();
+
+ let expected_stocks = make_array(expected_stocks_data);
+
+ // compare with result from json reader
+ let batch = reader.next().unwrap().unwrap();
+ assert_eq!(batch.num_rows(), 3);
+ assert_eq!(batch.num_columns(), 2);
+ let col1 = batch.column(0);
+ assert_eq!(col1.data(), expected_accounts.data());
+ // Compare the map
+ let col2 = batch.column(1);
+ assert_eq!(col2.data(), expected_stocks.data());
+ }
+
+ #[test]
fn test_dictionary_from_json_basic_with_nulls() {
let schema = Schema::new(vec![Field::new(
"d",
diff --git a/arrow/src/util/integration_util.rs b/arrow/src/util/integration_util.rs
index bac0e47..ada2494 100644
--- a/arrow/src/util/integration_util.rs
+++ b/arrow/src/util/integration_util.rs
@@ -350,6 +350,10 @@ impl ArrowJsonBatch {
let arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
}
+ DataType::Map(_, _) => {
+ let arr = arr.as_any().downcast_ref::<MapArray>().unwrap();
+ arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
+ }
DataType::Decimal(_, _) => {
let arr = arr.as_any().downcast_ref::<DecimalArray>().unwrap();
arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
@@ -492,6 +496,7 @@ fn json_from_col(col: &ArrowJsonColumn, data_type: &DataType) -> Vec<Value> {
json_from_fixed_size_list_col(col, field.data_type(), *list_size as usize)
}
DataType::Struct(fields) => json_from_struct_col(col, fields),
+ DataType::Map(field, keys_sorted) => json_from_map_col(col, field, *keys_sorted),
DataType::Int64
| DataType::UInt64
| DataType::Date64
@@ -649,6 +654,51 @@ fn json_from_fixed_size_list_col(
values
}
+fn json_from_map_col(
+ col: &ArrowJsonColumn,
+ field: &Field,
+ _keys_sorted: bool,
+) -> Vec<Value> {
+ let mut values = Vec::with_capacity(col.count);
+
+ // get the inner array
+ let child = &col.children.clone().expect("list type must have children")[0];
+ let offsets: Vec<usize> = col
+ .offset
+ .clone()
+ .unwrap()
+ .iter()
+ .map(|o| match o {
+ Value::String(s) => s.parse::<usize>().unwrap(),
+ Value::Number(n) => n.as_u64().unwrap() as usize,
+ _ => panic!(
+ "Offsets should be numbers or strings that are convertible to numbers"
+ ),
+ })
+ .collect();
+
+ let inner = match field.data_type() {
+ DataType::Struct(fields) => json_from_struct_col(child, fields),
+ _ => panic!("Map child must be Struct"),
+ };
+
+ for i in 0..col.count {
+ match &col.validity {
+ Some(validity) => match &validity[i] {
+ 0 => values.push(Value::Null),
+ 1 => {
+ values.push(Value::Array(inner[offsets[i]..offsets[i + 1]].to_vec()))
+ }
+ _ => panic!("Validity data should be 0 or 1"),
+ },
+ None => {
+ // Null type does not have a validity vector
+ }
+ }
+ }
+
+ values
+}
#[cfg(test)]
mod tests {
use super::*;
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index f8cad6f..d3259c4 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -27,7 +27,7 @@ use arrow::array::{
new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray,
BinaryBuilder, BooleanArray, BooleanBufferBuilder, BooleanBuilder, DecimalBuilder,
FixedSizeBinaryArray, FixedSizeBinaryBuilder, GenericListArray, Int16BufferBuilder,
- Int32Array, Int64Array, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder,
+ Int32Array, Int64Array, MapArray, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder,
StringArray, StringBuilder, StructArray,
};
use arrow::buffer::{Buffer, MutableBuffer};
@@ -924,6 +924,145 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
}
}
+/// Implementation of a map array reader.
+pub struct MapArrayReader {
+ key_reader: Box<dyn ArrayReader>,
+ value_reader: Box<dyn ArrayReader>,
+ data_type: ArrowType,
+ map_def_level: i16,
+ map_rep_level: i16,
+ def_level_buffer: Option<Buffer>,
+ rep_level_buffer: Option<Buffer>,
+}
+
+impl MapArrayReader {
+ pub fn new(
+ key_reader: Box<dyn ArrayReader>,
+ value_reader: Box<dyn ArrayReader>,
+ data_type: ArrowType,
+ def_level: i16,
+ rep_level: i16,
+ ) -> Self {
+ Self {
+ key_reader,
+ value_reader,
+ data_type,
+ map_def_level: rep_level,
+ map_rep_level: def_level,
+ def_level_buffer: None,
+ rep_level_buffer: None,
+ }
+ }
+}
+
+impl ArrayReader for MapArrayReader {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn get_data_type(&self) -> &ArrowType {
+ &self.data_type
+ }
+
+ fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+ let key_array = self.key_reader.next_batch(batch_size)?;
+ let value_array = self.value_reader.next_batch(batch_size)?;
+
+ // Check that key and value have the same lengths
+ let key_length = key_array.len();
+ if key_length != value_array.len() {
+ return Err(general_err!(
+ "Map key and value should have the same lengths."
+ ));
+ }
+
+ let def_levels = self
+ .key_reader
+ .get_def_levels()
+ .ok_or_else(|| ArrowError("item_reader def levels are None.".to_string()))?;
+ let rep_levels = self
+ .key_reader
+ .get_rep_levels()
+ .ok_or_else(|| ArrowError("item_reader rep levels are None.".to_string()))?;
+
+ if !((def_levels.len() == rep_levels.len()) && (rep_levels.len() == key_length)) {
+ return Err(ArrowError(
+ "Expected item_reader def_levels and rep_levels to be same length as batch".to_string(),
+ ));
+ }
+
+ let entry_data_type = if let ArrowType::Map(field, _) = &self.data_type {
+ field.data_type().clone()
+ } else {
+ return Err(ArrowError("Expected a map arrow type".to_string()));
+ };
+
+ let entry_data = ArrayDataBuilder::new(entry_data_type)
+ .len(key_length)
+ .add_child_data(key_array.data().clone())
+ .add_child_data(value_array.data().clone())
+ .build();
+
+ let entry_len = rep_levels.iter().filter(|level| **level == 0).count();
+
+ // first item in each list has rep_level = 0, subsequent items have rep_level = 1
+ let mut offsets: Vec<i32> = Vec::new();
+ let mut cur_offset = 0;
+ def_levels.iter().zip(rep_levels).for_each(|(d, r)| {
+ if *r == 0 || d == &self.map_def_level {
+ offsets.push(cur_offset);
+ }
+ if d > &self.map_def_level {
+ cur_offset += 1;
+ }
+ });
+ offsets.push(cur_offset);
+
+ let num_bytes = bit_util::ceil(offsets.len(), 8);
+ // TODO: A useful optimization is to use the null count to fill with
+ // 0 or null, to reduce individual bits set in a loop.
+ // To favour dense data, set every slot to true, then unset
+ let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
+ let null_slice = null_buf.as_slice_mut();
+ let mut list_index = 0;
+ for i in 0..rep_levels.len() {
+ // If the level is lower than empty, then the slot is null.
+ // When a list is non-nullable, its empty level = null level,
+ // so this automatically factors that in.
+ if rep_levels[i] == 0 && def_levels[i] < self.map_def_level {
+ // should be empty list
+ bit_util::unset_bit(null_slice, list_index);
+ }
+ if rep_levels[i] == 0 {
+ list_index += 1;
+ }
+ }
+ let value_offsets = Buffer::from(&offsets.to_byte_slice());
+
+ // Now we can build array data
+ let array_data = ArrayDataBuilder::new(self.data_type.clone())
+ .len(entry_len)
+ .add_buffer(value_offsets)
+ .null_bit_buffer(null_buf.into())
+ .add_child_data(entry_data)
+ .build();
+
+ Ok(Arc::new(MapArray::from(array_data)))
+ }
+
+ fn get_def_levels(&self) -> Option<&[i16]> {
+ self.def_level_buffer
+ .as_ref()
+ .map(|buf| unsafe { buf.typed_data() })
+ }
+
+ fn get_rep_levels(&self) -> Option<&[i16]> {
+ self.rep_level_buffer
+ .as_ref()
+ .map(|buf| unsafe { buf.typed_data() })
+ }
+}
+
/// Implementation of struct array reader.
pub struct StructArrayReader {
children: Vec<Box<dyn ArrayReader>>,
@@ -1176,8 +1315,6 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
for ArrayReaderBuilder
{
/// Build array reader for primitive type.
- /// Currently we don't have a list reader implementation, so repeated type is not
- /// supported yet.
fn visit_primitive(
&mut self,
cur_type: TypePtr,
@@ -1251,15 +1388,87 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
}
/// Build array reader for map type.
- /// Currently this is not supported.
fn visit_map(
&mut self,
- _cur_type: Arc<Type>,
- _context: &'a ArrayReaderBuilderContext,
+ map_type: Arc<Type>,
+ context: &'a ArrayReaderBuilderContext,
) -> Result<Option<Box<dyn ArrayReader>>> {
- Err(ArrowError(
- "Reading parquet map array into arrow is not supported yet!".to_string(),
- ))
+ // Add map type to context
+ let mut new_context = context.clone();
+ new_context.path.append(vec![map_type.name().to_string()]);
+ if let Repetition::OPTIONAL = map_type.get_basic_info().repetition() {
+ new_context.def_level += 1;
+ }
+
+ // Add map entry (key_value) to context
+ let map_key_value = map_type.get_fields().first().ok_or_else(|| {
+ ArrowError("Map field must have a key_value entry".to_string())
+ })?;
+ new_context
+ .path
+ .append(vec![map_key_value.name().to_string()]);
+ new_context.rep_level += 1;
+
+ // Get key and value, and create context for each
+ let map_key = map_key_value
+ .get_fields()
+ .first()
+ .ok_or_else(|| ArrowError("Map entry must have a key".to_string()))?;
+ let map_value = map_key_value
+ .get_fields()
+ .get(1)
+ .ok_or_else(|| ArrowError("Map entry must have a value".to_string()))?;
+
+ let key_reader = {
+ let mut key_context = new_context.clone();
+ key_context.def_level += 1;
+ key_context.path.append(vec![map_key.name().to_string()]);
+ self.dispatch(map_key.clone(), &key_context)?.unwrap()
+ };
+ let value_reader = {
+ let mut value_context = new_context.clone();
+ if let Repetition::OPTIONAL = map_value.get_basic_info().repetition() {
+ value_context.def_level += 1;
+ }
+ self.dispatch(map_value.clone(), &value_context)?.unwrap()
+ };
+
+ let arrow_type = self
+ .arrow_schema
+ .field_with_name(map_type.name())
+ .ok()
+ .map(|f| f.data_type().to_owned())
+ .unwrap_or_else(|| {
+ ArrowType::Map(
+ Box::new(Field::new(
+ map_key_value.name(),
+ ArrowType::Struct(vec![
+ Field::new(
+ map_key.name(),
+ key_reader.get_data_type().clone(),
+ false,
+ ),
+ Field::new(
+ map_value.name(),
+ value_reader.get_data_type().clone(),
+ map_value.is_optional(),
+ ),
+ ]),
+ map_type.is_optional(),
+ )),
+ false,
+ )
+ });
+
+ let key_array_reader: Box<dyn ArrayReader> = Box::new(MapArrayReader::new(
+ key_reader,
+ value_reader,
+ arrow_type,
+ new_context.def_level,
+ new_context.rep_level,
+ ));
+
+ Ok(Some(key_array_reader))
}
/// Build array reader for list type.
@@ -1269,10 +1478,11 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
item_type: Arc<Type>,
context: &'a ArrayReaderBuilderContext,
) -> Result<Option<Box<dyn ArrayReader>>> {
- let list_child = &list_type
+ let mut list_child = &list_type
.get_fields()
.first()
- .ok_or_else(|| ArrowError("List field must have a child.".to_string()))?;
+ .ok_or_else(|| ArrowError("List field must have a child.".to_string()))?
+ .clone();
let mut new_context = context.clone();
new_context.path.append(vec![list_type.name().to_string()]);
@@ -1316,9 +1526,6 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
_ => {
// a list is a group type with a single child. The list child's
// name comes from the child's field name.
- let mut list_child = list_type.get_fields().first().ok_or(ArrowError(
- "List GroupType should have a field".to_string(),
- ))?;
// if the child's name is "list" and it has a child, then use this child
if list_child.name() == "list" && !list_child.get_fields().is_empty() {
list_child = list_child.get_fields().first().unwrap();
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 83fb0a2..761c5a6 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -668,4 +668,20 @@ mod tests {
batch.unwrap();
}
}
+
+ #[test]
+ fn test_read_maps() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{}/nested_maps.snappy.parquet", testdata);
+ let parquet_file_reader =
+ SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
+ let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+ let record_batch_reader = arrow_reader
+ .get_record_reader(60)
+ .expect("Failed to read into array!");
+
+ for batch in record_batch_reader {
+ batch.unwrap();
+ }
+ }
}
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs
index 3ff1304..4726734 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer.rs
@@ -199,6 +199,15 @@ fn write_leaves(
}
Ok(())
}
+ ArrowDataType::Map(_, _) => {
+ let map_array: &arrow_array::MapArray = array
+ .as_any()
+ .downcast_ref::<arrow_array::MapArray>()
+ .expect("Unable to get map array");
+ write_leaves(&mut row_group_writer, &map_array.keys(), &mut levels)?;
+ write_leaves(&mut row_group_writer, &map_array.values(), &mut levels)?;
+ Ok(())
+ }
ArrowDataType::Dictionary(_, value_type) => {
// cast dictionary to a primitive
let array = arrow::compute::cast(array, value_type)?;
@@ -936,6 +945,36 @@ mod tests {
}
#[test]
+ fn arrow_writer_map() {
+ // Note: we are using the JSON Arrow reader for brevity
+ let json_content = r#"
+ {"stocks":{"long": "$AAA", "short": "$BBB"}}
+ {"stocks":{"long": null, "long": "$CCC", "short": null}}
+ {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
+ "#;
+ let entries_struct_type = DataType::Struct(vec![
+ Field::new("key", DataType::Utf8, false),
+ Field::new("value", DataType::Utf8, true),
+ ]);
+ let stocks_field = Field::new(
+ "stocks",
+ DataType::Map(
+ Box::new(Field::new("entries", entries_struct_type, false)),
+ false,
+ ),
+ true,
+ );
+ let schema = Arc::new(Schema::new(vec![stocks_field]));
+ let builder = arrow::json::ReaderBuilder::new()
+ .with_schema(schema)
+ .with_batch_size(64);
+ let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
+
+ let batch = reader.next().unwrap().unwrap();
+ roundtrip("test_arrow_writer_map.parquet", batch, None);
+ }
+
+ #[test]
fn arrow_writer_2_level_struct() {
// tests writing <struct<struct<primitive>>
let field_c = Field::new("c", DataType::Int32, true);
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/levels.rs
index 0af0f9e..3be315b 100644
--- a/parquet/src/arrow/levels.rs
+++ b/parquet/src/arrow/levels.rs
@@ -40,7 +40,7 @@
//!
//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
-use arrow::array::{make_array, ArrayRef, StructArray};
+use arrow::array::{make_array, ArrayRef, MapArray, StructArray};
use arrow::datatypes::{DataType, Field};
/// Keeps track of the level information per array that is needed to write an Arrow array to Parquet.
@@ -234,13 +234,53 @@ impl LevelInfo {
LevelType::Primitive(list_field.is_nullable()),
)]
}
- DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => {
+ DataType::List(_)
+ | DataType::LargeList(_)
+ | DataType::Struct(_)
+ | DataType::Map(_, _) => {
list_level.calculate_array_levels(&child_array, list_field)
}
DataType::FixedSizeList(_, _) => unimplemented!(),
DataType::Union(_) => unimplemented!(),
}
}
+ DataType::Map(map_field, _) => {
+ // Calculate the map level
+ let map_level = self.calculate_child_levels(
+ array_offsets,
+ array_mask,
+ // A map is treated like a list as it has repetition
+ LevelType::List(field.is_nullable()),
+ );
+
+ let map_array = array.as_any().downcast_ref::<MapArray>().unwrap();
+
+ let key_array = map_array.keys();
+ let value_array = map_array.values();
+
+ if let DataType::Struct(fields) = map_field.data_type() {
+ let key_field = &fields[0];
+ let value_field = &fields[1];
+
+ let mut map_levels = vec![];
+
+ // Get key levels
+ let mut key_levels =
+ map_level.calculate_array_levels(&key_array, key_field);
+ map_levels.append(&mut key_levels);
+
+ let mut value_levels =
+ map_level.calculate_array_levels(&value_array, value_field);
+ map_levels.append(&mut value_levels);
+
+ map_levels
+ } else {
+ panic!(
+ "Map field should be a struct, found {:?}",
+ map_field.data_type()
+ );
+ }
+ }
DataType::FixedSizeList(_, _) => unimplemented!(),
DataType::Struct(struct_fields) => {
let struct_array: &StructArray = array
@@ -663,7 +703,7 @@ impl LevelInfo {
};
((0..=(len as i64)).collect(), array_mask)
}
- DataType::List(_) => {
+ DataType::List(_) | DataType::Map(_, _) => {
let data = array.data();
let offsets = unsafe { data.buffers()[0].typed_data::<i32>() };
let offsets = offsets
@@ -1547,4 +1587,90 @@ mod tests {
panic!("Levels should not be equal, to reflect the difference in struct nullness");
}
}
+
+ #[test]
+ fn test_map_array() {
+ // Note: we are using the JSON Arrow reader for brevity
+ let json_content = r#"
+ {"stocks":{"long": "$AAA", "short": "$BBB"}}
+ {"stocks":{"long": null, "long": "$CCC", "short": null}}
+ {"stocks":{"hedged": "$YYY", "long": null, "short": "$D"}}
+ "#;
+ let entries_struct_type = DataType::Struct(vec![
+ Field::new("key", DataType::Utf8, false),
+ Field::new("value", DataType::Utf8, true),
+ ]);
+ let stocks_field = Field::new(
+ "stocks",
+ DataType::Map(
+ Box::new(Field::new("entries", entries_struct_type, false)),
+ false,
+ ),
+ // not nullable, so the keys have max level = 1
+ false,
+ );
+ let schema = Arc::new(Schema::new(vec![stocks_field]));
+ let builder = arrow::json::ReaderBuilder::new()
+ .with_schema(schema)
+ .with_batch_size(64);
+ let mut reader = builder.build(std::io::Cursor::new(json_content)).unwrap();
+
+ let batch = reader.next().unwrap().unwrap();
+
+ let expected_batch_level = LevelInfo {
+ definition: vec![0; 3],
+ repetition: None,
+ array_offsets: (0..=3).collect(),
+ array_mask: vec![true, true, true],
+ max_definition: 0,
+ level_type: LevelType::Root,
+ offset: 0,
+ length: 3,
+ };
+
+ let batch_level = LevelInfo::new(0, 3);
+ assert_eq!(&batch_level, &expected_batch_level);
+
+ // calculate the map's level
+ let mut levels = vec![];
+ batch
+ .columns()
+ .iter()
+ .zip(batch.schema().fields())
+ .for_each(|(array, field)| {
+ let mut array_levels = batch_level.calculate_array_levels(array, field);
+ levels.append(&mut array_levels);
+ });
+ assert_eq!(levels.len(), 2);
+
+ // test key levels
+ let list_level = levels.get(0).unwrap();
+
+ let expected_level = LevelInfo {
+ definition: vec![1; 7],
+ repetition: Some(vec![0, 1, 0, 1, 0, 1, 1]),
+ array_offsets: vec![0, 2, 4, 7],
+ array_mask: vec![true; 7],
+ max_definition: 1,
+ level_type: LevelType::Primitive(false),
+ offset: 0,
+ length: 7,
+ };
+ assert_eq!(list_level, &expected_level);
+
+ // test values levels
+ let list_level = levels.get(1).unwrap();
+
+ let expected_level = LevelInfo {
+ definition: vec![2, 2, 2, 1, 2, 1, 2],
+ repetition: Some(vec![0, 1, 0, 1, 0, 1, 1]),
+ array_offsets: vec![0, 2, 4, 7],
+ array_mask: vec![true, true, true, false, true, false, true],
+ max_definition: 2,
+ level_type: LevelType::Primitive(true),
+ offset: 0,
+ length: 7,
+ };
+ assert_eq!(list_level, &expected_level);
+ }
}
diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs
index 18dacd1..5fe94ce 100644
--- a/parquet/src/arrow/schema.rs
+++ b/parquet/src/arrow/schema.rs
@@ -507,6 +507,35 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
.with_repetition(repetition)
.build()
}
+ DataType::Map(field, _) => {
+ if let DataType::Struct(struct_fields) = field.data_type() {
+ Type::group_type_builder(name)
+ .with_fields(&mut vec![Arc::new(
+ Type::group_type_builder(field.name())
+ .with_fields(&mut vec![
+ Arc::new(arrow_to_parquet_type(&Field::new(
+ struct_fields[0].name(),
+ struct_fields[0].data_type().clone(),
+ false,
+ ))?),
+ Arc::new(arrow_to_parquet_type(&Field::new(
+ struct_fields[1].name(),
+ struct_fields[1].data_type().clone(),
+ struct_fields[1].is_nullable(),
+ ))?),
+ ])
+ .with_repetition(Repetition::REPEATED)
+ .build()?,
+ )])
+ .with_logical_type(Some(LogicalType::MAP(Default::default())))
+ .with_repetition(repetition)
+ .build()
+ } else {
+ Err(ArrowError(
+ "DataType::Map should contain a struct field child".to_string(),
+ ))
+ }
+ }
DataType::Union(_) => unimplemented!("See ARROW-8817."),
DataType::Dictionary(_, ref value) => {
// Dictionary encoding not handled at the schema level
@@ -791,24 +820,28 @@ impl ParquetTypeConverter<'_> {
///
/// This function takes care of logical type and repetition.
fn to_group_type(&self) -> Result<Option<DataType>> {
- if self.is_repeated() {
- self.to_struct().map(|opt| {
- opt.map(|dt| {
- DataType::List(Box::new(Field::new(
- self.schema.name(),
- dt,
- self.is_nullable(),
- )))
- })
- })
- } else {
- match (
- self.schema.get_basic_info().logical_type(),
- self.schema.get_basic_info().converted_type(),
- ) {
- (Some(LogicalType::LIST(_)), _) => self.to_list(),
- (None, ConvertedType::LIST) => self.to_list(),
- _ => self.to_struct(),
+ match (
+ self.schema.get_basic_info().logical_type(),
+ self.schema.get_basic_info().converted_type(),
+ ) {
+ (Some(LogicalType::LIST(_)), _) | (_, ConvertedType::LIST) => self.to_list(),
+ (Some(LogicalType::MAP(_)), _)
+ | (_, ConvertedType::MAP)
+ | (_, ConvertedType::MAP_KEY_VALUE) => self.to_map(),
+ (_, _) => {
+ if self.is_repeated() {
+ self.to_struct().map(|opt| {
+ opt.map(|dt| {
+ DataType::List(Box::new(Field::new(
+ self.schema.name(),
+ dt,
+ self.is_nullable(),
+ )))
+ })
+ })
+ } else {
+ self.to_struct()
+ }
}
}
}
@@ -916,6 +949,87 @@ impl ParquetTypeConverter<'_> {
)),
}
}
+
+ /// Converts a parquet map to arrow map.
+ ///
+ /// To fully understand this algorithm, please refer to
+ /// [parquet doc](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md).
+ fn to_map(&self) -> Result<Option<DataType>> {
+ match self.schema {
+ Type::PrimitiveType { .. } => Err(ParquetError::General(format!(
+ "{:?} is a map type and can't be processed as primitive.",
+ self.schema
+ ))),
+ Type::GroupType {
+ basic_info: _,
+ fields,
+ } if fields.len() == 1 => {
+ let key_item = fields.first().unwrap();
+
+ let (key_type, value_type) = match key_item.as_ref() {
+ Type::PrimitiveType { .. } => {
+ return Err(ArrowError(
+ "A map can only have a group child type (key_values)."
+ .to_string(),
+ ))
+ }
+ Type::GroupType {
+ basic_info: _,
+ fields,
+ } => {
+ if fields.len() != 2 {
+ return Err(ArrowError(format!("Map type should have 2 fields, a key and value. Found {} fields", fields.len())));
+ } else {
+ let nested_key = fields.first().unwrap();
+ let nested_key_converter = self.clone_with_schema(nested_key);
+
+ let nested_value = fields.last().unwrap();
+ let nested_value_converter =
+ self.clone_with_schema(nested_value);
+
+ (
+ nested_key_converter.to_data_type()?.map(|d| {
+ Field::new(
+ nested_key.name(),
+ d,
+ nested_key.is_optional(),
+ )
+ }),
+ nested_value_converter.to_data_type()?.map(|d| {
+ Field::new(
+ nested_value.name(),
+ d,
+ nested_value.is_optional(),
+ )
+ }),
+ )
+ }
+ }
+ };
+
+ match (key_type, value_type) {
+ (Some(key), Some(value)) => Ok(Some(DataType::Map(
+ Box::new(Field::new(
+ key_item.name(),
+ DataType::Struct(vec![key, value]),
+ false,
+ )),
+ false, // There is no information to tell if keys are sorted
+ ))),
+ (None, None) => Ok(None),
+ (None, Some(_)) => Err(ArrowError(
+ "Could not convert the map key to a valid datatype".to_string(),
+ )),
+ (Some(_), None) => Err(ArrowError(
+ "Could not convert the map value to a valid datatype".to_string(),
+ )),
+ }
+ }
+ _ => Err(ArrowError(
+ "Group element type of map can only contain one field.".to_string(),
+ )),
+ }
+ }
}
#[cfg(test)]
@@ -1312,6 +1426,122 @@ mod tests {
}
#[test]
+ fn test_parquet_maps() {
+ let mut arrow_fields = Vec::new();
+
+ // LIST encoding example taken from parquet-format/LogicalTypes.md
+ let message_type = "
+ message test_schema {
+ REQUIRED group my_map1 (MAP) {
+ REPEATED group key_value {
+ REQUIRED binary key (UTF8);
+ OPTIONAL int32 value;
+ }
+ }
+ OPTIONAL group my_map2 (MAP) {
+ REPEATED group map {
+ REQUIRED binary str (UTF8);
+ REQUIRED int32 num;
+ }
+ }
+ OPTIONAL group my_map3 (MAP_KEY_VALUE) {
+ REPEATED group map {
+ REQUIRED binary key (UTF8);
+ OPTIONAL int32 value;
+ }
+ }
+ }
+ ";
+
+ // // Map<String, Integer>
+ // required group my_map (MAP) {
+ // repeated group key_value {
+ // required binary key (UTF8);
+ // optional int32 value;
+ // }
+ // }
+ {
+ arrow_fields.push(Field::new(
+ "my_map1",
+ DataType::Map(
+ Box::new(Field::new(
+ "key_value",
+ DataType::Struct(vec![
+ Field::new("key", DataType::Utf8, false),
+ Field::new("value", DataType::Int32, true),
+ ]),
+ false,
+ )),
+ false,
+ ),
+ false,
+ ));
+ }
+
+ // // Map<String, Integer> (nullable map, non-null values)
+ // optional group my_map (MAP) {
+ // repeated group map {
+ // required binary str (UTF8);
+ // required int32 num;
+ // }
+ // }
+ {
+ arrow_fields.push(Field::new(
+ "my_map2",
+ DataType::Map(
+ Box::new(Field::new(
+ "map",
+ DataType::Struct(vec![
+ Field::new("str", DataType::Utf8, false),
+ Field::new("num", DataType::Int32, false),
+ ]),
+ false,
+ )),
+ false,
+ ),
+ true,
+ ));
+ }
+
+ // // Map<String, Integer> (nullable map, nullable values)
+ // optional group my_map (MAP_KEY_VALUE) {
+ // repeated group map {
+ // required binary key (UTF8);
+ // optional int32 value;
+ // }
+ // }
+ {
+ arrow_fields.push(Field::new(
+ "my_map3",
+ DataType::Map(
+ Box::new(Field::new(
+ "map",
+ DataType::Struct(vec![
+ Field::new("key", DataType::Utf8, false),
+ Field::new("value", DataType::Int32, true),
+ ]),
+ false,
+ )),
+ false,
+ ),
+ true,
+ ));
+ }
+
+ let parquet_group_type = parse_message_type(message_type).unwrap();
+
+ let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
+ let converted_arrow_schema =
+ parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+ let converted_fields = converted_arrow_schema.fields();
+
+ assert_eq!(arrow_fields.len(), converted_fields.len());
+ for i in 0..arrow_fields.len() {
+ assert_eq!(arrow_fields[i], converted_fields[i]);
+ }
+ }
+
+ #[test]
fn test_nested_schema() {
let mut arrow_fields = Vec::new();
{
@@ -1843,6 +2073,52 @@ mod tests {
Field::new("c36", DataType::Decimal(2, 1), false),
Field::new("c37", DataType::Decimal(50, 20), false),
Field::new("c38", DataType::Decimal(18, 12), true),
+ Field::new(
+ "c39",
+ DataType::Map(
+ Box::new(Field::new(
+ "key_value",
+ DataType::Struct(vec![
+ Field::new("key", DataType::Utf8, false),
+ Field::new(
+ "value",
+ DataType::List(Box::new(Field::new(
+ "element",
+ DataType::Utf8,
+ true,
+ ))),
+ true,
+ ),
+ ]),
+ false,
+ )),
+ false, // fails to roundtrip keys_sorted
+ ),
+ true,
+ ),
+ Field::new(
+ "c40",
+ DataType::Map(
+ Box::new(Field::new(
+ "my_entries",
+ DataType::Struct(vec![
+ Field::new("my_key", DataType::Utf8, false),
+ Field::new(
+ "my_value",
+ DataType::List(Box::new(Field::new(
+ "item",
+ DataType::Utf8,
+ true,
+ ))),
+ true,
+ ),
+ ]),
+ false,
+ )),
+ false, // fails to roundtrip keys_sorted
+ ),
+ true,
+ ),
],
metadata,
);