You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/06/11 17:52:09 UTC
[arrow-rs] branch master updated: Refactor parquet::arrow module (#1827)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 029203ed2 Refactor parquet::arrow module (#1827)
029203ed2 is described below
commit 029203ed2a0d1262e58b0cc11c54ce1b754587ba
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sat Jun 11 18:52:05 2022 +0100
Refactor parquet::arrow module (#1827)
* Refactor parquet::arrow module
* Fix doc
* Remove legacy benchmarks
---
parquet/benches/arrow_reader.rs | 60 ++--------------------
parquet/src/arrow/array_reader/builder.rs | 2 +-
parquet/src/arrow/array_reader/byte_array.rs | 2 +-
.../arrow/array_reader/byte_array_dictionary.rs | 23 ++++-----
.../arrow/{array_reader.rs => array_reader/mod.rs} | 11 ++--
parquet/src/arrow/arrow_reader.rs | 4 +-
parquet/src/arrow/{ => arrow_writer}/levels.rs | 0
.../arrow/{arrow_writer.rs => arrow_writer/mod.rs} | 5 +-
parquet/src/arrow/{ => buffer}/bit_util.rs | 0
parquet/src/arrow/{ => buffer}/converter.rs | 0
.../{array_reader => buffer}/dictionary_buffer.rs | 2 +-
parquet/src/arrow/buffer/mod.rs | 23 +++++++++
.../{array_reader => buffer}/offset_buffer.rs | 4 +-
parquet/src/arrow/mod.rs | 6 +--
parquet/src/arrow/record_reader/buffer.rs | 2 +-
.../src/arrow/record_reader/definition_levels.rs | 2 +-
.../{record_reader.rs => record_reader/mod.rs} | 0
17 files changed, 56 insertions(+), 90 deletions(-)
diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index 04e48baef..647a8dc6f 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -355,27 +355,6 @@ fn create_string_byte_array_dictionary_reader(
.unwrap()
}
-fn create_complex_object_byte_array_dictionary_reader(
- page_iterator: impl PageIterator + 'static,
- column_desc: ColumnDescPtr,
-) -> Box<dyn ArrayReader> {
- use parquet::arrow::array_reader::ComplexObjectArrayReader;
- use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
- let arrow_type =
- DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
-
- let converter = Utf8Converter::new(Utf8ArrayConverter {});
- Box::new(
- ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
- Box::new(page_iterator),
- column_desc,
- converter,
- Some(arrow_type),
- )
- .unwrap(),
- )
-}
-
fn bench_primitive<T>(
group: &mut BenchmarkGroup<WallTime>,
schema: &SchemaDescPtr,
@@ -678,18 +657,7 @@ fn add_benches(c: &mut Criterion) {
let mut group = c.benchmark_group("arrow_array_reader/StringDictionary");
- group.bench_function("dictionary encoded, mandatory, no NULLs - old", |b| {
- b.iter(|| {
- let array_reader = create_complex_object_byte_array_dictionary_reader(
- dictionary_string_no_null_data.clone(),
- mandatory_string_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- });
-
- group.bench_function("dictionary encoded, mandatory, no NULLs - new", |b| {
+ group.bench_function("dictionary encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
@@ -700,18 +668,7 @@ fn add_benches(c: &mut Criterion) {
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
- group.bench_function("dictionary encoded, optional, no NULLs - old", |b| {
- b.iter(|| {
- let array_reader = create_complex_object_byte_array_dictionary_reader(
- dictionary_string_no_null_data.clone(),
- optional_string_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- });
-
- group.bench_function("dictionary encoded, optional, no NULLs - new", |b| {
+ group.bench_function("dictionary encoded, optional, no NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_no_null_data.clone(),
@@ -722,18 +679,7 @@ fn add_benches(c: &mut Criterion) {
assert_eq!(count, EXPECTED_VALUE_COUNT);
});
- group.bench_function("dictionary encoded, optional, half NULLs - old", |b| {
- b.iter(|| {
- let array_reader = create_complex_object_byte_array_dictionary_reader(
- dictionary_string_half_null_data.clone(),
- optional_string_column_desc.clone(),
- );
- count = bench_array_reader(array_reader);
- });
- assert_eq!(count, EXPECTED_VALUE_COUNT);
- });
-
- group.bench_function("dictionary encoded, optional, half NULLs - new", |b| {
+ group.bench_function("dictionary encoded, optional, half NULLs", |b| {
b.iter(|| {
let array_reader = create_string_byte_array_dictionary_reader(
dictionary_string_half_null_data.clone(),
diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs
index 7b9adfc23..e8c22f95a 100644
--- a/parquet/src/arrow/array_reader/builder.rs
+++ b/parquet/src/arrow/array_reader/builder.rs
@@ -25,7 +25,7 @@ use crate::arrow::array_reader::{
ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
PrimitiveArrayReader, RowGroupCollection, StructArrayReader,
};
-use crate::arrow::converter::{
+use crate::arrow::buffer::converter::{
DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter,
FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter,
IntervalDayTimeArrayConverter, IntervalDayTimeConverter,
diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index b3606a780..2e29b6094 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
index fe8448ffb..0e64f0d25 100644
--- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs
+++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -24,12 +24,11 @@ use arrow::array::{Array, ArrayRef, OffsetSizeTrait};
use arrow::buffer::Buffer;
use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};
-use crate::arrow::array_reader::dictionary_buffer::DictionaryBuffer;
-use crate::arrow::array_reader::{
- byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain},
- offset_buffer::OffsetBuffer,
-};
+use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::buffer::{
+ dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
+};
use crate::arrow::record_reader::buffer::{BufferQueue, ScalarValue};
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
@@ -236,13 +235,13 @@ where
fn new(col: &ColumnDescPtr) -> Self {
let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
- let value_type =
- match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8) {
- (true, true) => ArrowType::LargeUtf8,
- (true, false) => ArrowType::LargeBinary,
- (false, true) => ArrowType::Utf8,
- (false, false) => ArrowType::Binary,
- };
+ let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8)
+ {
+ (true, true) => ArrowType::LargeUtf8,
+ (true, false) => ArrowType::LargeBinary,
+ (false, true) => ArrowType::Utf8,
+ (false, false) => ArrowType::Binary,
+ };
Self {
dict: None,
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader/mod.rs
similarity index 99%
rename from parquet/src/arrow/array_reader.rs
rename to parquet/src/arrow/array_reader/mod.rs
index c70071dac..21c49b338 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+//! Logic for reading into arrow arrays
+
use std::any::Any;
use std::cmp::max;
use std::marker::PhantomData;
@@ -34,7 +36,7 @@ use arrow::datatypes::{
UInt32Type as ArrowUInt32Type, UInt64Type as ArrowUInt64Type,
};
-use crate::arrow::converter::Converter;
+use crate::arrow::buffer::converter::Converter;
use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
use crate::arrow::schema::parquet_to_arrow_field;
@@ -50,11 +52,9 @@ use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
mod builder;
mod byte_array;
mod byte_array_dictionary;
-mod dictionary_buffer;
mod empty_array;
mod list_array;
mod map_array;
-mod offset_buffer;
#[cfg(test)]
mod test_util;
@@ -811,7 +811,7 @@ mod tests {
TimestampMillisecondType as ArrowTimestampMillisecondType,
};
- use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
+ use crate::arrow::buffer::converter::{Utf8ArrayConverter, Utf8Converter};
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::Page;
use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type};
@@ -1384,8 +1384,7 @@ mod tests {
let mut all_values = Vec::with_capacity(num_pages * values_per_page);
for i in 0..num_pages {
- let mut dict_encoder =
- DictEncoder::<ByteArrayType>::new(column_desc.clone());
+ let mut dict_encoder = DictEncoder::<ByteArrayType>::new(column_desc.clone());
// add data page
let mut values = Vec::with_capacity(values_per_page);
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 1d56960cf..92d4ff264 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-//! Contains reader which reads parquet data into arrow array.
+//! Contains reader which reads parquet data into arrow [`RecordBatch`]
use std::sync::Arc;
@@ -294,7 +294,7 @@ mod tests {
use crate::arrow::arrow_reader::{
ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
};
- use crate::arrow::converter::{
+ use crate::arrow::buffer::converter::{
BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter,
};
diff --git a/parquet/src/arrow/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
similarity index 100%
rename from parquet/src/arrow/levels.rs
rename to parquet/src/arrow/arrow_writer/levels.rs
diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer/mod.rs
similarity index 99%
rename from parquet/src/arrow/arrow_writer.rs
rename to parquet/src/arrow/arrow_writer/mod.rs
index 334c7237d..44631e574 100644
--- a/parquet/src/arrow/arrow_writer.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -27,19 +27,20 @@ use arrow::datatypes::{DataType as ArrowDataType, IntervalUnit, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::Array;
-use super::levels::LevelInfo;
use super::schema::{
add_encoded_arrow_schema_to_metadata, arrow_to_parquet_schema,
decimal_length_from_precision,
};
-use crate::arrow::levels::calculate_array_levels;
use crate::column::writer::ColumnWriter;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::RowGroupMetaDataPtr;
use crate::file::properties::WriterProperties;
use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
use crate::{data_type::*, file::writer::SerializedFileWriter};
+use levels::{calculate_array_levels, LevelInfo};
+
+mod levels;
/// Arrow writer
///
diff --git a/parquet/src/arrow/bit_util.rs b/parquet/src/arrow/buffer/bit_util.rs
similarity index 100%
rename from parquet/src/arrow/bit_util.rs
rename to parquet/src/arrow/buffer/bit_util.rs
diff --git a/parquet/src/arrow/converter.rs b/parquet/src/arrow/buffer/converter.rs
similarity index 100%
rename from parquet/src/arrow/converter.rs
rename to parquet/src/arrow/buffer/converter.rs
diff --git a/parquet/src/arrow/array_reader/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs
similarity index 99%
rename from parquet/src/arrow/array_reader/dictionary_buffer.rs
rename to parquet/src/arrow/buffer/dictionary_buffer.rs
index 6dc9cc80f..7f4458507 100644
--- a/parquet/src/arrow/array_reader/dictionary_buffer.rs
+++ b/parquet/src/arrow/buffer/dictionary_buffer.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::arrow::array_reader::offset_buffer::OffsetBuffer;
+use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::record_reader::buffer::{
BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
};
diff --git a/parquet/src/arrow/buffer/mod.rs b/parquet/src/arrow/buffer/mod.rs
new file mode 100644
index 000000000..5ee89aa1a
--- /dev/null
+++ b/parquet/src/arrow/buffer/mod.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Logic for reading data into arrow buffers
+
+pub mod bit_util;
+pub mod converter;
+pub mod dictionary_buffer;
+pub mod offset_buffer;
diff --git a/parquet/src/arrow/array_reader/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs
similarity index 98%
rename from parquet/src/arrow/array_reader/offset_buffer.rs
rename to parquet/src/arrow/buffer/offset_buffer.rs
index 23e7af759..2d73e3f14 100644
--- a/parquet/src/arrow/array_reader/offset_buffer.rs
+++ b/parquet/src/arrow/buffer/offset_buffer.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::arrow::bit_util::iter_set_bits_rev;
+use crate::arrow::buffer::bit_util::iter_set_bits_rev;
use crate::arrow::record_reader::buffer::{
BufferQueue, ScalarBuffer, ScalarValue, ValuesBuffer,
};
@@ -58,7 +58,7 @@ impl<I: OffsetSizeTrait + ScalarValue> OffsetBuffer<I> {
/// the start of a UTF-8 codepoint
///
/// Note: This does not verify that the entirety of `data` is valid
- /// UTF-8. This should be done by calling [`Self::values_as_str`] after
+ /// UTF-8. This should be done by calling [`Self::check_valid_utf8`] after
/// all data has been written
pub fn try_push(&mut self, data: &[u8], validate_utf8: bool) -> Result<()> {
if validate_utf8 {
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index c9cc0ff6c..3aee7cf42 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -122,14 +122,12 @@
experimental_mod!(array_reader);
pub mod arrow_reader;
pub mod arrow_writer;
-mod bit_util;
+mod buffer;
#[cfg(feature = "async")]
pub mod async_reader;
-experimental_mod!(converter);
-pub(in crate::arrow) mod levels;
-pub(in crate::arrow) mod record_reader;
+mod record_reader;
experimental_mod!(schema);
pub use self::arrow_reader::ArrowReader;
diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs
index 4fa8213de..fa0f91991 100644
--- a/parquet/src/arrow/record_reader/buffer.rs
+++ b/parquet/src/arrow/record_reader/buffer.rs
@@ -17,7 +17,7 @@
use std::marker::PhantomData;
-use crate::arrow::bit_util::iter_set_bits_rev;
+use crate::arrow::buffer::bit_util::iter_set_bits_rev;
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::ToByteSlice;
diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs
index 93de4006c..9cca25c8a 100644
--- a/parquet/src/arrow/record_reader/definition_levels.rs
+++ b/parquet/src/arrow/record_reader/definition_levels.rs
@@ -21,7 +21,7 @@ use arrow::array::BooleanBufferBuilder;
use arrow::bitmap::Bitmap;
use arrow::buffer::Buffer;
-use crate::arrow::bit_util::count_set_bits;
+use crate::arrow::buffer::bit_util::count_set_bits;
use crate::arrow::record_reader::buffer::BufferQueue;
use crate::basic::Encoding;
use crate::column::reader::decoder::{
diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader/mod.rs
similarity index 100%
rename from parquet/src/arrow/record_reader.rs
rename to parquet/src/arrow/record_reader/mod.rs