You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/02/07 11:56:36 UTC

[arrow] branch master updated: ARROW-11308: [Rust][Parquet] Support decimal when writing parquet files

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 39dfd62  ARROW-11308: [Rust][Parquet] Support decimal when writing parquet files
39dfd62 is described below

commit 39dfd626cdcafdf37c22d81f669be41d5e408ef1
Author: Florian Müller <fl...@tomueller.de>
AuthorDate: Sun Feb 7 06:54:57 2021 -0500

    ARROW-11308: [Rust][Parquet] Support decimal when writing parquet files
    
    Allows writing DecimalArray to parquet files.
    
    The `i128` values of `DecimalArray` are written as big endian bytes with a fixed size, calculated from the precision. This assumes that precision defines the total length and scale is contained in the precision.
    
    Closes #9397 from sweb/ARROW-11308/parquet-decimal-writer
    
    Authored-by: Florian Müller <fl...@tomueller.de>
    Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
 rust/parquet/src/arrow/arrow_writer.rs | 72 ++++++++++++++++++++++++++++++++--
 rust/parquet/src/arrow/levels.rs       |  8 ++--
 rust/parquet/src/arrow/schema.rs       | 20 ++++++----
 3 files changed, 86 insertions(+), 14 deletions(-)

diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
index d1f84d8..298ab34 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -25,7 +25,9 @@ use arrow::record_batch::RecordBatch;
 use arrow_array::Array;
 
 use super::levels::LevelInfo;
-use super::schema::add_encoded_arrow_schema_to_metadata;
+use super::schema::{
+    add_encoded_arrow_schema_to_metadata, decimal_length_from_precision,
+};
 
 use crate::column::writer::ColumnWriter;
 use crate::errors::{ParquetError, Result};
@@ -143,7 +145,8 @@ fn write_leaves(
         | ArrowDataType::LargeBinary
         | ArrowDataType::Binary
         | ArrowDataType::Utf8
-        | ArrowDataType::LargeUtf8 => {
+        | ArrowDataType::LargeUtf8
+        | ArrowDataType::Decimal(_, _) => {
             let mut col_writer = get_col_writer(&mut row_group_writer)?;
             write_leaf(
                 &mut col_writer,
@@ -188,7 +191,6 @@ fn write_leaves(
         )),
         ArrowDataType::FixedSizeList(_, _)
         | ArrowDataType::FixedSizeBinary(_)
-        | ArrowDataType::Decimal(_, _)
         | ArrowDataType::Union(_) => Err(ParquetError::NYI(
             "Attempting to write an Arrow type that is not yet implemented".to_string(),
         )),
@@ -315,6 +317,13 @@ fn write_leaf(
                         .unwrap();
                     get_fsb_array_slice(&array, &indices)
                 }
+                ArrowDataType::Decimal(_, _) => {
+                    let array = column
+                        .as_any()
+                        .downcast_ref::<arrow_array::DecimalArray>()
+                        .unwrap();
+                    get_decimal_array_slice(&array, &indices)
+                }
                 _ => {
                     return Err(ParquetError::NYI(
                         "Attempting to write an Arrow type that is not yet implemented"
@@ -416,6 +425,20 @@ fn get_interval_dt_array_slice(
     values
 }
 
+fn get_decimal_array_slice(
+    array: &arrow_array::DecimalArray,
+    indices: &[usize],
+) -> Vec<FixedLenByteArray> {
+    let mut values = Vec::with_capacity(indices.len());
+    let size = decimal_length_from_precision(array.precision());
+    for i in indices {
+        let as_be_bytes = array.value(*i).to_be_bytes();
+        let resized_value = as_be_bytes[(16 - size)..].to_vec();
+        values.push(FixedLenByteArray::from(ByteArray::from(resized_value)));
+    }
+    values
+}
+
 fn get_fsb_array_slice(
     array: &arrow_array::FixedSizeBinaryArray,
     indices: &[usize],
@@ -634,6 +657,49 @@ mod tests {
     }
 
     #[test]
+    fn arrow_writer_decimal() {
+        let decimal_field = Field::new("a", DataType::Decimal(5, 2), false);
+        let schema = Schema::new(vec![decimal_field]);
+
+        let mut dec_builder = DecimalBuilder::new(4, 5, 2);
+        dec_builder.append_value(10_000).unwrap();
+        dec_builder.append_value(50_000).unwrap();
+        dec_builder.append_value(0).unwrap();
+        dec_builder.append_value(-100).unwrap();
+
+        let raw_decimal_i128_values: Vec<i128> = vec![10_000, 50_000, 0, -100];
+        let decimal_values = dec_builder.finish();
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(decimal_values)],
+        )
+        .unwrap();
+
+        let mut file = get_temp_file("test_arrow_writer_decimal.parquet", &[]);
+        let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None)
+                .unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        file.seek(std::io::SeekFrom::Start(0)).unwrap();
+        let file_reader = SerializedFileReader::new(file).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
+        let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
+
+        let batch = record_batch_reader.next().unwrap().unwrap();
+        let decimal_col = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<DecimalArray>()
+            .unwrap();
+
+        for i in 0..batch.num_rows() {
+            assert_eq!(decimal_col.value(i), raw_decimal_i128_values[i]);
+        }
+    }
+
+    #[test]
     #[ignore = "See ARROW-11294, data is correct but list field name is incorrect"]
     fn arrow_writer_complex() {
         // define schema
diff --git a/rust/parquet/src/arrow/levels.rs b/rust/parquet/src/arrow/levels.rs
index 7a26b05..4ea1811 100644
--- a/rust/parquet/src/arrow/levels.rs
+++ b/rust/parquet/src/arrow/levels.rs
@@ -135,7 +135,8 @@ impl LevelInfo {
             | DataType::Duration(_)
             | DataType::Interval(_)
             | DataType::Binary
-            | DataType::LargeBinary => {
+            | DataType::LargeBinary
+            | DataType::Decimal(_, _) => {
                 // we return a vector of 1 value to represent the primitive
                 vec![self.calculate_child_levels(
                     array_offsets,
@@ -145,7 +146,6 @@ impl LevelInfo {
                 )]
             }
             DataType::FixedSizeBinary(_) => unimplemented!(),
-            DataType::Decimal(_, _) => unimplemented!(),
             DataType::List(list_field) | DataType::LargeList(list_field) => {
                 // Calculate the list level
                 let list_level = self.calculate_child_levels(
@@ -188,7 +188,8 @@ impl LevelInfo {
                     | DataType::LargeBinary
                     | DataType::Utf8
                     | DataType::LargeUtf8
-                    | DataType::Dictionary(_, _) => {
+                    | DataType::Dictionary(_, _)
+                    | DataType::Decimal(_, _) => {
                         vec![list_level.calculate_child_levels(
                             child_offsets,
                             child_mask,
@@ -197,7 +198,6 @@ impl LevelInfo {
                         )]
                     }
                     DataType::FixedSizeBinary(_) => unimplemented!(),
-                    DataType::Decimal(_, _) => unimplemented!(),
                     DataType::List(_) | DataType::LargeList(_) | DataType::Struct(_) => {
                         list_level.calculate_array_levels(&child_array, list_field)
                     }
diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs
index 97e04c5..fa973b5 100644
--- a/rust/parquet/src/arrow/schema.rs
+++ b/rust/parquet/src/arrow/schema.rs
@@ -306,6 +306,10 @@ pub fn parquet_to_arrow_field(parquet_column: &ColumnDescriptor) -> Result<Field
         .map(|opt| opt.unwrap())
 }
 
+pub fn decimal_length_from_precision(precision: usize) -> usize {
+    (10.0_f64.powi(precision as i32).log2() / 8.0).ceil() as usize
+}
+
 /// Convert an arrow field to a parquet `Type`
 fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
     let name = field.name().as_str();
@@ -409,13 +413,15 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
                 .with_length(*length)
                 .build()
         }
-        DataType::Decimal(precision, _) => Type::primitive_type_builder(
-            name,
-            PhysicalType::FIXED_LEN_BYTE_ARRAY,
-        )
-        .with_repetition(repetition)
-        .with_length((10.0_f64.powi(*precision as i32).log2() / 8.0).ceil() as i32)
-        .build(),
+        DataType::Decimal(precision, scale) => {
+            Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
+                .with_repetition(repetition)
+                .with_length(decimal_length_from_precision(*precision) as i32)
+                .with_logical_type(LogicalType::DECIMAL)
+                .with_precision(*precision as i32)
+                .with_scale(*scale as i32)
+                .build()
+        }
         DataType::Utf8 | DataType::LargeUtf8 => {
             Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
                 .with_logical_type(LogicalType::UTF8)