You are viewing a plain-text version of this content; the link to the canonical (HTML) version was not preserved in this copy.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/06/11 17:52:09 UTC

[arrow-rs] branch master updated: Refactor parquet::arrow module (#1827)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 029203ed2 Refactor parquet::arrow module (#1827)
029203ed2 is described below

commit 029203ed2a0d1262e58b0cc11c54ce1b754587ba
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sat Jun 11 18:52:05 2022 +0100

    Refactor parquet::arrow module (#1827)
    
    * Refactor parquet::arrow module
    
    * Fix doc
    
    * Remove legacy benchmarks
---
 parquet/benches/arrow_reader.rs                    | 60 ++--------------------
 parquet/src/arrow/array_reader/builder.rs          |  2 +-
 parquet/src/arrow/array_reader/byte_array.rs       |  2 +-
 .../arrow/array_reader/byte_array_dictionary.rs    | 23 ++++-----
 .../arrow/{array_reader.rs => array_reader/mod.rs} | 11 ++--
 parquet/src/arrow/arrow_reader.rs                  |  4 +-
 parquet/src/arrow/{ => arrow_writer}/levels.rs     |  0
 .../arrow/{arrow_writer.rs => arrow_writer/mod.rs} |  5 +-
 parquet/src/arrow/{ => buffer}/bit_util.rs         |  0
 parquet/src/arrow/{ => buffer}/converter.rs        |  0
 .../{array_reader => buffer}/dictionary_buffer.rs  |  2 +-
 parquet/src/arrow/buffer/mod.rs                    | 23 +++++++++
 .../{array_reader => buffer}/offset_buffer.rs      |  4 +-
 parquet/src/arrow/mod.rs                           |  6 +--
 parquet/src/arrow/record_reader/buffer.rs          |  2 +-
 .../src/arrow/record_reader/definition_levels.rs   |  2 +-
 .../{record_reader.rs => record_reader/mod.rs}     |  0
 17 files changed, 56 insertions(+), 90 deletions(-)

diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index 04e48baef..647a8dc6f 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -355,27 +355,6 @@ fn create_string_byte_array_dictionary_reader(
     .unwrap()
 }
 
-fn create_complex_object_byte_array_dictionary_reader(
-    page_iterator: impl PageIterator + 'static,
-    column_desc: ColumnDescPtr,
-) -> Box<dyn ArrayReader> {
-    use parquet::arrow::array_reader::ComplexObjectArrayReader;
-    use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
-    let arrow_type =
-        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
-
-    let converter = Utf8Converter::new(Utf8ArrayConverter {});
-    Box::new(
-        ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
-            Box::new(page_iterator),
-            column_desc,
-            converter,
-            Some(arrow_type),
-        )
-        .unwrap(),
-    )
-}
-
 fn bench_primitive<T>(
     group: &mut BenchmarkGroup<WallTime>,
     schema: &SchemaDescPtr,
@@ -678,18 +657,7 @@ fn add_benches(c: &mut Criterion) {
 
     let mut group = c.benchmark_group("arrow_array_reader/StringDictionary");
 
-    group.bench_function("dictionary encoded, mandatory, no NULLs - old", |b| {
-        b.iter(|| {
-            let array_reader = create_complex_object_byte_array_dictionary_reader(
-                dictionary_string_no_null_data.clone(),
-                mandatory_string_column_desc.clone(),
-            );
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
-
-    group.bench_function("dictionary encoded, mandatory, no NULLs - new", |b| {
+    group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
         b.iter(|| {
             let array_reader = create_string_byte_array_dictionary_reader(
                 dictionary_string_no_null_data.clone(),
@@ -700,18 +668,7 @@ fn add_benches(c: &mut Criterion) {
         assert_eq!(count, EXPECTED_VALUE_COUNT);
     });
 
-    group.bench_function("dictionary encoded, optional, no NULLs - old", |b| {
-        b.iter(|| {
-            let array_reader = create_complex_object_byte_array_dictionary_reader(
-                dictionary_string_no_null_data.clone(),
-                optional_string_column_desc.clone(),
-            );
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
-
-    group.bench_function("dictionary encoded, optional, no NULLs - new", |b| {
+    group.bench_function("dictionary encoded, optional, no NULLs", |b| {
         b.iter(|| {
             let array_reader = create_string_byte_array_dictionary_reader(
                 dictionary_string_no_null_data.clone(),
@@ -722,18 +679,7 @@ fn add_benches(c: &mut Criterion) {
         assert_eq!(count, EXPECTED_VALUE_COUNT);
     });
 
-    group.bench_function("dictionary encoded, optional, half NULLs - old", |b| {
-        b.iter(|| {
-            let array_reader = create_complex_object_byte_array_dictionary_reader(
-                dictionary_string_half_null_data.clone(),
-                optional_string_column_desc.clone(),
-            );
-            count = bench_array_reader(array_reader);
-        });
-        assert_eq!(count, EXPECTED_VALUE_COUNT);
-    });
-
-    group.bench_function("dictionary encoded, optional, half NULLs - new", |b| {
+    group.bench_function("dictionary encoded, optional, half NULLs", |b| {
         b.iter(|| {
             let array_reader = create_string_byte_array_dictionary_reader(
                 dictionary_string_half_null_data.clone(),
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 7b9adfc23..e8c22f95a 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -25,7 +25,7 @@ use crate::arrow::array_reader::{
     ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
     PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
 };
-use crate::arrow::converter::{
+use crate::arrow::buffer::converter::{
     DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
     FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
     IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index b3606a780..2e29b6094 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
 use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::buffer::offset_buffer::OffsetBuffer;
 use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index fe8448ffb..0e64f0d25 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -24,12 +24,11 @@ use arrow::array::{Array, ArrayRef, OffsetSizeTrait};
 use arrow::buffer::Buffer;
 use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
 
-use crate::arrow::array_reader::dictionary_buffer::DictionaryBuffer;
-use crate::arrow::array_reader::{
-    byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain},
-    offset_buffer::OffsetBuffer,
-};
+use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
 use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::buffer::{
+    dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
+};
 use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
@@ -236,13 +235,13 @@ where
     fn new(col: &ColumnDescPtr) -> Self {
         let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
 
-        let value_type =
-            match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8) {
-                (true, true) => ArrowType::LargeUtf8,
-                (true, false) => ArrowType::LargeBinary,
-                (false, true) => ArrowType::Utf8,
-                (false, false) => ArrowType::Binary,
-            };
+        let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8)
+        {
+            (true, true) => ArrowType::LargeUtf8,
+            (true, false) => ArrowType::LargeBinary,
+            (false, true) => ArrowType::Utf8,
+            (false, false) => ArrowType::Binary,
+        };
 
         Self {
             dict: None,
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader/mod.rs
similarity index 99%
rename from parquet/src/arrow/array_reader.rs
rename to parquet/src/arrow/array_reader/mod.rs
index c70071dac..21c49b338 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Logic for reading into arrow arrays
+
 use std::any::Any;
 use std::cmp::max;
 use std::marker::PhantomData;
@@ -34,7 +36,7 @@ use arrow::datatypes::{
     UInt32Type as ArrowUInt32Type, UInt64Type as ArrowUInt64Type,
 };
 
-use crate::arrow::converter::Converter;
+use crate::arrow::buffer::converter::Converter;
 use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
 use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
 use crate::arrow::schema::parquet_to_arrow_field;
@@ -50,11 +52,9 @@ use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
 mod builder;
 mod byte_array;
 mod byte_array_dictionary;
-mod dictionary_buffer;
 mod empty_array;
 mod list_array;
 mod map_array;
-mod offset_buffer;
 
 #[cfg(test)]
 mod test_util;
@@ -811,7 +811,7 @@ mod tests {
         TimestampMillisecondType as ArrowTimestampMillisecondType,
     };
 
-    use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
+    use crate::arrow::buffer::converter::{Utf8ArrayConverter, Utf8Converter};
     use crate::basic::{Encoding, Type as PhysicalType};
     use crate::column::page::Page;
     use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
@@ -1384,8 +1384,7 @@ mod tests {
         let mut all_values = Vec::with_capacity(num_pages * values_per_page);
 
         for i in 0..num_pages {
-            let mut dict_encoder =
-                DictEncoder::<ByteArrayType>::new(column_desc.clone());
+            let mut dict_encoder = DictEncoder::<ByteArrayType>::new(column_desc.clone());
             // add data page
             let mut values = Vec::with_capacity(values_per_page);
 
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 1d56960cf..92d4ff264 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Contains reader which reads parquet data into arrow array.
+//! Contains reader which reads parquet data into arrow [`RecordBatch`]
 
 use std::sync::Arc;
 
@@ -294,7 +294,7 @@ mod tests {
     use crate::arrow::arrow_reader::{
         ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
     };
-    use crate::arrow::converter::{
+    use crate::arrow::buffer::converter::{
         BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
         IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter,
     };
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
similarity index 100%
rename from parquet/src/arrow/levels.rs
rename to parquet/src/arrow/arrow_writer/levels.rs
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer/mod.rs
similarity index 99%
rename from parquet/src/arrow/arrow_writer.rs
rename to parquet/src/arrow/arrow_writer/mod.rs
index 334c7237d..44631e574 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -27,19 +27,20 @@ use arrow::datatypes::{DataType as ArrowDataType, IntervalUnit, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow_array::Array;
 
-use super::levels::LevelInfo;
 use super::schema::{
     add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
     decimal_length_from_precision,
 };
 
-use crate::arrow::levels::calculate_array_levels;
 use crate::column::writer::ColumnWriter;
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::RowGroupMetaDataPtr;
 use crate::file::properties::WriterProperties;
 use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
 use crate::{data_type::*, file::writer::SerializedFileWriter};
+use levels::{calculate_array_levels, LevelInfo};
+
+mod levels;
 
 /// Arrow writer
 ///
diff --git a/parquet/src/arrow/bit_util.rs b/parquet/src/arrow/buffer/bit_util.rs
similarity index 100%
rename from parquet/src/arrow/bit_util.rs
rename to parquet/src/arrow/buffer/bit_util.rs
diff --git a/parquet/src/arrow/converter.rs b/parquet/src/arrow/buffer/converter.rs
similarity index 100%
rename from parquet/src/arrow/converter.rs
rename to parquet/src/arrow/buffer/converter.rs
diff --git a/parquet/src/arrow/array_reader/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs
similarity index 99%
rename from parquet/src/arrow/array_reader/dictionary_buffer.rs
rename to parquet/src/arrow/buffer/dictionary_buffer.rs
index 6dc9cc80f..7f4458507 100644
--- a/parquet/src/arrow/array_reader/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::buffer::offset_buffer::OffsetBuffer;
 use crate::arrow::record_reader::buffer::{
     BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
 };
diff --git a/parquet/src/arrow/buffer/mod.rs b/parquet/src/arrow/buffer/mod.rs
new file mode 100644
index 000000000..5ee89aa1a
--- /dev/null
+++ b/parquet/src/arrow/buffer/mod.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Logic for reading data into arrow buffers
+
+pub mod bit_util;
+pub mod converter;
+pub mod dictionary_buffer;
+pub mod offset_buffer;
diff --git a/parquet/src/arrow/array_reader/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs
similarity index 98%
rename from parquet/src/arrow/array_reader/offset_buffer.rs
rename to parquet/src/arrow/buffer/offset_buffer.rs
index 23e7af759..2d73e3f14 100644
--- a/parquet/src/arrow/array_reader/offset_buffer.rs
+++ b/parquet/src/arrow/buffer/offset_buffer.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::bit_util::iter_set_bits_rev;
+use crate::arrow::buffer::bit_util::iter_set_bits_rev;
 use crate::arrow::record_reader::buffer::{
     BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
 };
@@ -58,7 +58,7 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
     /// the start of a UTF-8 codepoint
     ///
     /// Note: This does not verify that the entirety of `data` is valid
-    /// UTF-8. This should be done by calling [`Self::values_as_str`] after
+    /// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after
     /// all data has been written
     pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
         if validate_utf8 {
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index c9cc0ff6c..3aee7cf42 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -122,14 +122,12 @@
 experimental_mod!(array_reader);
 pub mod arrow_reader;
 pub mod arrow_writer;
-mod bit_util;
+mod buffer;
 
 #[cfg(feature = "async")]
 pub mod async_reader;
 
-experimental_mod!(converter);
-pub(in crate::arrow) mod levels;
-pub(in crate::arrow) mod record_reader;
+mod record_reader;
 experimental_mod!(schema);
 
 pub use self::arrow_reader::ArrowReader;
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
index 4fa8213de..fa0f91991 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -17,7 +17,7 @@
 
 use std::marker::PhantomData;
 
-use crate::arrow::bit_util::iter_set_bits_rev;
+use crate::arrow::buffer::bit_util::iter_set_bits_rev;
 use arrow::buffer::{Buffer, MutableBuffer};
 use arrow::datatypes::ToByteSlice;
 
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index 93de4006c..9cca25c8a 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -21,7 +21,7 @@ use arrow::array::BooleanBufferBuilder;
 use arrow::bitmap::Bitmap;
 use arrow::buffer::Buffer;
 
-use crate::arrow::bit_util::count_set_bits;
+use crate::arrow::buffer::bit_util::count_set_bits;
 use crate::arrow::record_reader::buffer::BufferQueue;
 use crate::basic::Encoding;
 use crate::column::reader::decoder::{
diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader/mod.rs
similarity index 100%
rename from parquet/src/arrow/record_reader.rs
rename to parquet/src/arrow/record_reader/mod.rs