You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/05/22 18:13:07 UTC

[GitHub] [arrow] sunchao commented on a change in pull request #6770: ARROW-7842: [Rust] [Parquet] implement array_reader for list type columns

sunchao commented on a change in pull request #6770:
URL: https://github.com/apache/arrow/pull/6770#discussion_r429388420



##########
File path: rust/datafusion/src/logicalplan.rs
##########
@@ -1004,8 +1004,8 @@ mod tests {
         .build()?;
 
         let expected = "Projection: #state, #total_salary\
-        \n  Aggregate: groupBy=[[#state]], aggr=[[SUM(#salary) AS total_salary]]\
-        \n    TableScan: employee.csv projection=Some([3, 4])";
+             \n  Aggregate: groupBy=[[#state]], aggr=[[SUM(#salary) AS total_salary]]\

Review comment:
       Is this change related to this PR? The same question applies to the similar changes below.

##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -426,6 +450,391 @@ where
     }
 }
 
+/// Implementation of list array reader.
+pub struct ListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    data_type: ArrowType,
+    item_type: ArrowType,
+    list_def_level: i16,
+    list_rep_level: i16,
+    def_level_buffer: Option<Buffer>,
+    rep_level_buffer: Option<Buffer>,
+}
+
+impl ListArrayReader {
+    /// Construct list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        data_type: ArrowType,
+        item_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+    ) -> Self {
+        Self {
+            item_reader,
+            data_type,
+            item_type,
+            list_def_level: def_level,
+            list_rep_level: rep_level,
+            def_level_buffer: None,
+            rep_level_buffer: None,
+        }
+    }
+}
+
+macro_rules! build_empty_list_array_with_primitive_items {
+    ($item_type:ident) => {{
+        let values_builder = PrimitiveBuilder::<$item_type>::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+macro_rules! build_empty_list_array_with_non_primitive_items {
+    ($builder:ident) => {{
+        let values_builder = $builder::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> {
+    match item_type {
+        ArrowType::UInt8 => build_empty_list_array_with_primitive_items!(ArrowUInt8Type),
+        ArrowType::UInt16 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt16Type)
+        }
+        ArrowType::UInt32 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt32Type)
+        }
+        ArrowType::UInt64 => {
+            build_empty_list_array_with_primitive_items!(ArrowUInt64Type)
+        }
+        ArrowType::Int8 => build_empty_list_array_with_primitive_items!(ArrowInt8Type),
+        ArrowType::Int16 => build_empty_list_array_with_primitive_items!(ArrowInt16Type),
+        ArrowType::Int32 => build_empty_list_array_with_primitive_items!(ArrowInt32Type),
+        ArrowType::Int64 => build_empty_list_array_with_primitive_items!(ArrowInt64Type),
+        ArrowType::Float32 => {
+            build_empty_list_array_with_primitive_items!(ArrowFloat32Type)
+        }
+        ArrowType::Float64 => {
+            build_empty_list_array_with_primitive_items!(ArrowFloat64Type)
+        }
+        ArrowType::Boolean => {
+            build_empty_list_array_with_primitive_items!(ArrowBooleanType)
+        }
+        ArrowType::Date32(_) => {
+            build_empty_list_array_with_primitive_items!(ArrowDate32Type)
+        }
+        ArrowType::Date64(_) => {
+            build_empty_list_array_with_primitive_items!(ArrowDate64Type)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Second) => {
+            build_empty_list_array_with_primitive_items!(ArrowTime32SecondType)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+            build_empty_list_array_with_primitive_items!(ArrowTime32MillisecondType)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+            build_empty_list_array_with_primitive_items!(ArrowTime64MicrosecondType)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+            build_empty_list_array_with_primitive_items!(ArrowTime64NanosecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Second) => {
+            build_empty_list_array_with_primitive_items!(ArrowDurationSecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+            build_empty_list_array_with_primitive_items!(ArrowDurationMillisecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+            build_empty_list_array_with_primitive_items!(ArrowDurationMicrosecondType)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+            build_empty_list_array_with_primitive_items!(ArrowDurationNanosecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+            build_empty_list_array_with_primitive_items!(ArrowTimestampSecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+            build_empty_list_array_with_primitive_items!(ArrowTimestampMillisecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+            build_empty_list_array_with_primitive_items!(ArrowTimestampMicrosecondType)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+            build_empty_list_array_with_primitive_items!(ArrowTimestampNanosecondType)
+        }
+        ArrowType::Utf8 => {
+            build_empty_list_array_with_non_primitive_items!(StringBuilder)
+        }
+        ArrowType::Binary => {
+            build_empty_list_array_with_non_primitive_items!(BinaryBuilder)
+        }
+        _ => Err(ParquetError::General(format!(
+            "ListArray of type List({:?}) is not supported by array_reader",
+            item_type
+        ))),
+    }
+}
+
+macro_rules! remove_primitive_array_indices {
+    ($arr: expr, $item_type:ty, $indices:expr) => {{
+        let array_data = match $arr.as_any().downcast_ref::<PrimitiveArray<$item_type>>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = PrimitiveBuilder::<$item_type>::new($arr.len());
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! remove_array_indices_custom_builder {
+    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr) => {{
+        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = $item_builder::new(array_data.len());
+
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+macro_rules! remove_fixed_size_binary_array_indices {
+    ($arr: expr, $array_type:ty, $item_builder:ident, $indices:expr, $len:expr) => {{
+        let array_data = match $arr.as_any().downcast_ref::<$array_type>() {
+            Some(a) => a,
+            _ => return Err(ParquetError::General(format!("Error generating next batch for ListArray: {:?} cannot be downcast to PrimitiveArray", $arr))),
+        };
+        let mut builder = FixedSizeBinaryBuilder::new(array_data.len(), $len);
+        for i in 0..array_data.len() {
+            if !$indices.contains(&i) {
+                if array_data.is_null(i) {
+                    builder.append_null()?;
+                } else {
+                    builder.append_value(array_data.value(i))?;
+                }
+            }
+        }
+        Ok(Arc::new(builder.finish()))
+    }};
+}
+
+fn remove_indices(
+    arr: ArrayRef,
+    item_type: ArrowType,
+    indices: Vec<usize>,
+) -> Result<ArrayRef> {
+    match item_type {
+        ArrowType::UInt8 => remove_primitive_array_indices!(arr, ArrowUInt8Type, indices),
+        ArrowType::UInt16 => {
+            remove_primitive_array_indices!(arr, ArrowUInt16Type, indices)
+        }
+        ArrowType::UInt32 => {
+            remove_primitive_array_indices!(arr, ArrowUInt32Type, indices)
+        }
+        ArrowType::UInt64 => {
+            remove_primitive_array_indices!(arr, ArrowUInt64Type, indices)
+        }
+        ArrowType::Int8 => remove_primitive_array_indices!(arr, ArrowInt8Type, indices),
+        ArrowType::Int16 => remove_primitive_array_indices!(arr, ArrowInt16Type, indices),
+        ArrowType::Int32 => remove_primitive_array_indices!(arr, ArrowInt32Type, indices),
+        ArrowType::Int64 => remove_primitive_array_indices!(arr, ArrowInt64Type, indices),
+        ArrowType::Float32 => {
+            remove_primitive_array_indices!(arr, ArrowFloat32Type, indices)
+        }
+        ArrowType::Float64 => {
+            remove_primitive_array_indices!(arr, ArrowFloat64Type, indices)
+        }
+        ArrowType::Boolean => {
+            remove_primitive_array_indices!(arr, ArrowBooleanType, indices)
+        }
+        ArrowType::Date32(_) => {
+            remove_primitive_array_indices!(arr, ArrowDate32Type, indices)
+        }
+        ArrowType::Date64(_) => {
+            remove_primitive_array_indices!(arr, ArrowDate64Type, indices)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Second) => {
+            remove_primitive_array_indices!(arr, ArrowTime32SecondType, indices)
+        }
+        ArrowType::Time32(ArrowTimeUnit::Millisecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime32MillisecondType, indices)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Microsecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime64MicrosecondType, indices)
+        }
+        ArrowType::Time64(ArrowTimeUnit::Nanosecond) => {
+            remove_primitive_array_indices!(arr, ArrowTime64NanosecondType, indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Second) => {
+            remove_primitive_array_indices!(arr, ArrowDurationSecondType, indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Millisecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationMillisecondType, indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Microsecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationMicrosecondType, indices)
+        }
+        ArrowType::Duration(ArrowTimeUnit::Nanosecond) => {
+            remove_primitive_array_indices!(arr, ArrowDurationNanosecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Second, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampSecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Millisecond, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampMillisecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Microsecond, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampMicrosecondType, indices)
+        }
+        ArrowType::Timestamp(ArrowTimeUnit::Nanosecond, _) => {
+            remove_primitive_array_indices!(arr, ArrowTimestampNanosecondType, indices)
+        }
+        ArrowType::Utf8 => {
+            remove_array_indices_custom_builder!(arr, StringArray, StringBuilder, indices)
+        }
+        ArrowType::Binary => {
+            remove_array_indices_custom_builder!(arr, BinaryArray, BinaryBuilder, indices)
+        }
+        ArrowType::FixedSizeBinary(size) => remove_fixed_size_binary_array_indices!(
+            arr,
+            FixedSizeBinaryArray,
+            FixedSizeBinaryBuilder,
+            indices,
+            size
+        ),
+        _ => Err(ParquetError::General(format!(
+            "ListArray of type List({:?}) is not supported by array_reader",
+            item_type
+        ))),
+    }
+}
+
+impl ArrayReader for ListArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns data type.
+    /// This must be a List.
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {

Review comment:
       Does this handle nested lists?

##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -426,6 +450,391 @@ where
     }
 }
 
+/// Implementation of list array reader.
+pub struct ListArrayReader {
+    item_reader: Box<dyn ArrayReader>,
+    data_type: ArrowType,
+    item_type: ArrowType,
+    list_def_level: i16,
+    list_rep_level: i16,
+    def_level_buffer: Option<Buffer>,
+    rep_level_buffer: Option<Buffer>,
+}
+
+impl ListArrayReader {
+    /// Construct list array reader.
+    pub fn new(
+        item_reader: Box<dyn ArrayReader>,
+        data_type: ArrowType,
+        item_type: ArrowType,
+        def_level: i16,
+        rep_level: i16,
+    ) -> Self {
+        Self {
+            item_reader,
+            data_type,
+            item_type,
+            list_def_level: def_level,
+            list_rep_level: rep_level,
+            def_level_buffer: None,
+            rep_level_buffer: None,
+        }
+    }
+}
+
+macro_rules! build_empty_list_array_with_primitive_items {
+    ($item_type:ident) => {{
+        let values_builder = PrimitiveBuilder::<$item_type>::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+macro_rules! build_empty_list_array_with_non_primitive_items {
+    ($builder:ident) => {{
+        let values_builder = $builder::new(0);
+        let mut builder = ListBuilder::new(values_builder);
+        let empty_list_array = builder.finish();
+        Ok(Arc::new(empty_list_array))
+    }};
+}
+
+fn build_empty_list_array(item_type: ArrowType) -> Result<ArrayRef> {

Review comment:
       The way this is implemented isn't ideal, as it has to pattern-match on every possible type. Is there a way to simplify this? For instance, every `ArrowPrimitiveType` already has a `DataType` as its field.

##########
File path: rust/datafusion/src/execution/physical_plan/expressions.rs
##########
@@ -39,9 +39,9 @@ use arrow::compute;
 use arrow::compute::kernels::arithmetic::{add, divide, multiply, subtract};
 use arrow::compute::kernels::boolean::{and, or};
 use arrow::compute::kernels::cast::cast;
-use arrow::compute::kernels::comparison::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow::compute::kernels::comparison::{
-    eq_utf8, gt_eq_utf8, gt_utf8, like_utf8, lt_eq_utf8, lt_utf8, neq_utf8, nlike_utf8,

Review comment:
       Is this change related?

##########
File path: rust/parquet/src/arrow/array_reader.rs
##########
@@ -745,16 +1152,66 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
     }
 
     /// Build array reader for list type.
-    /// Currently this is not supported.
     fn visit_list_with_item(

Review comment:
       Is this being called anywhere? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org