Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/06/01 20:51:44 UTC

[GitHub] [arrow] nevi-me opened a new pull request #7319: [DRAFT] [Rust] Parquet Arrow writer with nested support

nevi-me opened a new pull request #7319:
URL: https://github.com/apache/arrow/pull/7319


   **Note**: I started making changes to #6785, and ended up deviating a lot. 
   ___
   
   This is a draft to implement an arrow writer for parquet. It supports the following (no complete test coverage yet):
   
   * writing primitives except for booleans and binary
   * nested structs
   * null values (via definition levels)
   
   It does not yet support:
   
   - [ ] Boolean arrays (have to be handled differently from numeric values)
   - [ ] Binary arrays
   - [ ] List arrays (still figuring out deeply-nested repetition levels)
   - [ ] Dictionary arrays
   - [ ] Union arrays (are they even possible?)
   
   So far I have only added one test, which creates a nested schema; I checked the result with pyarrow.
   
   ```jupyter
   # schema of test_complex.parquet
   
   a: int32 not null
   b: int32
   c: struct<d: double, e: struct<f: float>> not null
     child 0, d: double
     child 1, e: struct<f: float>
         child 0, f: float
   ```
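As a rough illustration of how the definition levels relate to this schema, the sketch below walks a schema and reports the maximum definition level of each leaf, adding one level for every nullable field on the path. `SketchField`, `leaf`, `max_def_levels` and `test_complex_schema` are hypothetical stand-ins written for this example; they are not types or functions from the arrow or parquet crates, and lists (which also need repetition levels) are left out.

```rust
/// Minimal stand-in for an Arrow field: a name, a nullability flag and
/// child fields (empty for primitives). Hypothetical type for illustration.
#[derive(Debug, Clone)]
struct SketchField {
    name: &'static str,
    nullable: bool,
    children: Vec<SketchField>,
}

/// Helper to build a primitive (childless) field.
fn leaf(name: &'static str, nullable: bool) -> SketchField {
    SketchField { name, nullable, children: vec![] }
}

/// Walk the schema depth-first and push `(leaf path, max definition level)`
/// for every primitive leaf. Each nullable field on the path adds one level.
fn max_def_levels(
    fields: &[SketchField],
    prefix: &str,
    level: i16,
    out: &mut Vec<(String, i16)>,
) {
    for field in fields {
        let path = if prefix.is_empty() {
            field.name.to_string()
        } else {
            format!("{}.{}", prefix, field.name)
        };
        // shadowing keeps the increment local to this field
        let level = level + field.nullable as i16;
        if field.children.is_empty() {
            out.push((path, level));
        } else {
            max_def_levels(&field.children, &path, level, out);
        }
    }
}

/// The schema of test_complex.parquet as listed above.
fn test_complex_schema() -> Vec<SketchField> {
    vec![
        leaf("a", false),
        leaf("b", true),
        SketchField {
            name: "c",
            nullable: false,
            children: vec![
                leaf("d", true),
                SketchField {
                    name: "e",
                    nullable: true,
                    children: vec![leaf("f", true)],
                },
            ],
        },
    ]
}
```

Under these assumptions the leaves come out as `a` = 0, `b` = 1, `c.d` = 1 and `c.e.f` = 2, because `c` is required and contributes no level.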
   
   This PR potentially addresses:
   
   * https://issues.apache.org/jira/browse/ARROW-8289
   * https://issues.apache.org/jira/browse/ARROW-8423
   * https://issues.apache.org/jira/browse/ARROW-8424
   * https://issues.apache.org/jira/browse/ARROW-8425
   
   And I would like to propose either opening new JIRAs for the above incomplete items, or renaming the last 3 above.
   
   ___
   
   **Help Needed**
   
   I'm implementing the definition and repetition levels from first principles, working from an old Parquet post on the Twitter engineering blog. It's likely that I'm not getting some concepts correct, so I would appreciate help with:
   
   * Checking if my logic is correct
   * Guidance or suggestions on how to more efficiently extract levels from arrays
   * Adding tests - I suspect we will need a lot of them. So far we only test writing 1 batch, so I don't know how paging would work when writing a large enough file
   
   I also don't know if the various encodings (dictionary, RLE, etc.) and compression settings are applied automagically, or if that'd be something we need to explicitly enable.
   
   CC @sunchao @sadikovi @andygrove @paddyhoran 
   
   Might be of interest to @mcassels @maxburke


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] emkornfield commented on pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#issuecomment-670968900


   > I'm also unsure of how to test deeply nested arrays directly in the code (I had to use Spark because Arrow reader doesn't yet support that).
   
   In C++ I covered at least partial testing by testing rep and def level generation [directly](https://github.com/apache/arrow/blob/b0902ab32f26681c9e99a0b61a5ab5d6d03a20df/cpp/src/parquet/arrow/path_internal_test.cc)
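To make the idea of testing level generation directly more concrete for the Rust side, here is a small sketch. It assumes a single nullable list of nullable int32 values using the three-level list encoding, so definition level 0 means a null list, 1 an empty list, 2 a null element and 3 a present element. `LeafLevels` and `list_levels` are hypothetical names for this example and do not come from the PR or from the C++ tests linked above.

```rust
/// Definition and repetition levels for one leaf column, plus its non-null values.
/// Hypothetical type for illustration.
#[derive(Debug, PartialEq)]
struct LeafLevels {
    definition: Vec<i16>,
    repetition: Vec<i16>,
    values: Vec<i32>,
}

/// Compute levels for a nullable list of nullable int32 values.
/// Each outer element is one row; `None` is a null list.
fn list_levels(rows: &[Option<Vec<Option<i32>>>]) -> LeafLevels {
    let mut out = LeafLevels {
        definition: vec![],
        repetition: vec![],
        values: vec![],
    };
    for row in rows {
        match row {
            // null list: a single entry at definition level 0
            None => {
                out.definition.push(0);
                out.repetition.push(0);
            }
            // empty list: the list is defined, but there is no element
            Some(items) if items.is_empty() => {
                out.definition.push(1);
                out.repetition.push(0);
            }
            Some(items) => {
                for (i, item) in items.iter().enumerate() {
                    // 0 starts a new row, 1 continues the current list
                    out.repetition.push(if i == 0 { 0 } else { 1 });
                    match item {
                        Some(v) => {
                            out.definition.push(3);
                            out.values.push(*v);
                        }
                        None => out.definition.push(2),
                    }
                }
            }
        }
    }
    out
}
```

Because the function takes plain vectors and returns plain vectors, a unit test can assert on the levels without writing a file or needing a reader that understands nested data.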





[GitHub] [arrow] maxburke commented on pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
maxburke commented on pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#issuecomment-668810183


   A small comment about the API: something I found necessary was being able to pass props in to the writer interface: https://github.com/urbanlogiq/arrow/commit/5e08cf655aa78536d1fe72dc1ef1a0dcb91ff442
   
   The idea is that we can then set, for example, the created_by attribute and the compression.
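The shape of such an API could look roughly like the sketch below: the constructor takes optional shared properties and falls back to defaults when none are given. `SketchProps` and `SketchWriter` are hypothetical stand-ins so that the example compiles on its own; they are not the parquet crate's `WriterProperties` or the writer in this PR, and the default values are placeholders.

```rust
use std::rc::Rc;

/// Stand-in for writer properties (hypothetical; placeholder fields and defaults).
#[derive(Debug, Clone, PartialEq)]
struct SketchProps {
    created_by: String,
    compression: &'static str,
}

impl Default for SketchProps {
    fn default() -> Self {
        SketchProps {
            created_by: "sketch default".to_string(),
            compression: "UNCOMPRESSED",
        }
    }
}

/// Stand-in for a writer that keeps a shared handle to its properties.
struct SketchWriter {
    props: Rc<SketchProps>,
}

impl SketchWriter {
    /// Use the caller's properties if supplied, otherwise build the defaults.
    fn new(props: Option<Rc<SketchProps>>) -> Self {
        let props = props.unwrap_or_else(|| Rc::new(SketchProps::default()));
        SketchWriter { props }
    }
}
```

Taking an `Option` keeps the simple case simple (`SketchWriter::new(None)`) while still letting callers who care about created_by or compression pass their own settings.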





[GitHub] [arrow] emkornfield commented on a change in pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#discussion_r467499328



##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -0,0 +1,595 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains writer which writes arrow data into parquet data.
+
+use std::rc::Rc;
+
+use arrow::array as arrow_array;
+use arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use arrow_array::Array;
+
+use crate::column::writer::ColumnWriter;
+use crate::errors::{ParquetError, Result};
+use crate::file::properties::WriterProperties;
+use crate::{
+    data_type::*,
+    file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter},
+};
+
+/// Arrow writer
+///
+/// Writes Arrow `RecordBatch`es to a Parquet writer
+pub struct ArrowWriter<W: ParquetWriter> {
+    /// Underlying Parquet writer
+    writer: SerializedFileWriter<W>,
+    /// A copy of the Arrow schema.
+    ///
+    /// The schema is used to verify that each record batch written has the correct schema
+    arrow_schema: SchemaRef,
+}
+
+impl<W: 'static + ParquetWriter> ArrowWriter<W> {
+    /// Try to create a new Arrow writer
+    ///
+    /// The writer will fail if:
+    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
+    ///  * the Arrow schema contains unsupported datatypes such as Unions
+    pub fn try_new(
+        writer: W,
+        arrow_schema: SchemaRef,
+        props: Option<Rc<WriterProperties>>,
+    ) -> Result<Self> {
+        let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
+        let props = match props {
+            Some(props) => props,
+            None => Rc::new(WriterProperties::builder().build()),
+        };
+        let file_writer = SerializedFileWriter::new(
+            writer.try_clone()?,
+            schema.root_schema_ptr(),
+            props,
+        )?;
+
+        Ok(Self {
+            writer: file_writer,
+            arrow_schema,
+        })
+    }
+
+    /// Write a RecordBatch to writer
+    ///
+    /// *NOTE:* The writer currently does not support all Arrow data types
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        // validate batch schema against writer's supplied schema
+        if self.arrow_schema != batch.schema() {
+            return Err(ParquetError::ArrowError(
+                "Record batch schema does not match writer schema".to_string(),
+            ));
+        }
+        // compute the definition and repetition levels of the batch
+        let mut levels = vec![];
+        batch.columns().iter().for_each(|array| {
+            let mut array_levels =
+                get_levels(array, 0, &vec![1i16; batch.num_rows()][..], None);
+            levels.append(&mut array_levels);
+        });
+        // reverse levels so we can use Vec::pop(&mut self)
+        levels.reverse();
+
+        let mut row_group_writer = self.writer.next_row_group()?;
+
+        // write leaves
+        for column in batch.columns() {
+            write_leaves(&mut row_group_writer, column, &mut levels)?;
+        }
+
+        self.writer.close_row_group(row_group_writer)
+    }
+
+    /// Close and finalise the underlying Parquet writer
+    pub fn close(&mut self) -> Result<()> {
+        self.writer.close()
+    }
+}
+
+/// Convenience method to get the next ColumnWriter from the RowGroupWriter
+#[inline]
+#[allow(clippy::borrowed_box)]
+fn get_col_writer(
+    row_group_writer: &mut Box<dyn RowGroupWriter>,
+) -> Result<ColumnWriter> {
+    let col_writer = row_group_writer
+        .next_column()?
+        .expect("Unable to get column writer");
+    Ok(col_writer)
+}
+
+#[allow(clippy::borrowed_box)]
+fn write_leaves(
+    mut row_group_writer: &mut Box<dyn RowGroupWriter>,
+    array: &arrow_array::ArrayRef,
+    mut levels: &mut Vec<Levels>,
+) -> Result<()> {
+    match array.data_type() {
+        ArrowDataType::Int8
+        | ArrowDataType::Int16
+        | ArrowDataType::Int32
+        | ArrowDataType::Int64
+        | ArrowDataType::UInt8
+        | ArrowDataType::UInt16
+        | ArrowDataType::UInt32
+        | ArrowDataType::UInt64
+        | ArrowDataType::Float16
+        | ArrowDataType::Float32
+        | ArrowDataType::Float64
+        | ArrowDataType::Timestamp(_, _)
+        | ArrowDataType::Date32(_)
+        | ArrowDataType::Date64(_)
+        | ArrowDataType::Time32(_)
+        | ArrowDataType::Time64(_)
+        | ArrowDataType::Duration(_)
+        | ArrowDataType::Interval(_) => {
+            let mut col_writer = get_col_writer(&mut row_group_writer)?;
+            write_leaf(
+                &mut col_writer,
+                array,
+                levels.pop().expect("Levels exhausted"),
+            )?;
+            row_group_writer.close_column(col_writer)?;
+            Ok(())
+        }
+        ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
+            // write the child list
+            let data = array.data();
+            let child_array = arrow_array::make_array(data.child_data()[0].clone());
+            write_leaves(&mut row_group_writer, &child_array, &mut levels)?;
+            Ok(())
+        }
+        ArrowDataType::Struct(_) => {
+            let struct_array: &arrow_array::StructArray = array
+                .as_any()
+                .downcast_ref::<arrow_array::StructArray>()
+                .expect("Unable to get struct array");
+            for field in struct_array.columns() {
+                write_leaves(&mut row_group_writer, field, &mut levels)?;
+            }
+            Ok(())
+        }
+        ArrowDataType::FixedSizeList(_, _)
+        | ArrowDataType::Null
+        | ArrowDataType::Boolean
+        | ArrowDataType::FixedSizeBinary(_)
+        | ArrowDataType::LargeBinary
+        | ArrowDataType::Binary
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8
+        | ArrowDataType::Union(_)
+        | ArrowDataType::Dictionary(_, _) => Err(ParquetError::NYI(
+            "Attempting to write an Arrow type that is not yet implemented".to_string(),
+        )),
+    }
+}
+
+fn write_leaf(
+    writer: &mut ColumnWriter,
+    column: &arrow_array::ArrayRef,
+    levels: Levels,
+) -> Result<i64> {
+    let written = match writer {
+        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
+            let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
+            let array = array
+                .as_any()
+                .downcast_ref::<arrow_array::Int32Array>()
+                .expect("Unable to get int32 array");
+            typed.write_batch(
+                get_numeric_array_slice::<Int32Type, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::BoolColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
+            let array = arrow_array::Int64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<Int64Type, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::FloatColumnWriter(ref mut typed) => {
+            let array = arrow_array::Float32Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<FloatType, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
+            let array = arrow_array::Float64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<DoubleType, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::ByteArrayColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+    };
+    Ok(written as i64)
+}
+
+/// A struct that repreesnts definition and repetition levels.

Review comment:
       nit: typo







[GitHub] [arrow] nevi-me commented on a change in pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
nevi-me commented on a change in pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#discussion_r467451701



##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains writer which writes arrow data into parquet data.
+
+use std::fs::File;
+use std::rc::Rc;
+
+use array::Array;
+use arrow::array;
+use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
+
+use crate::column::writer::ColumnWriter;
+use crate::errors::Result;
+use crate::file::properties::WriterProperties;
+use crate::{
+    data_type::*,
+    file::writer::{FileWriter, RowGroupWriter, SerializedFileWriter},
+};
+
+struct ArrowWriter {
+    writer: SerializedFileWriter<File>,
+    rows: i64,

Review comment:
       Documented, and removed the rows field. @andygrove I think I copied it as is from your initial draft. Did you want to be able to check how many rows have been written? I think it ends up being the same as the batch's length

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains writer which writes arrow data into parquet data.
+
+use std::fs::File;
+use std::rc::Rc;
+
+use array::Array;
+use arrow::array;
+use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
+
+use crate::column::writer::ColumnWriter;
+use crate::errors::Result;
+use crate::file::properties::WriterProperties;
+use crate::{
+    data_type::*,
+    file::writer::{FileWriter, RowGroupWriter, SerializedFileWriter},
+};
+
+struct ArrowWriter {
+    writer: SerializedFileWriter<File>,
+    rows: i64,
+}
+
+impl ArrowWriter {
+    pub fn try_new(file: File, arrow_schema: &Schema) -> Result<Self> {
+        let schema = crate::arrow::arrow_to_parquet_schema(arrow_schema)?;
+        let props = Rc::new(WriterProperties::builder().build());
+        let file_writer = SerializedFileWriter::new(
+            file.try_clone()?,
+            schema.root_schema_ptr(),
+            props,
+        )?;
+
+        Ok(Self {
+            writer: file_writer,
+            rows: 0,
+        })
+    }
+
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        let mut row_group_writer = self.writer.next_row_group()?;
+        self.rows += unnest_arrays_to_leaves(
+            &mut row_group_writer,
+            batch.schema().fields(),
+            batch.columns(),
+            &vec![1i16; batch.num_rows()][..],
+            0,
+        )?;
+        self.writer.close_row_group(row_group_writer)
+    }
+
+    pub fn close(&mut self) -> Result<()> {
+        self.writer.close()
+    }
+}
+
+/// Write nested arrays by traversing into structs and lists until primitive
+/// arrays are found.
+fn unnest_arrays_to_leaves(
+    row_group_writer: &mut Box<dyn RowGroupWriter>,
+    // The fields from the record batch or struct
+    fields: &Vec<Field>,
+    // The columns from record batch or struct, must have same length as fields
+    columns: &[array::ArrayRef],
+    // The parent mask, in the case of a struct, this represents which values
+    // of the struct are true (1) or false(0).
+    // This is useful to respect the definition level of structs where all values are null in a row
+    parent_mask: &[i16],
+    // The current level that is being read at
+    level: i16,
+) -> Result<i64> {
+    let mut rows_written = 0;
+    for (field, column) in fields.iter().zip(columns) {
+        match field.data_type() {
+            ArrowDataType::List(_dtype) => unimplemented!("list not yet implemented"),
+            ArrowDataType::FixedSizeList(_, _) => {
+                unimplemented!("fsl not yet implemented")
+            }
+            ArrowDataType::Struct(fields) => {
+                // fields in a struct should recursively be written out
+                let array = column
+                    .as_any()
+                    .downcast_ref::<array::StructArray>()
+                    .expect("Unable to get struct array");
+                let mut null_mask = Vec::with_capacity(array.len());
+                for i in 0..array.len() {
+                    null_mask.push(array.is_valid(i) as i16);
+                }
+                rows_written += unnest_arrays_to_leaves(
+                    row_group_writer,
+                    fields,
+                    &array.columns_ref()[..],
+                    &null_mask[..],
+                    // if the field is nullable, we have to increment level
+                    level + field.is_nullable() as i16,
+                )?;
+            }
+            ArrowDataType::Null => unimplemented!(),
+            ArrowDataType::Boolean
+            | ArrowDataType::Int8
+            | ArrowDataType::Int16
+            | ArrowDataType::Int32
+            | ArrowDataType::Int64
+            | ArrowDataType::UInt8
+            | ArrowDataType::UInt16
+            | ArrowDataType::UInt32
+            | ArrowDataType::UInt64
+            | ArrowDataType::Float16
+            | ArrowDataType::Float32
+            | ArrowDataType::Float64
+            | ArrowDataType::Timestamp(_, _)
+            | ArrowDataType::Date32(_)
+            | ArrowDataType::Date64(_)
+            | ArrowDataType::Time32(_)
+            | ArrowDataType::Time64(_)
+            | ArrowDataType::Duration(_)
+            | ArrowDataType::Interval(_)
+            | ArrowDataType::Binary
+            | ArrowDataType::FixedSizeBinary(_)
+            | ArrowDataType::Utf8 => {
+                let col_writer = row_group_writer.next_column()?;
+                if let Some(mut writer) = col_writer {
+                    // write_column
+                    rows_written +=
+                        write_column(&mut writer, column, level, parent_mask)? as i64;
+                    row_group_writer.close_column(writer)?;
+                } else {
+                    panic!("No writer found")
+                }
+            }
+            ArrowDataType::Union(_) => unimplemented!(),
+            ArrowDataType::Dictionary(_, _) => unimplemented!(),
+        }
+    }
+    Ok(rows_written)
+}
+
+/// Write column to writer
+fn write_column(
+    writer: &mut ColumnWriter,
+    column: &array::ArrayRef,
+    level: i16,
+    parent_levels: &[i16],
+) -> Result<usize> {
+    match writer {
+        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
+            let array = array::Int32Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<Int32Type, _>(&array).as_slice(),
+                Some(get_primitive_def_levels(column, level, parent_levels).as_slice()),
+                None,
+            )
+        }
+        ColumnWriter::BoolColumnWriter(ref mut _typed) => unimplemented!(),
+        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
+            let array = array::Int64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<Int64Type, _>(&array).as_slice(),
+                Some(get_primitive_def_levels(column, level, parent_levels).as_slice()),
+                None,
+            )
+        }
+        ColumnWriter::Int96ColumnWriter(ref mut _typed) => unimplemented!(),
+        ColumnWriter::FloatColumnWriter(ref mut typed) => {
+            let array = array::Float32Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<FloatType, _>(&array).as_slice(),
+                Some(get_primitive_def_levels(column, level, parent_levels).as_slice()),
+                None,
+            )
+        }
+        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
+            let array = array::Float64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<DoubleType, _>(&array).as_slice(),
+                Some(get_primitive_def_levels(column, level, parent_levels).as_slice()),
+                None,
+            )
+        }
+        ColumnWriter::ByteArrayColumnWriter(ref mut _typed) => unimplemented!(),
+        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => unimplemented!(),
+    }
+}
+
+/// Get the definition levels of the numeric array, with level 0 being null and 1 being not null
+/// In the case where the array in question is a child of either a list or struct, the levels
+/// are incremented in accordance with the `level` parameter.
+/// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null
+fn get_primitive_def_levels(
+    array: &array::ArrayRef,
+    level: i16,
+    parent_levels: &[i16],
+) -> Vec<i16> {
+    // convince the compiler that bounds are fine
+    let len = array.len();
+    assert_eq!(
+        len,
+        parent_levels.len(),
+        "Parent definition levels must equal array length"
+    );
+    let levels = (0..len)
+        .map(|index| (array.is_valid(index) as i16 + level) * parent_levels[index])

Review comment:
       I've rewritten this and fixed it
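For readers following along, one way to compute a child's definition levels is sketched below. This is only an illustration and is not necessarily what the rewrite in the PR does. It assumes the parent's actual definition levels are passed down rather than a 0/1 mask, so that a null ancestor keeps the level at which it stopped being defined. `child_def_levels` and its parameters are hypothetical names.

```rust
/// Derive a child's definition levels from its parent's.
///
/// * `parent_levels`: definition level of the parent at each row
/// * `parent_max`: the level at which the parent is fully defined
/// * `validity`: whether the child itself is non-null at each row
/// * `nullable`: whether the child field is nullable
fn child_def_levels(
    parent_levels: &[i16],
    parent_max: i16,
    validity: &[bool],
    nullable: bool,
) -> Vec<i16> {
    assert_eq!(
        parent_levels.len(),
        validity.len(),
        "Parent definition levels must equal array length"
    );
    parent_levels
        .iter()
        .zip(validity)
        .map(|(&parent, &valid)| {
            if parent < parent_max {
                // an ancestor is null: carry its level through unchanged
                parent
            } else if nullable {
                // parent is defined: add one level only if the child is valid
                parent_max + valid as i16
            } else {
                // a required child adds no level of its own
                parent_max
            }
        })
        .collect()
}
```

With three nullable levels of nesting, a row whose outer struct is valid but whose inner struct is null ends up at definition level 1 rather than 0, which a 0/1 parent mask multiplied into the result would not be able to express.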







[GitHub] [arrow] nevi-me commented on pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
nevi-me commented on pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#issuecomment-670899165


   @sunchao @andygrove (CC @wesm @kszucs @emkornfield) in the past few months we haven't had enough review bandwidth on Rust's Parquet implementation (mostly relying on Chao for non-trivial reviews). Given the amount of work needed for an Arrow writer and the interest so far (I think a few people are already using this fork), I'd like to propose:
   
   * We create a temporary branch in the apache/arrow repo, where the arrow writer can temporarily live
   * We can merge changes into the branch, esp if there aren't enough reviewers at the time
   * When we're close to a release, we merge what's on the temp branch into the branch that's currently called `master` but will be renamed soon 😉 
   
   ITO this PR, I think I've gotten arbitrary nesting covered, but there's a lot more work that we can now divide more easily so others can contribute better. I'm also unsure of how to test deeply nested arrays directly in the code (I had to use Spark because Arrow reader doesn't yet support that).
   
   I'll also bring this up in the mailing list for wider visibility





[GitHub] [arrow] emkornfield commented on a change in pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on a change in pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#discussion_r467499425



##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -0,0 +1,595 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains writer which writes arrow data into parquet data.
+
+use std::rc::Rc;
+
+use arrow::array as arrow_array;
+use arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use arrow_array::Array;
+
+use crate::column::writer::ColumnWriter;
+use crate::errors::{ParquetError, Result};
+use crate::file::properties::WriterProperties;
+use crate::{
+    data_type::*,
+    file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter},
+};
+
+/// Arrow writer
+///
+/// Writes Arrow `RecordBatch`es to a Parquet writer
+pub struct ArrowWriter<W: ParquetWriter> {
+    /// Underlying Parquet writer
+    writer: SerializedFileWriter<W>,
+    /// A copy of the Arrow schema.
+    ///
+    /// The schema is used to verify that each record batch written has the correct schema
+    arrow_schema: SchemaRef,
+}
+
+impl<W: 'static + ParquetWriter> ArrowWriter<W> {
+    /// Try to create a new Arrow writer
+    ///
+    /// The writer will fail if:
+    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
+    ///  * the Arrow schema contains unsupported datatypes such as Unions
+    pub fn try_new(
+        writer: W,
+        arrow_schema: SchemaRef,
+        props: Option<Rc<WriterProperties>>,
+    ) -> Result<Self> {
+        let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
+        let props = match props {
+            Some(props) => props,
+            None => Rc::new(WriterProperties::builder().build()),
+        };
+        let file_writer = SerializedFileWriter::new(
+            writer.try_clone()?,
+            schema.root_schema_ptr(),
+            props,
+        )?;
+
+        Ok(Self {
+            writer: file_writer,
+            arrow_schema,
+        })
+    }
+
+    /// Write a RecordBatch to writer
+    ///
+    /// *NOTE:* The writer currently does not support all Arrow data types
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        // validate batch schema against writer's supplied schema
+        if self.arrow_schema != batch.schema() {
+            return Err(ParquetError::ArrowError(
+                "Record batch schema does not match writer schema".to_string(),
+            ));
+        }
+        // compute the definition and repetition levels of the batch
+        let mut levels = vec![];
+        batch.columns().iter().for_each(|array| {
+            let mut array_levels =
+                get_levels(array, 0, &vec![1i16; batch.num_rows()][..], None);
+            levels.append(&mut array_levels);
+        });
+        // reverse levels so we can use Vec::pop(&mut self)
+        levels.reverse();
+
+        let mut row_group_writer = self.writer.next_row_group()?;
+
+        // write leaves
+        for column in batch.columns() {
+            write_leaves(&mut row_group_writer, column, &mut levels)?;
+        }
+
+        self.writer.close_row_group(row_group_writer)
+    }
+
+    /// Close and finalise the underlying Parquet writer
+    pub fn close(&mut self) -> Result<()> {
+        self.writer.close()
+    }
+}
+
+/// Convenience method to get the next ColumnWriter from the RowGroupWriter
+#[inline]
+#[allow(clippy::borrowed_box)]
+fn get_col_writer(
+    row_group_writer: &mut Box<dyn RowGroupWriter>,
+) -> Result<ColumnWriter> {
+    let col_writer = row_group_writer
+        .next_column()?
+        .expect("Unable to get column writer");
+    Ok(col_writer)
+}
+
+#[allow(clippy::borrowed_box)]
+fn write_leaves(
+    mut row_group_writer: &mut Box<dyn RowGroupWriter>,
+    array: &arrow_array::ArrayRef,
+    mut levels: &mut Vec<Levels>,
+) -> Result<()> {
+    match array.data_type() {
+        ArrowDataType::Int8
+        | ArrowDataType::Int16
+        | ArrowDataType::Int32
+        | ArrowDataType::Int64
+        | ArrowDataType::UInt8
+        | ArrowDataType::UInt16
+        | ArrowDataType::UInt32
+        | ArrowDataType::UInt64
+        | ArrowDataType::Float16
+        | ArrowDataType::Float32
+        | ArrowDataType::Float64
+        | ArrowDataType::Timestamp(_, _)
+        | ArrowDataType::Date32(_)
+        | ArrowDataType::Date64(_)
+        | ArrowDataType::Time32(_)
+        | ArrowDataType::Time64(_)
+        | ArrowDataType::Duration(_)
+        | ArrowDataType::Interval(_) => {
+            let mut col_writer = get_col_writer(&mut row_group_writer)?;
+            write_leaf(
+                &mut col_writer,
+                array,
+                levels.pop().expect("Levels exhausted"),
+            )?;
+            row_group_writer.close_column(col_writer)?;
+            Ok(())
+        }
+        ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
+            // write the child list
+            let data = array.data();
+            let child_array = arrow_array::make_array(data.child_data()[0].clone());
+            write_leaves(&mut row_group_writer, &child_array, &mut levels)?;
+            Ok(())
+        }
+        ArrowDataType::Struct(_) => {
+            let struct_array: &arrow_array::StructArray = array
+                .as_any()
+                .downcast_ref::<arrow_array::StructArray>()
+                .expect("Unable to get struct array");
+            for field in struct_array.columns() {
+                write_leaves(&mut row_group_writer, field, &mut levels)?;
+            }
+            Ok(())
+        }
+        ArrowDataType::FixedSizeList(_, _)
+        | ArrowDataType::Null
+        | ArrowDataType::Boolean
+        | ArrowDataType::FixedSizeBinary(_)
+        | ArrowDataType::LargeBinary
+        | ArrowDataType::Binary
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8
+        | ArrowDataType::Union(_)
+        | ArrowDataType::Dictionary(_, _) => Err(ParquetError::NYI(
+            "Attempting to write an Arrow type that is not yet implemented".to_string(),
+        )),
+    }
+}
+
+fn write_leaf(
+    writer: &mut ColumnWriter,
+    column: &arrow_array::ArrayRef,
+    levels: Levels,
+) -> Result<i64> {
+    let written = match writer {
+        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
+            let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
+            let array = array
+                .as_any()
+                .downcast_ref::<arrow_array::Int32Array>()
+                .expect("Unable to get int32 array");
+            typed.write_batch(
+                get_numeric_array_slice::<Int32Type, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::BoolColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
+            let array = arrow_array::Int64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<Int64Type, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::FloatColumnWriter(ref mut typed) => {
+            let array = arrow_array::Float32Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<FloatType, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
+            let array = arrow_array::Float64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<DoubleType, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::ByteArrayColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+    };
+    Ok(written as i64)
+}
+
+/// A struct that represents definition and repetition levels.
+/// Repetition levels are only populated if the parent or current leaf is repeated
+#[derive(Debug)]
+struct Levels {
+    definition: Vec<i16>,
+    repetition: Option<Vec<i16>>,
+}
+
+/// Compute nested levels of the Arrow array, recursing into lists and structs
+fn get_levels(
+    array: &arrow_array::ArrayRef,
+    level: i16,
+    parent_def_levels: &[i16],
+    parent_rep_levels: Option<&[i16]>,
+) -> Vec<Levels> {
+    match array.data_type() {
+        ArrowDataType::Null => unimplemented!(),
+        ArrowDataType::Boolean
+        | ArrowDataType::Int8
+        | ArrowDataType::Int16
+        | ArrowDataType::Int32
+        | ArrowDataType::Int64
+        | ArrowDataType::UInt8
+        | ArrowDataType::UInt16
+        | ArrowDataType::UInt32
+        | ArrowDataType::UInt64
+        | ArrowDataType::Float16
+        | ArrowDataType::Float32
+        | ArrowDataType::Float64
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8
+        | ArrowDataType::Timestamp(_, _)
+        | ArrowDataType::Date32(_)
+        | ArrowDataType::Date64(_)
+        | ArrowDataType::Time32(_)
+        | ArrowDataType::Time64(_)
+        | ArrowDataType::Duration(_)
+        | ArrowDataType::Interval(_) => vec![Levels {
+            definition: get_primitive_def_levels(array, parent_def_levels),
+            repetition: None,
+        }],
+        ArrowDataType::Binary => unimplemented!(),
+        ArrowDataType::FixedSizeBinary(_) => unimplemented!(),
+        ArrowDataType::LargeBinary => unimplemented!(),
+        ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
+            // a list can either be nested or flat. If it is flat, def and rep lengths will be the length of the list's items

Review comment:
       This isn't true. You can have many "empty" lists, which have no items. Each empty list still adds a def and rep level.
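
To make the point about empty lists concrete, here is a minimal sketch of level generation for a single nullable list column with non-nullable items. It is not the PR's `get_levels`; the names `ListLevels` and `flat_list_levels` are hypothetical, and the level values assume a maximum definition level of 2 and a maximum repetition level of 1 for this shape.

```rust
/// Definition and repetition levels for one leaf column (hypothetical type).
#[derive(Debug, PartialEq)]
struct ListLevels {
    definition: Vec<i16>,
    repetition: Vec<i16>,
}

/// Compute levels for a nullable list of non-nullable items.
///
/// `offsets` follows the Arrow list layout: list `i` spans
/// `offsets[i]..offsets[i + 1]`, so it has one more entry than `validity`.
/// `validity[i]` is `false` when list `i` is null.
fn flat_list_levels(offsets: &[usize], validity: &[bool]) -> ListLevels {
    assert_eq!(validity.len() + 1, offsets.len());
    let mut definition = Vec::new();
    let mut repetition = Vec::new();
    for (i, window) in offsets.windows(2).enumerate() {
        let len = window[1] - window[0];
        if !validity[i] {
            // null list: one slot, nothing defined
            definition.push(0);
            repetition.push(0);
        } else if len == 0 {
            // empty list: still one slot, the list is defined but has no item
            definition.push(1);
            repetition.push(0);
        } else {
            for j in 0..len {
                // an item is fully defined; only the first one starts a new row
                definition.push(2);
                repetition.push(if j == 0 { 0 } else { 1 });
            }
        }
    }
    ListLevels { definition, repetition }
}
```

For the four lists `[a, b]`, `[]`, `null`, `[c]` (offsets `[0, 2, 2, 2, 3]`) this yields five level entries for only three items, which is why the level vectors cannot be sized from the item count.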




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#issuecomment-670935213


   @nevi-me Sounds good to me. Thanks.





[GitHub] [arrow] sunchao commented on pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
sunchao commented on pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#issuecomment-671069704


   +1 on the approach as well. Thanks @nevi-me for all the efforts!





[GitHub] [arrow] nevi-me commented on pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
nevi-me commented on pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#issuecomment-673589031


   Merged as https://github.com/apache/arrow/commit/80a9c027b7c356f25a4c22e71587936a54959db6, not sure why the merge tool didn't close the issue





[GitHub] [arrow] github-actions[bot] commented on pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#issuecomment-640010989


   https://issues.apache.org/jira/browse/ARROW-8289





[GitHub] [arrow] maxburke commented on pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
maxburke commented on pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#issuecomment-668815376


   Oh! One more :) This one uses the ParquetWriter trait instead of std::fs::File: https://github.com/urbanlogiq/arrow/commit/e15851e11cef942fcd3803cb80016c35e49dca1b
   
   (Our primary use case is writing Parquet files to memory, because they immediately get persisted to object storage like AWS S3, so we never deal with local files.)
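
As an illustration of the in-memory case: the diff calls `writer.try_clone()` on the supplied writer, so an in-memory sink has to hand out clones that all write into one buffer. A minimal standard-library sketch of such a sink follows. `SharedBuffer` and its methods are hypothetical names, and the actual trait implementation needed to plug this into the Parquet crate is not shown.

```rust
use std::cell::RefCell;
use std::io::{Cursor, Result, Seek, SeekFrom, Write};
use std::rc::Rc;

/// Hypothetical in-memory sink whose clones share one underlying buffer.
#[derive(Clone, Default)]
struct SharedBuffer {
    inner: Rc<RefCell<Cursor<Vec<u8>>>>,
}

impl SharedBuffer {
    /// Mirrors the shape of `File::try_clone`: the clone shares data and position.
    fn try_clone(&self) -> Result<Self> {
        Ok(self.clone())
    }

    /// Copy out everything written so far, e.g. to upload to object storage.
    fn bytes(&self) -> Vec<u8> {
        self.inner.borrow().get_ref().clone()
    }
}

impl Write for SharedBuffer {
    fn write(&mut self, buf: &[u8]) -> Result<usize> {
        self.inner.borrow_mut().write(buf)
    }

    fn flush(&mut self) -> Result<()> {
        self.inner.borrow_mut().flush()
    }
}

impl Seek for SharedBuffer {
    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
        self.inner.borrow_mut().seek(pos)
    }
}
```

Bytes written through a clone are visible through the original handle, so the caller can keep one handle, give the clone to the writer, and read the finished file back with `bytes()`.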





[GitHub] [arrow] emkornfield commented on pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#issuecomment-670968236


   > @emkornfield not sure if I understand this part, I'll try create a nested batch with a few levels, and have one record have the top level be nested.
   
   There are two bugs in C++ (one with an open PR).
   
   The first bug: if you have a schema like `nullable struct<list<nullable struct<nullable struct<int>>>>`, you need to include all null values from the leaf up to the list. The bug we had in C++ is that we would only include the first level of nulls and drop the others (leading to inconsistent list sizes).
   
   The second bug has no PR yet. If you have a schema `nullable struct<nullable int>`, then the struct's validity buffer could look like `[null, null, null]` while the underlying int vector has valid values `[1, 2, 3]`. For the purposes of writing to Parquet, the values should all be considered null. The only way to determine this is to re-walk the tree or to use the already generated levels to generate a new bitmap for the leaf.
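
The `nullable struct<nullable int>` case can be sketched as follows. This is an illustration only, not the C++ or Rust implementation; `struct_def_levels` and `child_def_levels` are hypothetical helpers, and the sketch assumes the struct sits at the root, so its maximum definition level is 1.

```rust
/// Definition levels for a nullable struct at the root: 1 if valid, 0 if null.
fn struct_def_levels(struct_valid: &[bool]) -> Vec<i16> {
    struct_valid.iter().map(|&valid| valid as i16).collect()
}

/// Definition levels for a nullable child of a parent whose levels are known.
///
/// The child only gains a level when the parent is fully defined AND the
/// child's own validity bit is set. A null parent masks the child regardless
/// of what the child's validity buffer says.
fn child_def_levels(
    parent_def: &[i16],
    parent_max_def: i16,
    child_valid: &[bool],
) -> Vec<i16> {
    parent_def
        .iter()
        .zip(child_valid)
        .map(|(&parent, &valid)| {
            if parent == parent_max_def && valid {
                parent + 1
            } else {
                parent
            }
        })
        .collect()
}
```

With three null structs and a child that claims three valid values, every child level comes out as 0, so the leaf is treated as entirely null even though its own validity buffer disagrees.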
   
   





[GitHub] [arrow] github-actions[bot] commented on pull request #7319: [DRAFT] [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#issuecomment-637100265


   Thanks for opening a pull request!
   
   Could you open an issue for this pull request on JIRA?
   https://issues.apache.org/jira/browse/ARROW
   
   Then could you also rename pull request title in the following format?
   
       ARROW-${JIRA_ID}: [${COMPONENT}] ${SUMMARY}
   
   See also:
   
     * [Other pull requests](https://github.com/apache/arrow/pulls/)
     * [Contribution Guidelines - How to contribute patches](https://arrow.apache.org/docs/developers/contributing.html#how-to-contribute-patches)
   





[GitHub] [arrow] emkornfield commented on pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#issuecomment-667618090


   > IMO we can solve this in two steps: 1) have a top-level method to compute def/rep levels for all leaf arrays of a given an arrow array, and then 2) use column writer to write Parquet value/def/rep using the result from the previous step and the input leaf array.
   
   FWIW, this is the approach the C++ code takes (it supports arbitrary nesting). One thing to note: once rep/def levels are computed for anything with deep nesting (any leaf column with one or more direct struct/group ancestors), nullness should be determined from the rep/def levels and not from the leaf arrays (this is currently a bug in C++).
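
A small sketch of what "nullness from levels" means for the values handed to a column writer. It assumes a leaf with only struct ancestors (no lists), so there is exactly one value slot per level entry, and it assumes the writer expects only the non-null values alongside the levels. `values_to_write` is a hypothetical helper, not part of the PR.

```rust
/// Keep only the values whose definition level reaches the leaf's maximum,
/// ignoring whatever the leaf array's own validity buffer says.
fn values_to_write<T: Copy>(
    values: &[T],
    def_levels: &[i16],
    max_def_level: i16,
) -> Vec<T> {
    assert_eq!(values.len(), def_levels.len());
    values
        .iter()
        .zip(def_levels)
        .filter(|(_, def)| **def == max_def_level)
        .map(|(value, _)| *value)
        .collect()
}
```

For values `[1, 2, 3]` with levels `[2, 0, 1]` and a maximum level of 2, only the first value survives: the second sits under a null ancestor and the third is itself null.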





[GitHub] [arrow] nevi-me commented on pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
nevi-me commented on pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#issuecomment-670878573


   > ... One thing to note which is currently a bug in C++ is once rep/def levels are computed for any anything
   > with deep nesting (any leaf column one or more direct struct/group ancestor), nullness should be determined rep/def-levels and not leaf-arrays (this is currently a bug in C++).
   
   @emkornfield not sure if I understand this part, I'll try create a nested batch with a few levels, and have one record have the top level be nested. Would this cover the case above? I might also be limited by https://issues.apache.org/jira/browse/ARROW-5408 for now





[GitHub] [arrow] nevi-me commented on a change in pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
nevi-me commented on a change in pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#discussion_r467500252



##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -0,0 +1,595 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains writer which writes arrow data into parquet data.
+
+use std::rc::Rc;
+
+use arrow::array as arrow_array;
+use arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use arrow_array::Array;
+
+use crate::column::writer::ColumnWriter;
+use crate::errors::{ParquetError, Result};
+use crate::file::properties::WriterProperties;
+use crate::{
+    data_type::*,
+    file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter},
+};
+
+/// Arrow writer
+///
+/// Writes Arrow `RecordBatch`es to a Parquet writer
+pub struct ArrowWriter<W: ParquetWriter> {
+    /// Underlying Parquet writer
+    writer: SerializedFileWriter<W>,
+    /// A copy of the Arrow schema.
+    ///
+    /// The schema is used to verify that each record batch written has the correct schema
+    arrow_schema: SchemaRef,
+}
+
+impl<W: 'static + ParquetWriter> ArrowWriter<W> {
+    /// Try to create a new Arrow writer
+    ///
+    /// The writer will fail if:
+    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
+    ///  * the Arrow schema contains unsupported datatypes such as Unions
+    pub fn try_new(
+        writer: W,
+        arrow_schema: SchemaRef,
+        props: Option<Rc<WriterProperties>>,
+    ) -> Result<Self> {
+        let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
+        let props = match props {
+            Some(props) => props,
+            None => Rc::new(WriterProperties::builder().build()),
+        };
+        let file_writer = SerializedFileWriter::new(
+            writer.try_clone()?,
+            schema.root_schema_ptr(),
+            props,
+        )?;
+
+        Ok(Self {
+            writer: file_writer,
+            arrow_schema,
+        })
+    }
+
+    /// Write a RecordBatch to writer
+    ///
+    /// *NOTE:* The writer currently does not support all Arrow data types
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        // validate batch schema against writer's supplied schema
+        if self.arrow_schema != batch.schema() {
+            return Err(ParquetError::ArrowError(
+                "Record batch schema does not match writer schema".to_string(),
+            ));
+        }
+        // compute the definition and repetition levels of the batch
+        let mut levels = vec![];
+        batch.columns().iter().for_each(|array| {
+            let mut array_levels =
+                get_levels(array, 0, &vec![1i16; batch.num_rows()][..], None);
+            levels.append(&mut array_levels);
+        });
+        // reverse levels so we can use Vec::pop(&mut self)
+        levels.reverse();
+
+        let mut row_group_writer = self.writer.next_row_group()?;
+
+        // write leaves
+        for column in batch.columns() {
+            write_leaves(&mut row_group_writer, column, &mut levels)?;
+        }
+
+        self.writer.close_row_group(row_group_writer)
+    }
+
+    /// Close and finalise the underlying Parquet writer
+    pub fn close(&mut self) -> Result<()> {
+        self.writer.close()
+    }
+}
+
+/// Convenience method to get the next ColumnWriter from the RowGroupWriter
+#[inline]
+#[allow(clippy::borrowed_box)]
+fn get_col_writer(
+    row_group_writer: &mut Box<dyn RowGroupWriter>,
+) -> Result<ColumnWriter> {
+    let col_writer = row_group_writer
+        .next_column()?
+        .expect("Unable to get column writer");
+    Ok(col_writer)
+}
+
+#[allow(clippy::borrowed_box)]
+fn write_leaves(
+    mut row_group_writer: &mut Box<dyn RowGroupWriter>,
+    array: &arrow_array::ArrayRef,
+    mut levels: &mut Vec<Levels>,
+) -> Result<()> {
+    match array.data_type() {
+        ArrowDataType::Int8
+        | ArrowDataType::Int16
+        | ArrowDataType::Int32
+        | ArrowDataType::Int64
+        | ArrowDataType::UInt8
+        | ArrowDataType::UInt16
+        | ArrowDataType::UInt32
+        | ArrowDataType::UInt64
+        | ArrowDataType::Float16
+        | ArrowDataType::Float32
+        | ArrowDataType::Float64
+        | ArrowDataType::Timestamp(_, _)
+        | ArrowDataType::Date32(_)
+        | ArrowDataType::Date64(_)
+        | ArrowDataType::Time32(_)
+        | ArrowDataType::Time64(_)
+        | ArrowDataType::Duration(_)
+        | ArrowDataType::Interval(_) => {
+            let mut col_writer = get_col_writer(&mut row_group_writer)?;
+            write_leaf(
+                &mut col_writer,
+                array,
+                levels.pop().expect("Levels exhausted"),
+            )?;
+            row_group_writer.close_column(col_writer)?;
+            Ok(())
+        }
+        ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
+            // write the child list
+            let data = array.data();
+            let child_array = arrow_array::make_array(data.child_data()[0].clone());
+            write_leaves(&mut row_group_writer, &child_array, &mut levels)?;
+            Ok(())
+        }
+        ArrowDataType::Struct(_) => {
+            let struct_array: &arrow_array::StructArray = array
+                .as_any()
+                .downcast_ref::<arrow_array::StructArray>()
+                .expect("Unable to get struct array");
+            for field in struct_array.columns() {
+                write_leaves(&mut row_group_writer, field, &mut levels)?;
+            }
+            Ok(())
+        }
+        ArrowDataType::FixedSizeList(_, _)
+        | ArrowDataType::Null
+        | ArrowDataType::Boolean
+        | ArrowDataType::FixedSizeBinary(_)
+        | ArrowDataType::LargeBinary
+        | ArrowDataType::Binary
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8
+        | ArrowDataType::Union(_)
+        | ArrowDataType::Dictionary(_, _) => Err(ParquetError::NYI(
+            "Attempting to write an Arrow type that is not yet implemented".to_string(),
+        )),
+    }
+}
+
+fn write_leaf(
+    writer: &mut ColumnWriter,
+    column: &arrow_array::ArrayRef,
+    levels: Levels,
+) -> Result<i64> {
+    let written = match writer {
+        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
+            let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
+            let array = array
+                .as_any()
+                .downcast_ref::<arrow_array::Int32Array>()
+                .expect("Unable to get int32 array");
+            typed.write_batch(
+                get_numeric_array_slice::<Int32Type, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::BoolColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
+            let array = arrow_array::Int64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<Int64Type, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::FloatColumnWriter(ref mut typed) => {
+            let array = arrow_array::Float32Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<FloatType, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
+            let array = arrow_array::Float64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<DoubleType, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::ByteArrayColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+    };
+    Ok(written as i64)
+}
+
+/// A struct that represents definition and repetition levels.
+/// Repetition levels are only populated if the parent or current leaf is repeated
+#[derive(Debug)]
+struct Levels {
+    definition: Vec<i16>,
+    repetition: Option<Vec<i16>>,
+}
+
+/// Compute nested levels of the Arrow array, recursing into lists and structs
+fn get_levels(
+    array: &arrow_array::ArrayRef,
+    level: i16,
+    parent_def_levels: &[i16],
+    parent_rep_levels: Option<&[i16]>,
+) -> Vec<Levels> {
+    match array.data_type() {
+        ArrowDataType::Null => unimplemented!(),
+        ArrowDataType::Boolean
+        | ArrowDataType::Int8
+        | ArrowDataType::Int16
+        | ArrowDataType::Int32
+        | ArrowDataType::Int64
+        | ArrowDataType::UInt8
+        | ArrowDataType::UInt16
+        | ArrowDataType::UInt32
+        | ArrowDataType::UInt64
+        | ArrowDataType::Float16
+        | ArrowDataType::Float32
+        | ArrowDataType::Float64
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8
+        | ArrowDataType::Timestamp(_, _)
+        | ArrowDataType::Date32(_)
+        | ArrowDataType::Date64(_)
+        | ArrowDataType::Time32(_)
+        | ArrowDataType::Time64(_)
+        | ArrowDataType::Duration(_)
+        | ArrowDataType::Interval(_) => vec![Levels {
+            definition: get_primitive_def_levels(array, parent_def_levels),
+            repetition: None,
+        }],
+        ArrowDataType::Binary => unimplemented!(),
+        ArrowDataType::FixedSizeBinary(_) => unimplemented!(),
+        ArrowDataType::LargeBinary => unimplemented!(),
+        ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
+            // a list can either be nested or flat. If it is flat, def and rep lengths will be the length of the list's items

Review comment:
       You're right, this comment is incorrect. I wrote it before I tried to write an empty list. I'll remove it.
       The `arrow_writer_list` test covers this scenario.







[GitHub] [arrow] nevi-me commented on a change in pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
nevi-me commented on a change in pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#discussion_r467451835



##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains writer which writes arrow data into parquet data.
+
+use std::fs::File;
+use std::rc::Rc;
+
+use array::Array;
+use arrow::array;
+use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+use arrow::record_batch::RecordBatch;
+
+use crate::column::writer::ColumnWriter;
+use crate::errors::Result;
+use crate::file::properties::WriterProperties;
+use crate::{
+    data_type::*,
+    file::writer::{FileWriter, RowGroupWriter, SerializedFileWriter},
+};
+
+struct ArrowWriter {
+    writer: SerializedFileWriter<File>,
+    rows: i64,
+}
+
+impl ArrowWriter {
+    pub fn try_new(file: File, arrow_schema: &Schema) -> Result<Self> {
+        let schema = crate::arrow::arrow_to_parquet_schema(arrow_schema)?;
+        let props = Rc::new(WriterProperties::builder().build());
+        let file_writer = SerializedFileWriter::new(
+            file.try_clone()?,
+            schema.root_schema_ptr(),
+            props,
+        )?;
+
+        Ok(Self {
+            writer: file_writer,
+            rows: 0,
+        })
+    }
+
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        let mut row_group_writer = self.writer.next_row_group()?;
+        self.rows += unnest_arrays_to_leaves(
+            &mut row_group_writer,
+            batch.schema().fields(),
+            batch.columns(),
+            &vec![1i16; batch.num_rows()][..],
+            0,
+        )?;
+        self.writer.close_row_group(row_group_writer)
+    }
+
+    pub fn close(&mut self) -> Result<()> {
+        self.writer.close()
+    }
+}
+
+/// Write nested arrays by traversing into structs and lists until primitive
+/// arrays are found.
+fn unnest_arrays_to_leaves(
+    row_group_writer: &mut Box<dyn RowGroupWriter>,
+    // The fields from the record batch or struct
+    fields: &Vec<Field>,
+    // The columns from record batch or struct, must have same length as fields
+    columns: &[array::ArrayRef],
+    // The parent mask, in the case of a struct, this represents which values
+    // of the struct are true (1) or false(0).
+    // This is useful to respect the definition level of structs where all values are null in a row
+    parent_mask: &[i16],
+    // The current level that is being read at
+    level: i16,
+) -> Result<i64> {
+    let mut rows_written = 0;
+    for (field, column) in fields.iter().zip(columns) {
+        match field.data_type() {
+            ArrowDataType::List(_dtype) => unimplemented!("list not yet implemented"),
+            ArrowDataType::FixedSizeList(_, _) => {
+                unimplemented!("fsl not yet implemented")
+            }
+            ArrowDataType::Struct(fields) => {
+                // fields in a struct should recursively be written out
+                let array = column
+                    .as_any()
+                    .downcast_ref::<array::StructArray>()
+                    .expect("Unable to get struct array");
+                let mut null_mask = Vec::with_capacity(array.len());
+                for i in 0..array.len() {
+                    null_mask.push(array.is_valid(i) as i16);
+                }
+                rows_written += unnest_arrays_to_leaves(
+                    row_group_writer,
+                    fields,
+                    &array.columns_ref()[..],
+                    &null_mask[..],
+                    // if the field is nullable, we have to increment level
+                    level + field.is_nullable() as i16,
+                )?;
+            }
+            ArrowDataType::Null => unimplemented!(),
+            ArrowDataType::Boolean
+            | ArrowDataType::Int8
+            | ArrowDataType::Int16
+            | ArrowDataType::Int32
+            | ArrowDataType::Int64
+            | ArrowDataType::UInt8
+            | ArrowDataType::UInt16
+            | ArrowDataType::UInt32
+            | ArrowDataType::UInt64
+            | ArrowDataType::Float16
+            | ArrowDataType::Float32
+            | ArrowDataType::Float64
+            | ArrowDataType::Timestamp(_, _)
+            | ArrowDataType::Date32(_)
+            | ArrowDataType::Date64(_)
+            | ArrowDataType::Time32(_)
+            | ArrowDataType::Time64(_)
+            | ArrowDataType::Duration(_)
+            | ArrowDataType::Interval(_)
+            | ArrowDataType::Binary
+            | ArrowDataType::FixedSizeBinary(_)
+            | ArrowDataType::Utf8 => {
+                let col_writer = row_group_writer.next_column()?;
+                if let Some(mut writer) = col_writer {
+                    // write_column
+                    rows_written +=
+                        write_column(&mut writer, column, level, parent_mask)? as i64;
+                    row_group_writer.close_column(writer)?;
+                } else {
+                    panic!("No writer found")
+                }
+            }
+            ArrowDataType::Union(_) => unimplemented!(),
+            ArrowDataType::Dictionary(_, _) => unimplemented!(),
+        }
+    }
+    Ok(rows_written)
+}
+
+/// Write column to writer
+fn write_column(
+    writer: &mut ColumnWriter,
+    column: &array::ArrayRef,
+    level: i16,
+    parent_levels: &[i16],
+) -> Result<usize> {
+    match writer {
+        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
+            let array = array::Int32Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<Int32Type, _>(&array).as_slice(),
+                Some(get_primitive_def_levels(column, level, parent_levels).as_slice()),
+                None,
+            )
+        }
+        ColumnWriter::BoolColumnWriter(ref mut _typed) => unimplemented!(),
+        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
+            let array = array::Int64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<Int64Type, _>(&array).as_slice(),
+                Some(get_primitive_def_levels(column, level, parent_levels).as_slice()),
+                None,
+            )
+        }
+        ColumnWriter::Int96ColumnWriter(ref mut _typed) => unimplemented!(),
+        ColumnWriter::FloatColumnWriter(ref mut typed) => {
+            let array = array::Float32Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<FloatType, _>(&array).as_slice(),
+                Some(get_primitive_def_levels(column, level, parent_levels).as_slice()),
+                None,
+            )
+        }
+        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
+            let array = array::Float64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<DoubleType, _>(&array).as_slice(),
+                Some(get_primitive_def_levels(column, level, parent_levels).as_slice()),
+                None,
+            )
+        }
+        ColumnWriter::ByteArrayColumnWriter(ref mut _typed) => unimplemented!(),
+        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => unimplemented!(),
+    }
+}
+
+/// Get the definition levels of the numeric array, with level 0 being null and 1 being not null
+/// In the case where the array in question is a child of either a list or struct, the levels
+/// are incremented in accordance with the `level` parameter.
+/// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null
+fn get_primitive_def_levels(
+    array: &array::ArrayRef,
+    level: i16,
+    parent_levels: &[i16],
+) -> Vec<i16> {
+    // convince the compiler that bounds are fine
+    let len = array.len();
+    assert_eq!(
+        len,
+        parent_levels.len(),
+        "Parent definition levels must equal array length"
+    );
+    let levels = (0..len)
+        .map(|index| (array.is_valid(index) as i16 + level) * parent_levels[index])
+        .collect();
+    levels
+}
+
+/// Get the underlying numeric array slice, skipping any null values.
+/// If there are no null values, the entire slice is returned,
+/// thus this should only be called when there are null values.
+fn get_numeric_array_slice<T, A>(array: &array::PrimitiveArray<A>) -> Vec<T::T>
+where
+    T: DataType,
+    A: arrow::datatypes::ArrowNumericType,
+    T::T: From<A::Native>,
+{
+    let mut values = Vec::with_capacity(array.len() - array.null_count());

Review comment:
       Yes, I agree that it's better to avoid this function if there are no nulls. I'll look into this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] sunchao commented on a change in pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#discussion_r447898772



##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -0,0 +1,348 @@
+/// Get the underlying numeric array slice, skipping any null values.
+/// If there are no null values, the entire slice is returned,
+/// thus this should only be called when there are null values.
+fn get_numeric_array_slice<T, A>(array: &array::PrimitiveArray<A>) -> Vec<T::T>
+where
+    T: DataType,
+    A: arrow::datatypes::ArrowNumericType,
+    T::T: From<A::Native>,
+{
+    let mut values = Vec::with_capacity(array.len() - array.null_count());

Review comment:
       Not sure if this is the best way to handle this. For one, we can skip the for-loop if all array elements are non-null. Also, perhaps we should use an array builder for this? Eventually we can have a `write_arrow_batch` method in `column/writer.rs`.
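
The fast path suggested here can be sketched without the Arrow types. This is a minimal illustration, not the PR's code: `NumericColumn` and `non_null_values` are hypothetical names, and the validity mask is modeled as a plain `bool` slice instead of Arrow's bitmap. The values buffer is borrowed untouched when nothing is null, and a filtered copy is built only otherwise.

```rust
use std::borrow::Cow;

/// Hypothetical stand-in for an Arrow primitive array: a values buffer plus
/// an optional validity mask (`None` means every slot is valid).
struct NumericColumn<'a> {
    values: &'a [i32],
    validity: Option<&'a [bool]>,
}

/// Returns the non-null values of the column.
/// Borrows the buffer as-is when there are no nulls and only allocates a
/// filtered copy when at least one slot is null.
fn non_null_values<'a>(col: &NumericColumn<'a>) -> Cow<'a, [i32]> {
    if let Some(mask) = col.validity {
        assert_eq!(col.values.len(), mask.len(), "mask must match values");
    }
    match col.validity {
        // No mask at all: nothing to skip.
        None => Cow::Borrowed(col.values),
        // A mask exists but every slot is valid: still no copy needed.
        Some(mask) if mask.iter().all(|&valid| valid) => Cow::Borrowed(col.values),
        // At least one null: copy only the valid slots.
        Some(mask) => Cow::Owned(
            col.values
                .iter()
                .zip(mask)
                .filter(|&(_, &valid)| valid)
                .map(|(&value, _)| value)
                .collect(),
        ),
    }
}
```

A caller could hand the resulting slice straight to `write_batch`, so the allocation is paid only for columns that actually contain nulls.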

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -0,0 +1,348 @@
+struct ArrowWriter {
+    writer: SerializedFileWriter<File>,
+    rows: i64,

Review comment:
       Maybe add a comment on what this field is for.

##########
File path: rust/parquet/src/arrow/arrow_writer.rs
##########
@@ -0,0 +1,348 @@
+/// Get the definition levels of the numeric array, with level 0 being null and 1 being not null
+/// In the case where the array in question is a child of either a list or struct, the levels
+/// are incremented in accordance with the `level` parameter.
+/// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null
+fn get_primitive_def_levels(
+    array: &array::ArrayRef,
+    level: i16,
+    parent_levels: &[i16],
+) -> Vec<i16> {
+    // convince the compiler that bounds are fine
+    let len = array.len();
+    assert_eq!(
+        len,
+        parent_levels.len(),
+        "Parent definition levels must equal array length"
+    );
+    let levels = (0..len)
+        .map(|index| (array.is_valid(index) as i16 + level) * parent_levels[index])

Review comment:
       Hmm, is this correct? If I have a deeply nested struct and its parent is null, then its def level is 0?
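
To make the question concrete, here is a minimal sketch of one way the levels could be propagated. It assumes Dremel-style definition levels, where each nullable field on the path that is defined adds one. `ParentLevel`, `child_levels` and `def_levels` are hypothetical names, and validity is modeled as a `bool` slice instead of Arrow's bitmap. Instead of a 0/1 parent mask, each row carries the level reached so far, so a null ancestor freezes the level at its own depth instead of resetting it to 0.

```rust
/// Hypothetical per-row state handed from a struct to its children.
#[derive(Clone, Copy, Debug, PartialEq)]
struct ParentLevel {
    /// Definition level accumulated so far for this row.
    def_level: i16,
    /// False once any nullable ancestor (or the field itself) is null.
    defined: bool,
}

/// Compute the levels of one field (struct or leaf) from its parent's levels.
/// `validity[i]` is the field's own validity at row `i`.
fn child_levels(
    parent: &[ParentLevel],
    validity: &[bool],
    nullable: bool,
) -> Vec<ParentLevel> {
    assert_eq!(parent.len(), validity.len(), "lengths must match");
    parent
        .iter()
        .zip(validity)
        .map(|(p, &valid)| {
            if !p.defined || !nullable {
                // Either an ancestor is already null (level stays frozen), or
                // the field is required and contributes no level of its own.
                *p
            } else if valid {
                ParentLevel { def_level: p.def_level + 1, defined: true }
            } else {
                ParentLevel { def_level: p.def_level, defined: false }
            }
        })
        .collect()
}

/// Extract the plain definition levels that would be written for a leaf.
fn def_levels(levels: &[ParentLevel]) -> Vec<i16> {
    levels.iter().map(|l| l.def_level).collect()
}
```

For a path `a.b.c` where all three fields are nullable, chaining `child_levels` over four rows (`a` null; `b` null; `c` null; all valid) gives definition levels 0, 1, 2, 3. By my reading, the expression in the diff would give 3, 0, 2, 3 for the same inputs if `b`'s own validity bit happens to be set in the row where `a` is null, because only the immediate parent's mask is consulted.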







[GitHub] [arrow] nevi-me closed pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
nevi-me closed pull request #7319:
URL: https://github.com/apache/arrow/pull/7319


   





[GitHub] [arrow] nevi-me commented on pull request #7319: ARROW-8289: [Rust] Parquet Arrow writer with nested support

Posted by GitBox <gi...@apache.org>.
nevi-me commented on pull request #7319:
URL: https://github.com/apache/arrow/pull/7319#issuecomment-669128126


   @maxburke there's been some interest from other people in this PR. I haven't been able to continue working on it because, when I have a bit of free time, I've been looking at the IPC/integration issues (Rust doesn't work with 0.15+ files).
   
   Please feel free to push changes against this PR, or to open a PR against my fork with upstream changes. There's also someone who reached out to me on Twitter asking how they can continue with this. 

