You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/08 20:02:10 UTC

[GitHub] [arrow] emkornfield commented on a change in pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

emkornfield commented on a change in pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#discussion_r467499328



##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -0,0 +1,595 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains writer which writes arrow data into parquet data.
+
+use std::rc::Rc;
+
+use arrow::array as arrow_array;
+use arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use arrow_array::Array;
+
+use crate::column::writer::ColumnWriter;
+use crate::errors::{ParquetError, Result};
+use crate::file::properties::WriterProperties;
+use crate::{
+    data_type::*,
+    file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter},
+};
+
+/// Arrow writer
+///
+/// Writes Arrow `RecordBatch`es to a Parquet writer
+pub struct ArrowWriter<W: ParquetWriter> {
+    /// Underlying Parquet writer
+    writer: SerializedFileWriter<W>,
+    /// A copy of the Arrow schema.
+    ///
+    /// The schema is used to verify that each record batch written has the correct schema
+    arrow_schema: SchemaRef,
+}
+
+impl<W: 'static + ParquetWriter> ArrowWriter<W> {
+    /// Try to create a new Arrow writer
+    ///
+    /// The writer will fail if:
+    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
+    ///  * the Arrow schema contains unsupported datatypes such as Unions
+    pub fn try_new(
+        writer: W,
+        arrow_schema: SchemaRef,
+        props: Option<Rc<WriterProperties>>,
+    ) -> Result<Self> {
+        let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
+        let props = match props {
+            Some(props) => props,
+            None => Rc::new(WriterProperties::builder().build()),
+        };
+        let file_writer = SerializedFileWriter::new(
+            writer.try_clone()?,
+            schema.root_schema_ptr(),
+            props,
+        )?;
+
+        Ok(Self {
+            writer: file_writer,
+            arrow_schema,
+        })
+    }
+
+    /// Write a RecordBatch to writer
+    ///
+    /// *NOTE:* The writer currently does not support all Arrow data types
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        // validate batch schema against writer's supplied schema
+        if self.arrow_schema != batch.schema() {
+            return Err(ParquetError::ArrowError(
+                "Record batch schema does not match writer schema".to_string(),
+            ));
+        }
+        // compute the definition and repetition levels of the batch
+        let mut levels = vec![];
+        batch.columns().iter().for_each(|array| {
+            let mut array_levels =
+                get_levels(array, 0, &vec![1i16; batch.num_rows()][..], None);
+            levels.append(&mut array_levels);
+        });
+        // reverse levels so we can use Vec::pop(&mut self)
+        levels.reverse();
+
+        let mut row_group_writer = self.writer.next_row_group()?;
+
+        // write leaves
+        for column in batch.columns() {
+            write_leaves(&mut row_group_writer, column, &mut levels)?;
+        }
+
+        self.writer.close_row_group(row_group_writer)
+    }
+
+    /// Close and finalise the underlying Parquet writer
+    pub fn close(&mut self) -> Result<()> {
+        self.writer.close()
+    }
+}
+
+/// Convenience method to get the next ColumnWriter from the RowGroupWriter
+#[inline]
+#[allow(clippy::borrowed_box)]
+fn get_col_writer(
+    row_group_writer: &mut Box<dyn RowGroupWriter>,
+) -> Result<ColumnWriter> {
+    let col_writer = row_group_writer
+        .next_column()?
+        .expect("Unable to get column writer");
+    Ok(col_writer)
+}
+
+#[allow(clippy::borrowed_box)]
+fn write_leaves(
+    mut row_group_writer: &mut Box<dyn RowGroupWriter>,
+    array: &arrow_array::ArrayRef,
+    mut levels: &mut Vec<Levels>,
+) -> Result<()> {
+    match array.data_type() {
+        ArrowDataType::Int8
+        | ArrowDataType::Int16
+        | ArrowDataType::Int32
+        | ArrowDataType::Int64
+        | ArrowDataType::UInt8
+        | ArrowDataType::UInt16
+        | ArrowDataType::UInt32
+        | ArrowDataType::UInt64
+        | ArrowDataType::Float16
+        | ArrowDataType::Float32
+        | ArrowDataType::Float64
+        | ArrowDataType::Timestamp(_, _)
+        | ArrowDataType::Date32(_)
+        | ArrowDataType::Date64(_)
+        | ArrowDataType::Time32(_)
+        | ArrowDataType::Time64(_)
+        | ArrowDataType::Duration(_)
+        | ArrowDataType::Interval(_) => {
+            let mut col_writer = get_col_writer(&mut row_group_writer)?;
+            write_leaf(
+                &mut col_writer,
+                array,
+                levels.pop().expect("Levels exhausted"),
+            )?;
+            row_group_writer.close_column(col_writer)?;
+            Ok(())
+        }
+        ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
+            // write the child list
+            let data = array.data();
+            let child_array = arrow_array::make_array(data.child_data()[0].clone());
+            write_leaves(&mut row_group_writer, &child_array, &mut levels)?;
+            Ok(())
+        }
+        ArrowDataType::Struct(_) => {
+            let struct_array: &arrow_array::StructArray = array
+                .as_any()
+                .downcast_ref::<arrow_array::StructArray>()
+                .expect("Unable to get struct array");
+            for field in struct_array.columns() {
+                write_leaves(&mut row_group_writer, field, &mut levels)?;
+            }
+            Ok(())
+        }
+        ArrowDataType::FixedSizeList(_, _)
+        | ArrowDataType::Null
+        | ArrowDataType::Boolean
+        | ArrowDataType::FixedSizeBinary(_)
+        | ArrowDataType::LargeBinary
+        | ArrowDataType::Binary
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8
+        | ArrowDataType::Union(_)
+        | ArrowDataType::Dictionary(_, _) => Err(ParquetError::NYI(
+            "Attempting to write an Arrow type that is not yet implemented".to_string(),
+        )),
+    }
+}
+
+fn write_leaf(
+    writer: &mut ColumnWriter,
+    column: &arrow_array::ArrayRef,
+    levels: Levels,
+) -> Result<i64> {
+    let written = match writer {
+        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
+            let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
+            let array = array
+                .as_any()
+                .downcast_ref::<arrow_array::Int32Array>()
+                .expect("Unable to get int32 array");
+            typed.write_batch(
+                get_numeric_array_slice::<Int32Type, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::BoolColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
+            let array = arrow_array::Int64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<Int64Type, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::FloatColumnWriter(ref mut typed) => {
+            let array = arrow_array::Float32Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<FloatType, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
+            let array = arrow_array::Float64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<DoubleType, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::ByteArrayColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+    };
+    Ok(written as i64)
+}
+
+/// A struct that repreesnts definition and repetition levels.

Review comment:
       nit: typo




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org