You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ne...@apache.org on 2019/08/09 09:07:32 UTC

[arrow] branch master updated: ARROW-6069: [Rust] [Parquet] Add converter.

This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 114c721  ARROW-6069: [Rust] [Parquet] Add converter.
114c721 is described below

commit 114c7216b0cab894db3d1d583141c39c031db357
Author: Renjie Liu <li...@gmail.com>
AuthorDate: Fri Aug 9 11:07:09 2019 +0200

    ARROW-6069: [Rust] [Parquet] Add converter.
    
    The converter is used to convert a record reader's content to an arrow primitive array.
    
    Closes #4997 from liurenjie1024/arrow-6069 and squashes the following commits:
    
    029bb3eef <Renjie Liu> Fix typo
    27d1258b3 <Renjie Liu> Fix comments
    fb32fe86b <Renjie Liu> Fix comments
    74c55f5ca <Renjie Liu> Add license header
    9045a6cca <Renjie Liu> Fix build break
    579f9aae3 <Renjie Liu> Add converter.
    
    Authored-by: Renjie Liu <li...@gmail.com>
    Signed-off-by: Neville Dipale <ne...@gmail.com>
---
 rust/parquet/src/arrow/converter.rs            | 212 +++++++++++++++++++++++++
 rust/parquet/src/arrow/mod.rs                  |   3 +-
 rust/parquet/src/arrow/record_reader.rs        |  23 +++
 rust/parquet/src/util/test_common/page_util.rs |  21 +++
 4 files changed, 258 insertions(+), 1 deletion(-)

diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs
new file mode 100644
index 0000000..263e78a
--- /dev/null
+++ b/rust/parquet/src/arrow/converter.rs
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::record_reader::RecordReader;
+use crate::data_type::DataType;
+use arrow::array::ArrayRef;
+use arrow::compute::cast;
+use std::convert::From;
+use std::sync::Arc;
+
+use crate::errors::Result;
+use arrow::datatypes::ArrowPrimitiveType;
+
+use arrow::array::ArrayDataBuilder;
+use arrow::array::PrimitiveArray;
+use std::marker::PhantomData;
+
+use crate::data_type::{
+    BoolType, DoubleType as ParquetDoubleType, FloatType as ParquetFloatType,
+    Int32Type as ParquetInt32Type, Int64Type as ParquetInt64Type,
+};
+use arrow::datatypes::{
+    BooleanType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
+    UInt16Type, UInt32Type, UInt64Type, UInt8Type,
+};
+
+/// Consumes the content buffered in a `RecordReader` and converts it into an
+/// arrow primitive array.
+pub trait Converter<T: DataType> {
+    /// Converts the record reader's buffered content into an arrow array.
+    /// Implementations consume the reader's buffered data but do not reset the
+    /// reader's other state.
+    fn convert(record_reader: &mut RecordReader<T>) -> Result<ArrayRef>;
+}
+
+/// Reads a record reader's buffer as arrow's `PrimitiveArray<ArrowSourceType>` and
+/// then casts that array to `PrimitiveArray<ArrowTargetType>`.
+///
+/// Used purely at the type level: `convert` takes no `self`, so no value is needed.
+pub struct CastConverter<ParquetType, ArrowSourceType, ArrowTargetType> {
+    _marker: PhantomData<(ParquetType, ArrowSourceType, ArrowTargetType)>,
+}
+
+impl<ParquetType, ArrowSourceType, ArrowTargetType> Converter<ParquetType>
+    for CastConverter<ParquetType, ArrowSourceType, ArrowTargetType>
+where
+    ParquetType: DataType,
+    ArrowSourceType: ArrowPrimitiveType,
+    ArrowTargetType: ArrowPrimitiveType,
+{
+    /// Wraps the reader's value buffer (and null bitmap, when present) in a
+    /// `PrimitiveArray<ArrowSourceType>`, then casts it to `ArrowTargetType`.
+    fn convert(record_reader: &mut RecordReader<ParquetType>) -> Result<ArrayRef> {
+        // Take the value buffer and its length first, then the optional null bitmap.
+        let values = record_reader.consume_record_data();
+        let len = record_reader.num_values();
+        let builder = ArrayDataBuilder::new(ArrowSourceType::get_data_type())
+            .len(len)
+            .add_buffer(values);
+        let builder = match record_reader.consume_bitmap_buffer() {
+            Some(null_bitmap) => builder.null_bit_buffer(null_bitmap),
+            None => builder,
+        };
+        let source: ArrayRef = Arc::new(PrimitiveArray::<ArrowSourceType>::from(builder.build()));
+        cast(&source, &ArrowTargetType::get_data_type()).map_err(From::from)
+    }
+}
+
+// Type parameters: <parquet physical type, arrow type of the raw buffer, arrow output type>.
+pub type BooleanConverter = CastConverter<BoolType, BooleanType, BooleanType>;
+pub type Int8Converter = CastConverter<ParquetInt32Type, Int32Type, Int8Type>;
+pub type UInt8Converter = CastConverter<ParquetInt32Type, Int32Type, UInt8Type>;
+pub type Int16Converter = CastConverter<ParquetInt32Type, Int32Type, Int16Type>;
+pub type UInt16Converter = CastConverter<ParquetInt32Type, Int32Type, UInt16Type>;
+pub type Int32Converter = CastConverter<ParquetInt32Type, Int32Type, Int32Type>;
+pub type UInt32Converter = CastConverter<ParquetInt32Type, UInt32Type, UInt32Type>;
+pub type Int64Converter = CastConverter<ParquetInt64Type, Int64Type, Int64Type>;
+pub type UInt64Converter = CastConverter<ParquetInt64Type, UInt64Type, UInt64Type>;
+pub type Float32Converter = CastConverter<ParquetFloatType, Float32Type, Float32Type>;
+pub type Float64Converter = CastConverter<ParquetDoubleType, Float64Type, Float64Type>;
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::converter::Int16Converter;
+    use crate::arrow::record_reader::RecordReader;
+    use crate::basic::Encoding;
+    use crate::schema::parser::parse_message_type;
+    use crate::schema::types::SchemaDescriptor;
+    use crate::util::test_common::page_util::InMemoryPageReader;
+    use crate::util::test_common::page_util::{DataPageBuilder, DataPageBuilderImpl};
+    use arrow::array::ArrayEqual;
+    use arrow::array::PrimitiveArray;
+    use arrow::datatypes::{Int16Type, Int32Type};
+    use std::rc::Rc;
+
+    #[test]
+    fn test_converter_arrow_source_target_different() {
+        let raw_data = vec![Some(1i16), None, Some(2i16), Some(3i16)];
+
+        // Construct record reader
+        let mut record_reader = {
+            // Construct column schema
+            let message_type = "
+            message test_schema {
+              OPTIONAL INT32 leaf;
+            }
+            ";
+
+            let def_levels = [1i16, 0i16, 1i16, 1i16];
+            build_record_reader(
+                message_type,
+                &[1, 2, 3],
+                0i16,
+                None,
+                1i16,
+                Some(&def_levels),
+                10,
+            )
+        };
+
+        let array = Int16Converter::convert(&mut record_reader).unwrap();
+        let array = array
+            .as_any()
+            .downcast_ref::<PrimitiveArray<Int16Type>>()
+            .unwrap();
+
+        assert!(array.equals(&PrimitiveArray::<Int16Type>::from(raw_data)));
+    }
+
+    #[test]
+    fn test_converter_arrow_source_target_same() {
+        let raw_data = vec![Some(1), None, Some(2), Some(3)];
+
+        // Construct record reader
+        let mut record_reader = {
+            // Construct column schema
+            let message_type = "
+            message test_schema {
+              OPTIONAL INT32 leaf;
+            }
+            ";
+
+            let def_levels = [1i16, 0i16, 1i16, 1i16];
+            build_record_reader(
+                message_type,
+                &[1, 2, 3],
+                0i16,
+                None,
+                1i16,
+                Some(&def_levels),
+                10,
+            )
+        };
+
+        let array = Int32Converter::convert(&mut record_reader).unwrap();
+        let array = array
+            .as_any()
+            .downcast_ref::<PrimitiveArray<Int32Type>>()
+            .unwrap();
+
+        assert!(array.equals(&PrimitiveArray::<Int32Type>::from(raw_data)));
+    }
+
+    /// Builds a `RecordReader` over one PLAIN-encoded data page and reads up to
+    /// `num_records` records. Levels are added to the page only when provided; the
+    /// page's value count is the number of levels, or `values.len()` without levels.
+    fn build_record_reader<T: DataType>(
+        message_type: &str,
+        values: &[T::T],
+        max_rep_level: i16,
+        rep_levels: Option<&[i16]>,
+        max_def_level: i16,
+        def_levels: Option<&[i16]>,
+        num_records: usize,
+    ) -> RecordReader<T> {
+        let desc = parse_message_type(message_type)
+            .map(|t| SchemaDescriptor::new(Rc::new(t)))
+            .map(|s| s.column(0))
+            .unwrap();
+        let mut record_reader = RecordReader::<T>::new(desc.clone());
+
+        // Value count includes null slots, so derive it from the levels, not `values`.
+        let num_levels = def_levels.or(rep_levels).map_or(values.len(), |l| l.len());
+        let mut pb = DataPageBuilderImpl::new(desc, num_levels as u32, true);
+        if let Some(levels) = rep_levels {
+            pb.add_rep_levels(max_rep_level, levels);
+        }
+        if let Some(levels) = def_levels {
+            pb.add_def_levels(max_def_level, levels);
+        }
+        pb.add_values::<T>(Encoding::PLAIN, values);
+
+        let page_reader = Box::new(InMemoryPageReader::new(vec![pb.consume()]));
+        record_reader.set_page_reader(page_reader).unwrap();
+        record_reader.read_records(num_records).unwrap();
+        record_reader
+    }
+}
diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs
index a86ffe0..af1d00c 100644
--- a/rust/parquet/src/arrow/mod.rs
+++ b/rust/parquet/src/arrow/mod.rs
@@ -20,7 +20,8 @@
 //!
 //! This mod provides API for converting between arrow and parquet.
 
-pub(crate) mod record_reader;
+pub(in crate::arrow) mod converter;
+pub(in crate::arrow) mod record_reader;
 pub mod schema;
 
 pub use self::schema::{parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns};
diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs
index 6b1237c..803f4a0 100644
--- a/rust/parquet/src/arrow/record_reader.rs
+++ b/rust/parquet/src/arrow/record_reader.rs
@@ -43,6 +43,8 @@ pub struct RecordReader<T: DataType> {
 
     /// Number of records accumulated in records
     num_records: usize,
+    /// Number of values `num_records` contains.
+    num_values: usize,
 
     values_seen: usize,
     /// Starts from 1, number of values have been written to buffer
@@ -112,6 +114,7 @@ impl<T: DataType> RecordReader<T> {
             column_reader: None,
             column_desc: column_schema,
             num_records: 0,
+            num_values: 0,
             values_seen: 0,
             values_written: 0,
             in_middle_of_record: false,
@@ -150,6 +153,7 @@ impl<T: DataType> RecordReader<T> {
                 && self.in_middle_of_record
             {
                 self.num_records += 1;
+                self.num_values = self.values_seen;
                 self.in_middle_of_record = false;
                 records_read += 1;
             }
@@ -175,6 +179,13 @@ impl<T: DataType> RecordReader<T> {
         self.num_records
     }
 
+    /// Returns the number of values contained in the `num_records` buffered records.
+    /// For a non-repeated column this equals `num_records`; for a repeated column it
+    /// is greater than or equal to `num_records`.
+    pub fn num_values(&self) -> usize {
+        self.num_values
+    }
+
     /// Returns definition level data.
     pub fn consume_def_levels(&mut self) -> Option<Buffer> {
         let empty_def_buffer = if self.column_desc.max_def_level() > 0 {
@@ -333,6 +344,7 @@ impl<T: DataType> RecordReader<T> {
                         if self.in_middle_of_record {
                             records_read += 1;
                             self.num_records += 1;
+                            self.num_values = self.values_seen;
                         }
                         self.in_middle_of_record = true;
                     }
@@ -345,6 +357,7 @@ impl<T: DataType> RecordReader<T> {
                 let records_read =
                     min(records_to_read, self.values_written - self.values_seen);
                 self.num_records += records_read;
+                self.num_values += records_read;
                 self.values_seen += records_read;
                 self.in_middle_of_record = false;
 
@@ -446,8 +459,10 @@ mod tests {
             record_reader.set_page_reader(page_reader).unwrap();
             assert_eq!(2, record_reader.read_records(2).unwrap());
             assert_eq!(2, record_reader.num_records());
+            assert_eq!(2, record_reader.num_values());
             assert_eq!(3, record_reader.read_records(3).unwrap());
             assert_eq!(5, record_reader.num_records());
+            assert_eq!(5, record_reader.num_values());
         }
 
         // Second page
@@ -467,6 +482,7 @@ mod tests {
             record_reader.set_page_reader(page_reader).unwrap();
             assert_eq!(2, record_reader.read_records(10).unwrap());
             assert_eq!(7, record_reader.num_records());
+            assert_eq!(7, record_reader.num_values());
         }
 
         let mut bb = Int32BufferBuilder::new(7);
@@ -524,8 +540,10 @@ mod tests {
             record_reader.set_page_reader(page_reader).unwrap();
             assert_eq!(2, record_reader.read_records(2).unwrap());
             assert_eq!(2, record_reader.num_records());
+            assert_eq!(2, record_reader.num_values());
             assert_eq!(3, record_reader.read_records(3).unwrap());
             assert_eq!(5, record_reader.num_records());
+            assert_eq!(5, record_reader.num_values());
         }
 
         // Second page
@@ -548,6 +566,7 @@ mod tests {
             record_reader.set_page_reader(page_reader).unwrap();
             assert_eq!(2, record_reader.read_records(10).unwrap());
             assert_eq!(7, record_reader.num_records());
+            assert_eq!(7, record_reader.num_values());
         }
 
         // Verify result record data
@@ -623,8 +642,10 @@ mod tests {
 
             assert_eq!(1, record_reader.read_records(1).unwrap());
             assert_eq!(1, record_reader.num_records());
+            assert_eq!(1, record_reader.num_values());
             assert_eq!(2, record_reader.read_records(3).unwrap());
             assert_eq!(3, record_reader.num_records());
+            assert_eq!(7, record_reader.num_values());
         }
 
         // Second page
@@ -649,6 +670,7 @@ mod tests {
 
             assert_eq!(1, record_reader.read_records(10).unwrap());
             assert_eq!(4, record_reader.num_records());
+            assert_eq!(9, record_reader.num_values());
         }
 
         // Verify result record data
@@ -711,6 +733,7 @@ mod tests {
 
             assert_eq!(1000, record_reader.read_records(1000).unwrap());
             assert_eq!(1000, record_reader.num_records());
+            assert_eq!(5000, record_reader.num_values());
         }
     }
 }
diff --git a/rust/parquet/src/util/test_common/page_util.rs b/rust/parquet/src/util/test_common/page_util.rs
index 3f3cab7..d12b734 100644
--- a/rust/parquet/src/util/test_common/page_util.rs
+++ b/rust/parquet/src/util/test_common/page_util.rs
@@ -17,10 +17,12 @@
 
 use crate::basic::Encoding;
 use crate::column::page::Page;
+use crate::column::page::PageReader;
 use crate::data_type::DataType;
 use crate::encodings::encoding::{get_encoder, Encoder};
 use crate::encodings::levels::max_buffer_size;
 use crate::encodings::levels::LevelEncoder;
+use crate::errors::Result;
 use crate::schema::types::ColumnDescPtr;
 use crate::util::memory::ByteBufferPtr;
 use crate::util::memory::MemTracker;
@@ -155,3 +157,22 @@ impl DataPageBuilder for DataPageBuilderImpl {
         }
     }
 }
+
+/// A `PageReader` that hands out a fixed list of in-memory pages in order, then `None`.
+pub struct InMemoryPageReader {
+    pages: Box<dyn Iterator<Item = Page>>,
+}
+
+impl InMemoryPageReader {
+    /// Creates a reader that returns `pages` in their original order.
+    pub fn new(pages: Vec<Page>) -> Self {
+        let pages: Box<dyn Iterator<Item = Page>> = Box::new(pages.into_iter());
+        Self { pages }
+    }
+}
+
+impl PageReader for InMemoryPageReader {
+    fn get_next_page(&mut self) -> Result<Option<Page>> {
+        Ok(self.pages.next())
+    }
+}