You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ne...@apache.org on 2020/09/13 06:09:17 UTC

[arrow] 01/02: ARROW-8289: [Rust] Parquet Arrow writer with nested support

This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch rust-parquet-arrow-writer
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 2b5e102964dd6641c5644c5c4a207d11574532fb
Author: Neville Dipale <ne...@gmail.com>
AuthorDate: Thu Aug 13 18:47:34 2020 +0200

    ARROW-8289: [Rust] Parquet Arrow writer with nested support
    
    **Note**: I started making changes to #6785, and ended up deviating a lot, so I opted for making a new draft PR in case my approach is not suitable.
    ___
    
    This is a draft to implement an arrow writer for parquet. It supports the following (no complete test coverage yet):
    
    * writing primitives except for booleans and binary
    * nested structs
    * null values (via definition levels)
    
    It does not yet support:
    
    - Boolean arrays (have to be handled differently from numeric values)
    - Binary arrays
    - Dictionary arrays
    - Union arrays (are they even possible?)
    
    I have only added a test by creating a nested schema, which I tested on pyarrow.
    
    ```jupyter
    # schema of test_complex.parquet
    
    a: int32 not null
    b: int32
    c: struct<d: double, e: struct<f: float>> not null
      child 0, d: double
      child 1, e: struct<f: float>
          child 0, f: float
    ```
    
    This PR potentially addresses:
    
    * https://issues.apache.org/jira/browse/ARROW-8289
    * https://issues.apache.org/jira/browse/ARROW-8423
    * https://issues.apache.org/jira/browse/ARROW-8424
    * https://issues.apache.org/jira/browse/ARROW-8425
    
    And I would like to propose either opening new JIRAs for the above incomplete items, or renaming the last 3 above.
    
    ___
    
    **Help Needed**
    
    I'm implementing the definition and repetition levels on first principle from an old Parquet blog post from the Twitter engineering blog. It's likely that I'm not getting some concepts correct, so I would appreciate help with:
    
    * Checking if my logic is correct
    * Guidance or suggestions on how to more efficiently extract levels from arrays
    * Adding tests - I suspect we might need a lot of tests, so far we only test writing 1 batch, so I don't know how paging would work when writing a large enough file
    
    I also don't know if the various encoding levels (dictionary, RLE, etc.) and compression levels are applied automagically, or if that'd be something we need to explicitly enable.
    
    CC @sunchao @sadikovi @andygrove @paddyhoran
    
    Might be of interest to @mcassels @maxburke
    
    Closes #7319 from nevi-me/arrow-parquet-writer
    
    Lead-authored-by: Neville Dipale <ne...@gmail.com>
    Co-authored-by: Max Burke <ma...@urbanlogiq.com>
    Co-authored-by: Andy Grove <an...@gmail.com>
    Co-authored-by: Max Burke <ma...@gmail.com>
    Signed-off-by: Neville Dipale <ne...@gmail.com>
---
 rust/parquet/src/arrow/arrow_writer.rs | 682 +++++++++++++++++++++++++++++++++
 rust/parquet/src/arrow/mod.rs          |   5 +-
 rust/parquet/src/schema/types.rs       |   6 +-
 3 files changed, 691 insertions(+), 2 deletions(-)

diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
new file mode 100644
index 0000000..0c1c490
--- /dev/null
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -0,0 +1,682 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Contains writer which writes arrow data into parquet data.
+
+use std::rc::Rc;
+
+use arrow::array as arrow_array;
+use arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
+use arrow::record_batch::RecordBatch;
+use arrow_array::Array;
+
+use crate::column::writer::ColumnWriter;
+use crate::errors::{ParquetError, Result};
+use crate::file::properties::WriterProperties;
+use crate::{
+    data_type::*,
+    file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter},
+};
+
+/// Arrow writer
+///
+/// Writes Arrow `RecordBatch`es to a Parquet writer
+pub struct ArrowWriter<W: ParquetWriter> {
+    /// Underlying Parquet writer
+    writer: SerializedFileWriter<W>,
+    /// A copy of the Arrow schema.
+    ///
+    /// The schema is used to verify that each record batch written has the correct schema
+    arrow_schema: SchemaRef,
+}
+
+impl<W: 'static + ParquetWriter> ArrowWriter<W> {
+    /// Try to create a new Arrow writer
+    ///
+    /// The writer will fail if:
+    ///  * a `SerializedFileWriter` cannot be created from the ParquetWriter
+    ///  * the Arrow schema contains unsupported datatypes such as Unions
+    pub fn try_new(
+        writer: W,
+        arrow_schema: SchemaRef,
+        props: Option<Rc<WriterProperties>>,
+    ) -> Result<Self> {
+        let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
+        let props = match props {
+            Some(props) => props,
+            None => Rc::new(WriterProperties::builder().build()),
+        };
+        let file_writer = SerializedFileWriter::new(
+            writer.try_clone()?,
+            schema.root_schema_ptr(),
+            props,
+        )?;
+
+        Ok(Self {
+            writer: file_writer,
+            arrow_schema,
+        })
+    }
+
+    /// Write a RecordBatch to writer
+    ///
+    /// *NOTE:* The writer currently does not support all Arrow data types
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        // validate batch schema against writer's supplied schema
+        if self.arrow_schema != batch.schema() {
+            return Err(ParquetError::ArrowError(
+                "Record batch schema does not match writer schema".to_string(),
+            ));
+        }
+        // compute the definition and repetition levels of the batch
+        let mut levels = vec![];
+        batch.columns().iter().for_each(|array| {
+            let mut array_levels =
+                get_levels(array, 0, &vec![1i16; batch.num_rows()][..], None);
+            levels.append(&mut array_levels);
+        });
+        // reverse levels so we can use Vec::pop(&mut self)
+        levels.reverse();
+
+        let mut row_group_writer = self.writer.next_row_group()?;
+
+        // write leaves
+        for column in batch.columns() {
+            write_leaves(&mut row_group_writer, column, &mut levels)?;
+        }
+
+        self.writer.close_row_group(row_group_writer)
+    }
+
+    /// Close and finalise the underlying Parquet writer
+    pub fn close(&mut self) -> Result<()> {
+        self.writer.close()
+    }
+}
+
+/// Convenience method to get the next ColumnWriter from the RowGroupWriter
+#[inline]
+#[allow(clippy::borrowed_box)]
+fn get_col_writer(
+    row_group_writer: &mut Box<dyn RowGroupWriter>,
+) -> Result<ColumnWriter> {
+    let col_writer = row_group_writer
+        .next_column()?
+        .expect("Unable to get column writer");
+    Ok(col_writer)
+}
+
+#[allow(clippy::borrowed_box)]
+fn write_leaves(
+    mut row_group_writer: &mut Box<dyn RowGroupWriter>,
+    array: &arrow_array::ArrayRef,
+    mut levels: &mut Vec<Levels>,
+) -> Result<()> {
+    match array.data_type() {
+        ArrowDataType::Int8
+        | ArrowDataType::Int16
+        | ArrowDataType::Int32
+        | ArrowDataType::Int64
+        | ArrowDataType::UInt8
+        | ArrowDataType::UInt16
+        | ArrowDataType::UInt32
+        | ArrowDataType::UInt64
+        | ArrowDataType::Float16
+        | ArrowDataType::Float32
+        | ArrowDataType::Float64
+        | ArrowDataType::Timestamp(_, _)
+        | ArrowDataType::Date32(_)
+        | ArrowDataType::Date64(_)
+        | ArrowDataType::Time32(_)
+        | ArrowDataType::Time64(_)
+        | ArrowDataType::Duration(_)
+        | ArrowDataType::Interval(_)
+        | ArrowDataType::LargeBinary
+        | ArrowDataType::Binary
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8 => {
+            let mut col_writer = get_col_writer(&mut row_group_writer)?;
+            write_leaf(
+                &mut col_writer,
+                array,
+                levels.pop().expect("Levels exhausted"),
+            )?;
+            row_group_writer.close_column(col_writer)?;
+            Ok(())
+        }
+        ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
+            // write the child list
+            let data = array.data();
+            let child_array = arrow_array::make_array(data.child_data()[0].clone());
+            write_leaves(&mut row_group_writer, &child_array, &mut levels)?;
+            Ok(())
+        }
+        ArrowDataType::Struct(_) => {
+            let struct_array: &arrow_array::StructArray = array
+                .as_any()
+                .downcast_ref::<arrow_array::StructArray>()
+                .expect("Unable to get struct array");
+            for field in struct_array.columns() {
+                write_leaves(&mut row_group_writer, field, &mut levels)?;
+            }
+            Ok(())
+        }
+        ArrowDataType::FixedSizeList(_, _)
+        | ArrowDataType::Null
+        | ArrowDataType::Boolean
+        | ArrowDataType::FixedSizeBinary(_)
+        | ArrowDataType::Union(_)
+        | ArrowDataType::Dictionary(_, _) => Err(ParquetError::NYI(
+            "Attempting to write an Arrow type that is not yet implemented".to_string(),
+        )),
+    }
+}
+
+fn write_leaf(
+    writer: &mut ColumnWriter,
+    column: &arrow_array::ArrayRef,
+    levels: Levels,
+) -> Result<i64> {
+    let written = match writer {
+        ColumnWriter::Int32ColumnWriter(ref mut typed) => {
+            let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
+            let array = array
+                .as_any()
+                .downcast_ref::<arrow_array::Int32Array>()
+                .expect("Unable to get int32 array");
+            typed.write_batch(
+                get_numeric_array_slice::<Int32Type, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::BoolColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::Int64ColumnWriter(ref mut typed) => {
+            let array = arrow_array::Int64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<Int64Type, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+        ColumnWriter::FloatColumnWriter(ref mut typed) => {
+            let array = arrow_array::Float32Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<FloatType, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::DoubleColumnWriter(ref mut typed) => {
+            let array = arrow_array::Float64Array::from(column.data());
+            typed.write_batch(
+                get_numeric_array_slice::<DoubleType, _>(&array).as_slice(),
+                Some(levels.definition.as_slice()),
+                levels.repetition.as_deref(),
+            )?
+        }
+        ColumnWriter::ByteArrayColumnWriter(ref mut typed) => match column.data_type() {
+            ArrowDataType::Binary | ArrowDataType::Utf8 => {
+                let array = arrow_array::BinaryArray::from(column.data());
+                typed.write_batch(
+                    get_binary_array(&array).as_slice(),
+                    Some(levels.definition.as_slice()),
+                    levels.repetition.as_deref(),
+                )?
+            }
+            ArrowDataType::LargeBinary | ArrowDataType::LargeUtf8 => {
+                let array = arrow_array::LargeBinaryArray::from(column.data());
+                typed.write_batch(
+                    get_large_binary_array(&array).as_slice(),
+                    Some(levels.definition.as_slice()),
+                    levels.repetition.as_deref(),
+                )?
+            }
+            _ => unreachable!("Currently unreachable because data type not supported"),
+        },
+        ColumnWriter::FixedLenByteArrayColumnWriter(ref mut _typed) => {
+            unreachable!("Currently unreachable because data type not supported")
+        }
+    };
+    Ok(written as i64)
+}
+
+/// A struct that represents definition and repetition levels.
+/// Repetition levels are only populated if the parent or current leaf is repeated
+#[derive(Debug)]
+struct Levels {
+    definition: Vec<i16>,
+    repetition: Option<Vec<i16>>,
+}
+
+/// Compute nested levels of the Arrow array, recursing into lists and structs
+fn get_levels(
+    array: &arrow_array::ArrayRef,
+    level: i16,
+    parent_def_levels: &[i16],
+    parent_rep_levels: Option<&[i16]>,
+) -> Vec<Levels> {
+    match array.data_type() {
+        ArrowDataType::Null => unimplemented!(),
+        ArrowDataType::Boolean
+        | ArrowDataType::Int8
+        | ArrowDataType::Int16
+        | ArrowDataType::Int32
+        | ArrowDataType::Int64
+        | ArrowDataType::UInt8
+        | ArrowDataType::UInt16
+        | ArrowDataType::UInt32
+        | ArrowDataType::UInt64
+        | ArrowDataType::Float16
+        | ArrowDataType::Float32
+        | ArrowDataType::Float64
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8
+        | ArrowDataType::Timestamp(_, _)
+        | ArrowDataType::Date32(_)
+        | ArrowDataType::Date64(_)
+        | ArrowDataType::Time32(_)
+        | ArrowDataType::Time64(_)
+        | ArrowDataType::Duration(_)
+        | ArrowDataType::Interval(_)
+        | ArrowDataType::Binary
+        | ArrowDataType::LargeBinary => vec![Levels {
+            definition: get_primitive_def_levels(array, parent_def_levels),
+            repetition: None,
+        }],
+        ArrowDataType::FixedSizeBinary(_) => unimplemented!(),
+        ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
+            let array_data = array.data();
+            let child_data = array_data.child_data().get(0).unwrap();
+            // get offsets, accounting for large offsets if present
+            let offsets: Vec<i64> = {
+                if let ArrowDataType::LargeList(_) = array.data_type() {
+                    unsafe { array_data.buffers()[0].typed_data::<i64>() }.to_vec()
+                } else {
+                    let offsets = unsafe { array_data.buffers()[0].typed_data::<i32>() };
+                    offsets.to_vec().into_iter().map(|v| v as i64).collect()
+                }
+            };
+            let child_array = arrow_array::make_array(child_data.clone());
+
+            let mut list_def_levels = Vec::with_capacity(child_array.len());
+            let mut list_rep_levels = Vec::with_capacity(child_array.len());
+            let rep_levels: Vec<i16> = parent_rep_levels
+                .map(|l| l.to_vec())
+                .unwrap_or_else(|| vec![0i16; parent_def_levels.len()]);
+            parent_def_levels
+                .iter()
+                .zip(rep_levels)
+                .zip(offsets.windows(2))
+                .for_each(|((parent_def_level, parent_rep_level), window)| {
+                    if *parent_def_level == 0 {
+                        // parent is null, list element must also be null
+                        list_def_levels.push(0);
+                        list_rep_levels.push(0);
+                    } else {
+                        // parent is not null, check if list is empty or null
+                        let start = window[0];
+                        let end = window[1];
+                        let len = end - start;
+                        if len == 0 {
+                            list_def_levels.push(*parent_def_level - 1);
+                            list_rep_levels.push(parent_rep_level);
+                        } else {
+                            list_def_levels.push(*parent_def_level);
+                            list_rep_levels.push(parent_rep_level);
+                            for _ in 1..len {
+                                list_def_levels.push(*parent_def_level);
+                                list_rep_levels.push(parent_rep_level + 1);
+                            }
+                        }
+                    }
+                });
+
+            // if datatype is a primitive, we can construct levels of the child array
+            match child_array.data_type() {
+                ArrowDataType::Null => unimplemented!(),
+                ArrowDataType::Boolean => unimplemented!(),
+                ArrowDataType::Int8
+                | ArrowDataType::Int16
+                | ArrowDataType::Int32
+                | ArrowDataType::Int64
+                | ArrowDataType::UInt8
+                | ArrowDataType::UInt16
+                | ArrowDataType::UInt32
+                | ArrowDataType::UInt64
+                | ArrowDataType::Float16
+                | ArrowDataType::Float32
+                | ArrowDataType::Float64
+                | ArrowDataType::Timestamp(_, _)
+                | ArrowDataType::Date32(_)
+                | ArrowDataType::Date64(_)
+                | ArrowDataType::Time32(_)
+                | ArrowDataType::Time64(_)
+                | ArrowDataType::Duration(_)
+                | ArrowDataType::Interval(_) => {
+                    let def_levels =
+                        get_primitive_def_levels(&child_array, &list_def_levels[..]);
+                    vec![Levels {
+                        definition: def_levels,
+                        repetition: Some(list_rep_levels),
+                    }]
+                }
+                ArrowDataType::Binary
+                | ArrowDataType::Utf8
+                | ArrowDataType::LargeUtf8 => unimplemented!(),
+                ArrowDataType::FixedSizeBinary(_) => unimplemented!(),
+                ArrowDataType::LargeBinary => unimplemented!(),
+                ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
+                    // nested list
+                    unimplemented!()
+                }
+                ArrowDataType::FixedSizeList(_, _) => unimplemented!(),
+                ArrowDataType::Struct(_) => get_levels(
+                    array,
+                    level + 1, // indicates a nesting level of 2 (list + struct)
+                    &list_def_levels[..],
+                    Some(&list_rep_levels[..]),
+                ),
+                ArrowDataType::Union(_) => unimplemented!(),
+                ArrowDataType::Dictionary(_, _) => unimplemented!(),
+            }
+        }
+        ArrowDataType::FixedSizeList(_, _) => unimplemented!(),
+        ArrowDataType::Struct(_) => {
+            let struct_array: &arrow_array::StructArray = array
+                .as_any()
+                .downcast_ref::<arrow_array::StructArray>()
+                .expect("Unable to get struct array");
+            let mut struct_def_levels = Vec::with_capacity(struct_array.len());
+            for i in 0..array.len() {
+                struct_def_levels.push(level + struct_array.is_valid(i) as i16);
+            }
+            // trying to create levels for struct's fields
+            let mut struct_levels = vec![];
+            struct_array.columns().into_iter().for_each(|col| {
+                let mut levels =
+                    get_levels(col, level + 1, &struct_def_levels[..], parent_rep_levels);
+                struct_levels.append(&mut levels);
+            });
+            struct_levels
+        }
+        ArrowDataType::Union(_) => unimplemented!(),
+        ArrowDataType::Dictionary(_, _) => unimplemented!(),
+    }
+}
+
+/// Get the definition levels of the numeric array, with level 0 being null and 1 being not null
+/// In the case where the array in question is a child of either a list or struct, the levels
+/// are incremented in accordance with the `level` parameter.
+/// Parent levels are either 0 or 1, and are used to higher (correct terminology?) leaves as null
+fn get_primitive_def_levels(
+    array: &arrow_array::ArrayRef,
+    parent_def_levels: &[i16],
+) -> Vec<i16> {
+    let mut array_index = 0;
+    let max_def_level = parent_def_levels.iter().max().unwrap();
+    let mut primitive_def_levels = vec![];
+    parent_def_levels.iter().for_each(|def_level| {
+        if def_level < max_def_level {
+            primitive_def_levels.push(*def_level);
+        } else {
+            primitive_def_levels.push(def_level - array.is_null(array_index) as i16);
+            array_index += 1;
+        }
+    });
+    primitive_def_levels
+}
+
+macro_rules! def_get_binary_array_fn {
+    ($name:ident, $ty:ty) => {
+        fn $name(array: &$ty) -> Vec<ByteArray> {
+            let mut values = Vec::with_capacity(array.len() - array.null_count());
+            for i in 0..array.len() {
+                if array.is_valid(i) {
+                    let bytes = ByteArray::from(array.value(i).to_vec());
+                    values.push(bytes);
+                }
+            }
+            values
+        }
+    };
+}
+
+def_get_binary_array_fn!(get_binary_array, arrow_array::BinaryArray);
+def_get_binary_array_fn!(get_large_binary_array, arrow_array::LargeBinaryArray);
+
+/// Get the underlying numeric array slice, skipping any null values.
+/// If there are no null values, it might be quicker to get the slice directly instead of
+/// calling this function.
+fn get_numeric_array_slice<T, A>(array: &arrow_array::PrimitiveArray<A>) -> Vec<T::T>
+where
+    T: DataType,
+    A: arrow::datatypes::ArrowNumericType,
+    T::T: From<A::Native>,
+{
+    let mut values = Vec::with_capacity(array.len() - array.null_count());
+    for i in 0..array.len() {
+        if array.is_valid(i) {
+            values.push(array.value(i).into())
+        }
+    }
+    values
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use std::io::Seek;
+    use std::sync::Arc;
+
+    use arrow::array::*;
+    use arrow::datatypes::ToByteSlice;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::record_batch::{RecordBatch, RecordBatchReader};
+
+    use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+    use crate::file::reader::SerializedFileReader;
+    use crate::util::test_common::get_temp_file;
+
+    #[test]
+    fn arrow_writer() {
+        // define schema
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, true),
+        ]);
+
+        // create some data
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
+
+        // build a record batch
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(a), Arc::new(b)],
+        )
+        .unwrap();
+
+        let file = get_temp_file("test_arrow_writer.parquet", &[]);
+        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+    }
+
+    #[test]
+    fn arrow_writer_list() {
+        // define schema
+        let schema = Schema::new(vec![Field::new(
+            "a",
+            DataType::List(Box::new(DataType::Int32)),
+            false,
+        )]);
+
+        // create some data
+        let a_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+
+        // Construct a buffer for value offsets, for the nested array:
+        //  [[false], [true, false], null, [true, false, true], [false, true, false, true]]
+        let a_value_offsets =
+            arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
+
+        // Construct a list array from the above two
+        let a_list_data = ArrayData::builder(DataType::List(Box::new(DataType::Int32)))
+            .len(5)
+            .add_buffer(a_value_offsets)
+            .add_child_data(a_values.data())
+            .build();
+        let a = ListArray::from(a_list_data);
+
+        // build a record batch
+        let batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(a)]).unwrap();
+
+        let file = get_temp_file("test_arrow_writer_list.parquet", &[]);
+        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+    }
+
+    #[test]
+    fn arrow_writer_binary() {
+        let string_field = Field::new("a", DataType::Utf8, false);
+        let binary_field = Field::new("b", DataType::Binary, false);
+        let schema = Schema::new(vec![string_field, binary_field]);
+
+        let raw_string_values = vec!["foo", "bar", "baz", "quux"];
+        let raw_binary_values = vec![
+            b"foo".to_vec(),
+            b"bar".to_vec(),
+            b"baz".to_vec(),
+            b"quux".to_vec(),
+        ];
+        let raw_binary_value_refs = raw_binary_values
+            .iter()
+            .map(|x| x.as_slice())
+            .collect::<Vec<_>>();
+
+        let string_values = StringArray::from(raw_string_values.clone());
+        let binary_values = BinaryArray::from(raw_binary_value_refs);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(string_values), Arc::new(binary_values)],
+        )
+        .unwrap();
+
+        let mut file = get_temp_file("test_arrow_writer.parquet", &[]);
+        let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None)
+                .unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        file.seek(std::io::SeekFrom::Start(0)).unwrap();
+        let file_reader = SerializedFileReader::new(file).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(file_reader));
+        let mut record_batch_reader = arrow_reader.get_record_reader(1024).unwrap();
+
+        let batch = record_batch_reader.next_batch().unwrap().unwrap();
+        let string_col = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let binary_col = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+
+        for i in 0..batch.num_rows() {
+            assert_eq!(string_col.value(i), raw_string_values[i]);
+            assert_eq!(binary_col.value(i), raw_binary_values[i].as_slice());
+        }
+    }
+
+    #[test]
+    fn arrow_writer_complex() {
+        // define schema
+        let struct_field_d = Field::new("d", DataType::Float64, true);
+        let struct_field_f = Field::new("f", DataType::Float32, true);
+        let struct_field_g =
+            Field::new("g", DataType::List(Box::new(DataType::Int16)), false);
+        let struct_field_e = Field::new(
+            "e",
+            DataType::Struct(vec![struct_field_f.clone(), struct_field_g.clone()]),
+            true,
+        );
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, true),
+            Field::new(
+                "c",
+                DataType::Struct(vec![struct_field_d.clone(), struct_field_e.clone()]),
+                false,
+            ),
+        ]);
+
+        // create some data
+        let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
+        let b = Int32Array::from(vec![Some(1), None, None, Some(4), Some(5)]);
+        let d = Float64Array::from(vec![None, None, None, Some(1.0), None]);
+        let f = Float32Array::from(vec![Some(0.0), None, Some(333.3), None, Some(5.25)]);
+
+        let g_value = Int16Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+
+        // Construct a buffer for value offsets, for the nested array:
+        //  [[1], [2, 3], null, [4, 5, 6], [7, 8, 9, 10]]
+        let g_value_offsets =
+            arrow::buffer::Buffer::from(&[0, 1, 3, 3, 6, 10].to_byte_slice());
+
+        // Construct a list array from the above two
+        let g_list_data = ArrayData::builder(struct_field_g.data_type().clone())
+            .len(5)
+            .add_buffer(g_value_offsets)
+            .add_child_data(g_value.data())
+            .build();
+        let g = ListArray::from(g_list_data);
+
+        let e = StructArray::from(vec![
+            (struct_field_f, Arc::new(f) as ArrayRef),
+            (struct_field_g, Arc::new(g) as ArrayRef),
+        ]);
+
+        let c = StructArray::from(vec![
+            (struct_field_d, Arc::new(d) as ArrayRef),
+            (struct_field_e, Arc::new(e) as ArrayRef),
+        ]);
+
+        // build a record batch
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(a), Arc::new(b), Arc::new(c)],
+        )
+        .unwrap();
+
+        let file = get_temp_file("test_arrow_writer_complex.parquet", &[]);
+        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+    }
+}
diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs
index 02f50fd..c8739c2 100644
--- a/rust/parquet/src/arrow/mod.rs
+++ b/rust/parquet/src/arrow/mod.rs
@@ -51,10 +51,13 @@
 
 pub(in crate::arrow) mod array_reader;
 pub mod arrow_reader;
+pub mod arrow_writer;
 pub(in crate::arrow) mod converter;
 pub(in crate::arrow) mod record_reader;
 pub mod schema;
 
 pub use self::arrow_reader::ArrowReader;
 pub use self::arrow_reader::ParquetFileArrowReader;
-pub use self::schema::{parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns};
+pub use self::schema::{
+    arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
+};
diff --git a/rust/parquet/src/schema/types.rs b/rust/parquet/src/schema/types.rs
index 416073a..5799905 100644
--- a/rust/parquet/src/schema/types.rs
+++ b/rust/parquet/src/schema/types.rs
@@ -788,7 +788,7 @@ impl SchemaDescriptor {
         result.clone()
     }
 
-    fn column_root_of(&self, i: usize) -> &Rc<Type> {
+    fn column_root_of(&self, i: usize) -> &TypePtr {
         assert!(
             i < self.leaves.len(),
             "Index out of bound: {} not in [0, {})",
@@ -810,6 +810,10 @@ impl SchemaDescriptor {
         self.schema.as_ref()
     }
 
+    pub fn root_schema_ptr(&self) -> TypePtr {
+        self.schema.clone()
+    }
+
     /// Returns schema name.
     pub fn name(&self) -> &str {
         self.schema.name()