You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ne...@apache.org on 2019/08/09 09:07:32 UTC
[arrow] branch master updated: ARROW-6069: [Rust] [Parquet] Add
converter.
This is an automated email from the ASF dual-hosted git repository.
nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 114c721 ARROW-6069: [Rust] [Parquet] Add converter.
114c721 is described below
commit 114c7216b0cab894db3d1d583141c39c031db357
Author: Renjie Liu <li...@gmail.com>
AuthorDate: Fri Aug 9 11:07:09 2019 +0200
ARROW-6069: [Rust] [Parquet] Add converter.
Converter is used to convert the record reader's content into an arrow primitive array.
Closes #4997 from liurenjie1024/arrow-6069 and squashes the following commits:
029bb3eef <Renjie Liu> Fix typo
27d1258b3 <Renjie Liu> Fix comments
fb32fe86b <Renjie Liu> Fix comments
74c55f5ca <Renjie Liu> Add license header
9045a6cca <Renjie Liu> Fix build break
579f9aae3 <Renjie Liu> Add converter.
Authored-by: Renjie Liu <li...@gmail.com>
Signed-off-by: Neville Dipale <ne...@gmail.com>
---
rust/parquet/src/arrow/converter.rs | 212 +++++++++++++++++++++++++
rust/parquet/src/arrow/mod.rs | 3 +-
rust/parquet/src/arrow/record_reader.rs | 23 +++
rust/parquet/src/util/test_common/page_util.rs | 21 +++
4 files changed, 258 insertions(+), 1 deletion(-)
diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs
new file mode 100644
index 0000000..263e78a
--- /dev/null
+++ b/rust/parquet/src/arrow/converter.rs
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::record_reader::RecordReader;
+use crate::data_type::DataType;
+use arrow::array::ArrayRef;
+use arrow::compute::cast;
+use std::convert::From;
+use std::sync::Arc;
+
+use crate::errors::Result;
+use arrow::datatypes::ArrowPrimitiveType;
+
+use arrow::array::ArrayDataBuilder;
+use arrow::array::PrimitiveArray;
+use std::marker::PhantomData;
+
+use crate::data_type::{
+ BoolType, DoubleType as ParquetDoubleType, FloatType as ParquetFloatType,
+ Int32Type as ParquetInt32Type, Int64Type as ParquetInt64Type,
+};
+use arrow::datatypes::{
+ BooleanType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
+ UInt16Type, UInt32Type, UInt64Type, UInt8Type,
+};
+
+/// A converter consumes the content buffered in a record reader and converts it
+/// into an arrow array, which is returned as an `ArrayRef`.
+pub trait Converter<T: DataType> {
+ /// Converts the record reader's buffered content into an arrow array.
+ /// This consumes the record reader's data, but does not reset the record
+ /// reader's state.
+ fn convert(record_reader: &mut RecordReader<T>) -> Result<ArrayRef>;
+}
+
+/// Cast converter builds a `PrimitiveArray<ArrowSourceType>` from the record reader's
+/// buffers, then casts it to `ArrowTargetType`. The fields are type markers only.
+pub struct CastConverter<ParquetType, ArrowSourceType, ArrowTargetType> {
+ _parquet_marker: PhantomData<ParquetType>,
+ _arrow_source_marker: PhantomData<ArrowSourceType>,
+ _arrow_target_marker: PhantomData<ArrowTargetType>,
+}
+
+impl<ParquetType, ArrowSourceType, ArrowTargetType> Converter<ParquetType>
+ for CastConverter<ParquetType, ArrowSourceType, ArrowTargetType>
+where
+ ParquetType: DataType,
+ ArrowSourceType: ArrowPrimitiveType,
+ ArrowTargetType: ArrowPrimitiveType,
+{
+ fn convert(record_reader: &mut RecordReader<ParquetType>) -> Result<ArrayRef> {
+ let record_data = record_reader.consume_record_data();
+ // Describe the consumed values as `num_values()` slots of `ArrowSourceType`.
+ let mut array_data = ArrayDataBuilder::new(ArrowSourceType::get_data_type())
+ .len(record_reader.num_values())
+ .add_buffer(record_data);
+ // Attach a null bitmap only when the record reader provides one.
+ if let Some(b) = record_reader.consume_bitmap_buffer() {
+ array_data = array_data.null_bit_buffer(b);
+ }
+ // Build the source-typed array behind an `ArrayRef`, which is passed to `cast`.
+ let primitive_array: ArrayRef =
+ Arc::new(PrimitiveArray::<ArrowSourceType>::from(array_data.build()));
+ // Cast to the target data type; `?` propagates a failed cast to the caller.
+ Ok(cast(&primitive_array, &ArrowTargetType::get_data_type())?)
+ }
+}
+// Aliases: <Parquet type, Arrow type the raw buffer is read as, Arrow output type>.
+pub type BooleanConverter = CastConverter<BoolType, BooleanType, BooleanType>;
+pub type Int8Converter = CastConverter<ParquetInt32Type, Int32Type, Int8Type>;
+pub type UInt8Converter = CastConverter<ParquetInt32Type, Int32Type, UInt8Type>;
+pub type Int16Converter = CastConverter<ParquetInt32Type, Int32Type, Int16Type>;
+pub type UInt16Converter = CastConverter<ParquetInt32Type, Int32Type, UInt16Type>;
+pub type Int32Converter = CastConverter<ParquetInt32Type, Int32Type, Int32Type>;
+pub type UInt32Converter = CastConverter<ParquetInt32Type, UInt32Type, UInt32Type>;
+pub type Int64Converter = CastConverter<ParquetInt64Type, Int64Type, Int64Type>;
+pub type UInt64Converter = CastConverter<ParquetInt64Type, UInt64Type, UInt64Type>;
+pub type Float32Converter = CastConverter<ParquetFloatType, Float32Type, Float32Type>;
+pub type Float64Converter = CastConverter<ParquetDoubleType, Float64Type, Float64Type>;
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::arrow::converter::Int16Converter;
+ use crate::arrow::record_reader::RecordReader;
+ use crate::basic::Encoding;
+ use crate::schema::parser::parse_message_type;
+ use crate::schema::types::SchemaDescriptor;
+ use crate::util::test_common::page_util::InMemoryPageReader;
+ use crate::util::test_common::page_util::{DataPageBuilder, DataPageBuilderImpl};
+ use arrow::array::ArrayEqual;
+ use arrow::array::PrimitiveArray;
+ use arrow::datatypes::{Int16Type, Int32Type};
+ use std::rc::Rc;
+ // Reads an optional INT32 column through a converter whose target type is Int16.
+ #[test]
+ fn test_converter_arrow_source_target_different() {
+ let raw_data = vec![Some(1i16), None, Some(2i16), Some(3i16)];
+
+ // Construct record reader
+ let mut record_reader = {
+ // Construct column schema
+ let message_type = "
+ message test_schema {
+ OPTIONAL INT32 leaf;
+ }
+ ";
+ // Level 0 lines up with the `None` in `raw_data`; 3 values cover the 4 slots.
+ let def_levels = [1i16, 0i16, 1i16, 1i16];
+ build_record_reader(
+ message_type,
+ &[1, 2, 3],
+ 0i16,
+ None,
+ 1i16,
+ Some(&def_levels),
+ 10,
+ )
+ };
+
+ let array = Int16Converter::convert(&mut record_reader).unwrap();
+ let array = array
+ .as_any()
+ .downcast_ref::<PrimitiveArray<Int16Type>>()
+ .unwrap();
+
+ assert!(array.equals(&PrimitiveArray::<Int16Type>::from(raw_data)));
+ }
+ // Same input as above, but the converter's source and target types are both Int32.
+ #[test]
+ fn test_converter_arrow_source_target_same() {
+ let raw_data = vec![Some(1), None, Some(2), Some(3)];
+
+ // Construct record reader
+ let mut record_reader = {
+ // Construct column schema
+ let message_type = "
+ message test_schema {
+ OPTIONAL INT32 leaf;
+ }
+ ";
+ // Level 0 lines up with the `None` in `raw_data`; 3 values cover the 4 slots.
+ let def_levels = [1i16, 0i16, 1i16, 1i16];
+ build_record_reader(
+ message_type,
+ &[1, 2, 3],
+ 0i16,
+ None,
+ 1i16,
+ Some(&def_levels),
+ 10,
+ )
+ };
+
+ let array = Int32Converter::convert(&mut record_reader).unwrap();
+ let array = array
+ .as_any()
+ .downcast_ref::<PrimitiveArray<Int32Type>>()
+ .unwrap();
+
+ assert!(array.equals(&PrimitiveArray::<Int32Type>::from(raw_data)));
+ }
+ // Helper: builds a record reader fed by one in-memory PLAIN page, then reads from it.
+ fn build_record_reader<T: DataType>(
+ message_type: &str,
+ values: &[T::T],
+ max_rep_level: i16,
+ rep_levels: Option<&[i16]>,
+ max_def_level: i16,
+ def_levels: Option<&[i16]>,
+ num_records: usize,
+ ) -> RecordReader<T> {
+ let desc = parse_message_type(message_type)
+ .map(|t| SchemaDescriptor::new(Rc::new(t)))
+ .map(|s| s.column(0))
+ .unwrap();
+ // The reader and the page builder both use the descriptor of column 0.
+ let mut record_reader = RecordReader::<T>::new(desc.clone());
+
+ // Prepare record reader
+ let mut pb = DataPageBuilderImpl::new(desc.clone(), 4, true);
+ if rep_levels.is_some() {
+ pb.add_rep_levels(max_rep_level, rep_levels.unwrap());
+ }
+ if def_levels.is_some() {
+ pb.add_def_levels(max_def_level, def_levels.unwrap());
+ }
+ pb.add_values::<T>(Encoding::PLAIN, &values);
+ let page = pb.consume();
+ // Serve the single page from memory, then ask the reader for `num_records` records.
+ let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
+ record_reader.set_page_reader(page_reader).unwrap();
+
+ record_reader.read_records(num_records).unwrap();
+
+ record_reader
+ }
+}
diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs
index a86ffe0..af1d00c 100644
--- a/rust/parquet/src/arrow/mod.rs
+++ b/rust/parquet/src/arrow/mod.rs
@@ -20,7 +20,8 @@
//!
//! This mod provides API for converting between arrow and parquet.
-pub(crate) mod record_reader;
+pub(in crate::arrow) mod converter;
+pub(in crate::arrow) mod record_reader;
pub mod schema;
pub use self::schema::{parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns};
diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs
index 6b1237c..803f4a0 100644
--- a/rust/parquet/src/arrow/record_reader.rs
+++ b/rust/parquet/src/arrow/record_reader.rs
@@ -43,6 +43,8 @@ pub struct RecordReader<T: DataType> {
/// Number of records accumulated in records
num_records: usize,
+ /// Number of values `num_records` contains.
+ num_values: usize,
values_seen: usize,
/// Starts from 1, number of values have been written to buffer
@@ -112,6 +114,7 @@ impl<T: DataType> RecordReader<T> {
column_reader: None,
column_desc: column_schema,
num_records: 0,
+ num_values: 0,
values_seen: 0,
values_written: 0,
in_middle_of_record: false,
@@ -150,6 +153,7 @@ impl<T: DataType> RecordReader<T> {
&& self.in_middle_of_record
{
self.num_records += 1;
+ self.num_values = self.values_seen;
self.in_middle_of_record = false;
records_read += 1;
}
@@ -175,6 +179,13 @@ impl<T: DataType> RecordReader<T> {
self.num_records
}
+ /// Returns the number of values stored in the buffer.
+ /// If the parquet column is not repeated, it should be equal to `num_records`,
+ /// otherwise it should be larger than or equal to `num_records`.
+ pub fn num_values(&self) -> usize {
+ self.num_values
+ }
+
/// Returns definition level data.
pub fn consume_def_levels(&mut self) -> Option<Buffer> {
let empty_def_buffer = if self.column_desc.max_def_level() > 0 {
@@ -333,6 +344,7 @@ impl<T: DataType> RecordReader<T> {
if self.in_middle_of_record {
records_read += 1;
self.num_records += 1;
+ self.num_values = self.values_seen;
}
self.in_middle_of_record = true;
}
@@ -345,6 +357,7 @@ impl<T: DataType> RecordReader<T> {
let records_read =
min(records_to_read, self.values_written - self.values_seen);
self.num_records += records_read;
+ self.num_values += records_read;
self.values_seen += records_read;
self.in_middle_of_record = false;
@@ -446,8 +459,10 @@ mod tests {
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(2).unwrap());
assert_eq!(2, record_reader.num_records());
+ assert_eq!(2, record_reader.num_values());
assert_eq!(3, record_reader.read_records(3).unwrap());
assert_eq!(5, record_reader.num_records());
+ assert_eq!(5, record_reader.num_values());
}
// Second page
@@ -467,6 +482,7 @@ mod tests {
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(10).unwrap());
assert_eq!(7, record_reader.num_records());
+ assert_eq!(7, record_reader.num_values());
}
let mut bb = Int32BufferBuilder::new(7);
@@ -524,8 +540,10 @@ mod tests {
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(2).unwrap());
assert_eq!(2, record_reader.num_records());
+ assert_eq!(2, record_reader.num_values());
assert_eq!(3, record_reader.read_records(3).unwrap());
assert_eq!(5, record_reader.num_records());
+ assert_eq!(5, record_reader.num_values());
}
// Second page
@@ -548,6 +566,7 @@ mod tests {
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(10).unwrap());
assert_eq!(7, record_reader.num_records());
+ assert_eq!(7, record_reader.num_values());
}
// Verify result record data
@@ -623,8 +642,10 @@ mod tests {
assert_eq!(1, record_reader.read_records(1).unwrap());
assert_eq!(1, record_reader.num_records());
+ assert_eq!(1, record_reader.num_values());
assert_eq!(2, record_reader.read_records(3).unwrap());
assert_eq!(3, record_reader.num_records());
+ assert_eq!(7, record_reader.num_values());
}
// Second page
@@ -649,6 +670,7 @@ mod tests {
assert_eq!(1, record_reader.read_records(10).unwrap());
assert_eq!(4, record_reader.num_records());
+ assert_eq!(9, record_reader.num_values());
}
// Verify result record data
@@ -711,6 +733,7 @@ mod tests {
assert_eq!(1000, record_reader.read_records(1000).unwrap());
assert_eq!(1000, record_reader.num_records());
+ assert_eq!(5000, record_reader.num_values());
}
}
}
diff --git a/rust/parquet/src/util/test_common/page_util.rs b/rust/parquet/src/util/test_common/page_util.rs
index 3f3cab7..d12b734 100644
--- a/rust/parquet/src/util/test_common/page_util.rs
+++ b/rust/parquet/src/util/test_common/page_util.rs
@@ -17,10 +17,12 @@
use crate::basic::Encoding;
use crate::column::page::Page;
+use crate::column::page::PageReader;
use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, Encoder};
use crate::encodings::levels::max_buffer_size;
use crate::encodings::levels::LevelEncoder;
+use crate::errors::Result;
use crate::schema::types::ColumnDescPtr;
use crate::util::memory::ByteBufferPtr;
use crate::util::memory::MemTracker;
@@ -155,3 +157,22 @@ impl DataPageBuilder for DataPageBuilderImpl {
}
}
}
+
+/// A utility page reader which serves pages from an in-memory list, in order.
+pub struct InMemoryPageReader {
+ pages: Box<Iterator<Item = Page>>,
+}
+// Construction from an owned vector of pages.
+impl InMemoryPageReader {
+ pub fn new(pages: Vec<Page>) -> Self {
+ Self {
+ pages: Box::new(pages.into_iter()),
+ }
+ }
+}
+// Each call hands out the next stored page; `Ok(None)` once all pages are consumed.
+impl PageReader for InMemoryPageReader {
+ fn get_next_page(&mut self) -> Result<Option<Page>> {
+ Ok(self.pages.next())
+ }
+}