You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ne...@apache.org on 2019/12/29 21:16:35 UTC

[arrow] 03/03: ARROW-5182: [Rust] Arrow IPC file writer

This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch ARROW-5182
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit b42fa717182616d346a90dd2cd8202e60415e782
Author: Neville Dipale <ne...@gmail.com>
AuthorDate: Sun Dec 29 23:14:03 2019 +0200

    ARROW-5182: [Rust] Arrow IPC file writer
---
 rust/arrow/src/ipc/convert.rs | 218 +++++++++++++++++---
 rust/arrow/src/ipc/mod.rs     |   3 +
 rust/arrow/src/ipc/reader.rs  |  11 +-
 rust/arrow/src/ipc/writer.rs  | 461 ++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 657 insertions(+), 36 deletions(-)

diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs
index e2416ce..e3d70d9 100644
--- a/rust/arrow/src/ipc/convert.rs
+++ b/rust/arrow/src/ipc/convert.rs
@@ -26,8 +26,10 @@ use flatbuffers::{
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use DataType::*;
+
 /// Serialize a schema in IPC format
-fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
+pub(crate) fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
     let mut fbb = FlatBufferBuilder::new();
 
     let mut fields = vec![];
@@ -73,6 +75,47 @@ fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
     fbb
 }
 
+pub(crate) fn schema_to_fb_offset<'a: 'b, 'b>(
+    mut fbb: &'a mut FlatBufferBuilder,
+    schema: &Schema,
+) -> WIPOffset<ipc::Schema<'b>> {
+    let mut fields = vec![];
+    for field in schema.fields() {
+        let fb_field_name = fbb.create_string(field.name().as_str());
+        let (ipc_type_type, ipc_type, ipc_children) =
+            get_fb_field_type(field.data_type(), &mut fbb);
+        let mut field_builder = ipc::FieldBuilder::new(&mut fbb);
+        field_builder.add_name(fb_field_name);
+        field_builder.add_type_type(ipc_type_type);
+        field_builder.add_nullable(field.is_nullable());
+        match ipc_children {
+            None => {}
+            Some(children) => field_builder.add_children(children),
+        };
+        field_builder.add_type_(ipc_type);
+        fields.push(field_builder.finish());
+    }
+
+    let mut custom_metadata = vec![];
+    for (k, v) in schema.metadata() {
+        let fb_key_name = fbb.create_string(k.as_str());
+        let fb_val_name = fbb.create_string(v.as_str());
+
+        let mut kv_builder = ipc::KeyValueBuilder::new(&mut fbb);
+        kv_builder.add_key(fb_key_name);
+        kv_builder.add_value(fb_val_name);
+        custom_metadata.push(kv_builder.finish());
+    }
+
+    let fb_field_list = fbb.create_vector(&fields);
+    let fb_metadata_list = fbb.create_vector(&custom_metadata);
+
+    let mut builder = ipc::SchemaBuilder::new(&mut fbb);
+    builder.add_fields(fb_field_list);
+    builder.add_custom_metadata(fb_metadata_list);
+    builder.finish()
+}
+
 /// Convert an IPC Field to Arrow Field
 impl<'a> From<ipc::Field<'a>> for Field {
     fn from(field: ipc::Field) -> Field {
@@ -85,7 +128,7 @@ impl<'a> From<ipc::Field<'a>> for Field {
 }
 
 /// Deserialize a Schema table from IPC format to Schema data type
-pub fn fb_to_schema(fb: ipc::Schema) -> Schema {
+pub(crate) fn fb_to_schema(fb: ipc::Schema) -> Schema {
     let mut fields: Vec<Field> = vec![];
     let c_fields = fb.fields().unwrap();
     let len = c_fields.len();
@@ -110,7 +153,7 @@ pub fn fb_to_schema(fb: ipc::Schema) -> Schema {
 }
 
 /// Get the Arrow data type from the flatbuffer Field table
-fn get_data_type(field: ipc::Field) -> DataType {
+pub(crate) fn get_data_type(field: ipc::Field) -> DataType {
     match field.type_type() {
         ipc::Type::Bool => DataType::Boolean,
         ipc::Type::Int => {
@@ -236,7 +279,7 @@ fn get_data_type(field: ipc::Field) -> DataType {
 }
 
 /// Get the IPC type of a data type
-fn get_fb_field_type<'a: 'b, 'b>(
+pub(crate) fn get_fb_field_type<'a: 'b, 'b>(
     data_type: &DataType,
     mut fbb: &mut FlatBufferBuilder<'a>,
 ) -> (
@@ -244,14 +287,20 @@ fn get_fb_field_type<'a: 'b, 'b>(
     WIPOffset<UnionWIPOffset>,
     Option<WIPOffset<Vector<'b, ForwardsUOffset<ipc::Field<'b>>>>>,
 ) {
-    use DataType::*;
+    // some IPC implementations expect an empty list for child data, instead of a null value.
+    // An empty field list is thus returned for primitive types
+    let empty_fields: Vec<WIPOffset<ipc::Field>> = vec![];
     match data_type {
-        Boolean => (
-            ipc::Type::Bool,
-            ipc::BoolBuilder::new(&mut fbb).finish().as_union_value(),
-            None,
-        ),
+        Boolean => {
+            let children = fbb.create_vector(&empty_fields[..]);
+            (
+                ipc::Type::Bool,
+                ipc::BoolBuilder::new(&mut fbb).finish().as_union_value(),
+                Some(children),
+            )
+        }
         UInt8 | UInt16 | UInt32 | UInt64 => {
+            let children = fbb.create_vector(&empty_fields[..]);
             let mut builder = ipc::IntBuilder::new(&mut fbb);
             builder.add_is_signed(false);
             match data_type {
@@ -261,9 +310,14 @@ fn get_fb_field_type<'a: 'b, 'b>(
                 UInt64 => builder.add_bitWidth(64),
                 _ => {}
             };
-            (ipc::Type::Int, builder.finish().as_union_value(), None)
+            (
+                ipc::Type::Int,
+                builder.finish().as_union_value(),
+                Some(children),
+            )
         }
         Int8 | Int16 | Int32 | Int64 => {
+            let children = fbb.create_vector(&empty_fields[..]);
             let mut builder = ipc::IntBuilder::new(&mut fbb);
             builder.add_is_signed(true);
             match data_type {
@@ -273,9 +327,14 @@ fn get_fb_field_type<'a: 'b, 'b>(
                 Int64 => builder.add_bitWidth(64),
                 _ => {}
             };
-            (ipc::Type::Int, builder.finish().as_union_value(), None)
+            (
+                ipc::Type::Int,
+                builder.finish().as_union_value(),
+                Some(children),
+            )
         }
         Float16 | Float32 | Float64 => {
+            let children = fbb.create_vector(&empty_fields[..]);
             let mut builder = ipc::FloatingPointBuilder::new(&mut fbb);
             match data_type {
                 Float16 => builder.add_precision(ipc::Precision::HALF),
@@ -286,30 +345,57 @@ fn get_fb_field_type<'a: 'b, 'b>(
             (
                 ipc::Type::FloatingPoint,
                 builder.finish().as_union_value(),
-                None,
+                Some(children),
+            )
+        }
+        Binary => {
+            let children = fbb.create_vector(&empty_fields[..]);
+            (
+                ipc::Type::Binary,
+                ipc::BinaryBuilder::new(&mut fbb).finish().as_union_value(),
+                Some(children),
+            )
+        }
+        Utf8 => {
+            let children = fbb.create_vector(&empty_fields[..]);
+            (
+                ipc::Type::Utf8,
+                ipc::Utf8Builder::new(&mut fbb).finish().as_union_value(),
+                Some(children),
+            )
+        }
+        FixedSizeBinary(len) => {
+            let children = fbb.create_vector(&empty_fields[..]);
+            let mut builder = ipc::FixedSizeBinaryBuilder::new(&mut fbb);
+            builder.add_byteWidth(*len as i32);
+            (
+                ipc::Type::FixedSizeBinary,
+                builder.finish().as_union_value(),
+                Some(children),
             )
         }
-        Binary => (
-            ipc::Type::Binary,
-            ipc::BinaryBuilder::new(&mut fbb).finish().as_union_value(),
-            None,
-        ),
-        Utf8 => (
-            ipc::Type::Utf8,
-            ipc::Utf8Builder::new(&mut fbb).finish().as_union_value(),
-            None,
-        ),
         Date32(_) => {
+            let children = fbb.create_vector(&empty_fields[..]);
             let mut builder = ipc::DateBuilder::new(&mut fbb);
             builder.add_unit(ipc::DateUnit::DAY);
-            (ipc::Type::Date, builder.finish().as_union_value(), None)
+            (
+                ipc::Type::Date,
+                builder.finish().as_union_value(),
+                Some(children),
+            )
         }
         Date64(_) => {
+            let children = fbb.create_vector(&empty_fields[..]);
             let mut builder = ipc::DateBuilder::new(&mut fbb);
             builder.add_unit(ipc::DateUnit::MILLISECOND);
-            (ipc::Type::Date, builder.finish().as_union_value(), None)
+            (
+                ipc::Type::Date,
+                builder.finish().as_union_value(),
+                Some(children),
+            )
         }
         Time32(unit) | Time64(unit) => {
+            let children = fbb.create_vector(&empty_fields[..]);
             let mut builder = ipc::TimeBuilder::new(&mut fbb);
             match unit {
                 TimeUnit::Second => {
@@ -329,9 +415,14 @@ fn get_fb_field_type<'a: 'b, 'b>(
                     builder.add_unit(ipc::TimeUnit::NANOSECOND);
                 }
             }
-            (ipc::Type::Time, builder.finish().as_union_value(), None)
+            (
+                ipc::Type::Time,
+                builder.finish().as_union_value(),
+                Some(children),
+            )
         }
         Timestamp(unit, tz) => {
+            let children = fbb.create_vector(&empty_fields[..]);
             let tz = tz.clone().unwrap_or(Arc::new(String::new()));
             let tz_str = fbb.create_string(tz.as_str());
             let mut builder = ipc::TimestampBuilder::new(&mut fbb);
@@ -348,19 +439,25 @@ fn get_fb_field_type<'a: 'b, 'b>(
             (
                 ipc::Type::Timestamp,
                 builder.finish().as_union_value(),
-                None,
+                Some(children),
             )
         }
         Interval(unit) => {
+            let children = fbb.create_vector(&empty_fields[..]);
             let mut builder = ipc::IntervalBuilder::new(&mut fbb);
             let interval_unit = match unit {
                 IntervalUnit::YearMonth => ipc::IntervalUnit::YEAR_MONTH,
                 IntervalUnit::DayTime => ipc::IntervalUnit::DAY_TIME,
             };
             builder.add_unit(interval_unit);
-            (ipc::Type::Interval, builder.finish().as_union_value(), None)
+            (
+                ipc::Type::Interval,
+                builder.finish().as_union_value(),
+                Some(children),
+            )
         }
         Duration(unit) => {
+            let children = fbb.create_vector(&empty_fields[..]);
             let mut builder = ipc::DurationBuilder::new(&mut fbb);
             let time_unit = match unit {
                 TimeUnit::Second => ipc::TimeUnit::SECOND,
@@ -369,7 +466,11 @@ fn get_fb_field_type<'a: 'b, 'b>(
                 TimeUnit::Nanosecond => ipc::TimeUnit::NANOSECOND,
             };
             builder.add_unit(time_unit);
-            (ipc::Type::Duration, builder.finish().as_union_value(), None)
+            (
+                ipc::Type::Duration,
+                builder.finish().as_union_value(),
+                Some(children),
+            )
         }
         List(ref list_type) => {
             let inner_types = get_fb_field_type(list_type, &mut fbb);
@@ -392,6 +493,29 @@ fn get_fb_field_type<'a: 'b, 'b>(
                 Some(children),
             )
         }
+        FixedSizeList((ref list_type, len)) => {
+            let inner_types = get_fb_field_type(list_type, &mut fbb);
+            let child = ipc::Field::create(
+                &mut fbb,
+                &ipc::FieldArgs {
+                    name: None,
+                    nullable: false,
+                    type_type: inner_types.0,
+                    type_: Some(inner_types.1),
+                    dictionary: None,
+                    children: inner_types.2,
+                    custom_metadata: None,
+                },
+            );
+            let children = fbb.create_vector(&[child]);
+            let mut builder = ipc::FixedSizeListBuilder::new(&mut fbb);
+            builder.add_listSize(*len as i32);
+            (
+                ipc::Type::FixedSizeList,
+                builder.finish().as_union_value(),
+                Some(children),
+            )
+        }
         Struct(fields) => {
             // struct's fields are children
             let mut children = vec![];
@@ -418,7 +542,6 @@ fn get_fb_field_type<'a: 'b, 'b>(
                 Some(children),
             )
         }
-        t @ _ => panic!("Unsupported Arrow Data Type {:?}", t),
     }
 }
 
@@ -533,4 +656,39 @@ mod tests {
         let schema2 = fb_to_schema(ipc);
         assert_eq!(schema, schema2);
     }
+
+    #[test]
+    fn schema_from_bytes() {
+        // bytes of a schema generated from python (0.14.0), saved as an `ipc::Message`.
+        // the schema is: Field("field1", DataType::UInt32, false)
+        let bytes: Vec<u8> = vec![
+            16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 3, 0,
+            12, 0, 0, 0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20,
+            0, 0, 0, 16, 0, 20, 0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0,
+            0, 0, 2, 32, 0, 0, 0, 20, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0, 8, 0,
+            4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0,
+            0, 0, 0, 0,
+        ];
+        let ipc = ipc::get_root_as_message(&bytes[..]);
+        let schema = ipc.header_as_schema().unwrap();
+
+        // a message generated from Rust; it must decode to the same schema as the Python one
+        let bytes: Vec<u8> = vec![
+            16, 0, 0, 0, 0, 0, 10, 0, 14, 0, 12, 0, 11, 0, 4, 0, 10, 0, 0, 0, 20, 0, 0,
+            0, 0, 0, 0, 1, 3, 0, 10, 0, 12, 0, 0, 0, 8, 0, 4, 0, 10, 0, 0, 0, 8, 0, 0, 0,
+            8, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 16, 0, 0, 0, 12, 0, 18, 0, 12, 0, 0, 0,
+            11, 0, 4, 0, 12, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 2, 20, 0, 0, 0, 0, 0, 6, 0,
+            8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49,
+            0, 0,
+        ];
+        let ipc2 = ipc::get_root_as_message(&bytes[..]);
+        let schema2 = ipc2.header_as_schema().unwrap();
+
+        assert_eq!(fb_to_schema(schema), fb_to_schema(schema2));
+        assert_eq!(ipc.version(), ipc2.version());
+        assert_eq!(ipc.header_type(), ipc2.header_type());
+        assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
+        assert!(ipc.custom_metadata().is_none());
+        assert!(ipc2.custom_metadata().is_none());
+    }
 }
diff --git a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs
index 8c3bc08..1ef653f 100644
--- a/rust/arrow/src/ipc/mod.rs
+++ b/rust/arrow/src/ipc/mod.rs
@@ -17,6 +17,7 @@
 
 pub mod convert;
 pub mod reader;
+pub mod writer;
 
 pub mod gen;
 
@@ -25,3 +26,5 @@ pub use self::gen::Message::*;
 pub use self::gen::Schema::*;
 pub use self::gen::SparseTensor::*;
 pub use self::gen::Tensor::*;
+
+static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs
index bad6319..ebfac96 100644
--- a/rust/arrow/src/ipc/reader.rs
+++ b/rust/arrow/src/ipc/reader.rs
@@ -32,8 +32,6 @@ use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchReader};
 use DataType::*;
 
-static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
-
 /// Read a buffer based on offset and length
 fn read_buffer(buf: &ipc::Buffer, a_data: &Vec<u8>) -> Buffer {
     let start_offset = buf.offset() as usize;
@@ -404,14 +402,14 @@ impl<R: Read + Seek> FileReader<R> {
         // check if header and footer contain correct magic bytes
         let mut magic_buffer: [u8; 6] = [0; 6];
         reader.read_exact(&mut magic_buffer)?;
-        if magic_buffer != ARROW_MAGIC {
+        if magic_buffer != super::ARROW_MAGIC {
             return Err(ArrowError::IoError(
                 "Arrow file does not contain correct header".to_string(),
             ));
         }
         reader.seek(SeekFrom::End(-6))?;
         reader.read_exact(&mut magic_buffer)?;
-        if magic_buffer != ARROW_MAGIC {
+        if magic_buffer != super::ARROW_MAGIC {
             return Err(ArrowError::IoError(
                 "Arrow file does not contain correct footer".to_string(),
             ));
@@ -499,8 +497,9 @@ impl<R: Read + Seek> FileReader<R> {
 
                     read_record_batch(&buf, batch, self.schema())
                 }
-                _ => unimplemented!(
-                    "reading types other than record batches not yet supported"
+                t @ _ => unimplemented!(
+                    "reading type: {:?}, other than record batches not yet supported",
+                    t
                 ),
             }
         } else {
diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs
new file mode 100644
index 0000000..0800bd6
--- /dev/null
+++ b/rust/arrow/src/ipc/writer.rs
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Arrow IPC File and Stream Writers
+//!
+//! The `FileWriter` and `StreamWriter` have similar interfaces,
+//! however the `FileWriter` expects a writer that supports `Seek`ing
+
+use std::io::{BufWriter, Write};
+
+use flatbuffers::FlatBufferBuilder;
+
+use crate::array::ArrayDataRef;
+use crate::buffer::{Buffer, MutableBuffer};
+use crate::datatypes::*;
+use crate::error::{ArrowError, Result};
+use crate::ipc;
+use crate::record_batch::RecordBatch;
+
+pub struct FileWriter<W: Write> {
+    /// The buffered destination that all IPC bytes are written to
+    writer: BufWriter<W>,
+    /// An owned copy of the schema, serialized again into the footer by `finish`
+    schema: Schema,
+    /// The byte count recorded for the header: 8 bytes of padded magic plus the schema
+    header_bytes: usize,
+    /// The running offset recorded for the next record block, for random access
+    block_offsets: usize,
+    /// Dictionary blocks that will be written as part of the IPC footer
+    dictionary_blocks: Vec<ipc::Block>,
+    /// Record blocks that will be written as part of the IPC footer
+    record_blocks: Vec<ipc::Block>,
+    /// Whether the footer has been written, after which no more batches are accepted
+    finished: bool,
+}
+
+impl<W: Write> FileWriter<W> {
+    /// Try to create a new writer, writing the magic bytes and the schema as the file header
+    pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
+        let mut writer = BufWriter::new(writer);
+        // write magic to header; `write_all` is used as `write` may write only part of it
+        writer.write_all(&super::ARROW_MAGIC[..])?;
+        // pad the 6-byte magic to an 8-byte boundary
+        writer.write_all(&[0, 0])?;
+        // write the schema, set the written bytes to the schema + header
+        let written = write_schema(&mut writer, schema)? + 8;
+        Ok(Self {
+            writer,
+            schema: schema.clone(),
+            header_bytes: written,
+            block_offsets: written,
+            dictionary_blocks: vec![],
+            record_blocks: vec![],
+            finished: false,
+        })
+    }
+
+    /// Write a record batch to the file, or return an error if the writer is finished
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        if self.finished {
+            return Err(ArrowError::IoError(
+                "Cannot write record batch to file writer as it is closed".to_string(),
+            ));
+        }
+        let (meta, data) = write_record_batch(&mut self.writer, batch)?;
+        // add a record block for the footer
+        self.record_blocks.push(ipc::Block::new(
+            self.block_offsets as i64,
+            (meta as i32) + 4,
+            data as i64,
+        ));
+        self.block_offsets += meta + data;
+        Ok(())
+    }
+
+    /// Write the footer and closing magic, flush, then mark the writer as finished
+    pub fn finish(&mut self) -> Result<()> {
+        let mut fbb = FlatBufferBuilder::new();
+        let dictionaries = fbb.create_vector(&self.dictionary_blocks);
+        let record_batches = fbb.create_vector(&self.record_blocks);
+        // TODO: this is duplicated as we otherwise mutably borrow twice
+        let schema = {
+            let mut fields = vec![];
+            for field in self.schema.fields() {
+                let fb_field_name = fbb.create_string(field.name().as_str());
+                let (ipc_type_type, ipc_type, ipc_children) =
+                    ipc::convert::get_fb_field_type(field.data_type(), &mut fbb);
+                let mut field_builder = ipc::FieldBuilder::new(&mut fbb);
+                field_builder.add_name(fb_field_name);
+                field_builder.add_type_type(ipc_type_type);
+                field_builder.add_nullable(field.is_nullable());
+                match ipc_children {
+                    None => {}
+                    Some(children) => field_builder.add_children(children),
+                };
+                field_builder.add_type_(ipc_type);
+                fields.push(field_builder.finish());
+            }
+
+            let mut custom_metadata = vec![];
+            for (k, v) in self.schema.metadata() {
+                let fb_key_name = fbb.create_string(k.as_str());
+                let fb_val_name = fbb.create_string(v.as_str());
+
+                let mut kv_builder = ipc::KeyValueBuilder::new(&mut fbb);
+                kv_builder.add_key(fb_key_name);
+                kv_builder.add_value(fb_val_name);
+                custom_metadata.push(kv_builder.finish());
+            }
+
+            let fb_field_list = fbb.create_vector(&fields);
+            let fb_metadata_list = fbb.create_vector(&custom_metadata);
+
+            let root = {
+                let mut builder = ipc::SchemaBuilder::new(&mut fbb);
+                builder.add_fields(fb_field_list);
+                builder.add_custom_metadata(fb_metadata_list);
+                builder.finish()
+            };
+            root
+        };
+        let root = {
+            let mut footer_builder = ipc::FooterBuilder::new(&mut fbb);
+            footer_builder.add_version(ipc::MetadataVersion::V4);
+            footer_builder.add_schema(schema);
+            footer_builder.add_dictionaries(dictionaries);
+            footer_builder.add_recordBatches(record_batches);
+            footer_builder.finish()
+        };
+        fbb.finish(root, None);
+        write_padded_data(&mut self.writer, fbb.finished_data(), WriteDataType::Footer)?;
+        self.writer.write_all(&super::ARROW_MAGIC)?;
+        self.writer.flush()?;
+        self.finished = true;
+
+        Ok(())
+    }
+}
+
+/// Write the footer on drop if `finish` was not called; panics if writing the footer fails
+impl<W: Write> Drop for FileWriter<W> {
+    fn drop(&mut self) {
+        if !self.finished {
+            self.finish().unwrap();
+        }
+    }
+}
+
+/// Write the schema as an IPC `Message` to `writer`, returning the padded message length
+fn write_schema<R: Write>(writer: &mut BufWriter<R>, schema: &Schema) -> Result<usize> {
+    let mut fbb = FlatBufferBuilder::new();
+    let schema = {
+        let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema);
+        fb.as_union_value()
+    };
+
+    let mut message = ipc::MessageBuilder::new(&mut fbb);
+    message.add_version(ipc::MetadataVersion::V4);
+    message.add_header_type(ipc::MessageHeader::Schema);
+    message.add_bodyLength(0);
+    message.add_header(schema);
+    // TODO: custom metadata
+    let data = message.finish();
+    fbb.finish(data, None);
+
+    let data = fbb.finished_data();
+    let written = write_padded_data(writer, data, WriteDataType::Header);
+
+    written
+}
+
+/// The kind of payload being written, which decides whether and where its length is written.
+/// The length is written before a header, after a footer, and never for a body.
+#[derive(PartialEq)]
+enum WriteDataType {
+    Header,
+    Body,
+    Footer,
+}
+
+/// Write `data` zero-padded to 8 bytes, returning the padded length (length fields excluded)
+fn write_padded_data<R: Write>(
+    writer: &mut BufWriter<R>,
+    data: &[u8],
+    data_type: WriteDataType,
+) -> Result<usize> {
+    let len = data.len() as u32;
+    let pad_len = pad_to_8(len) as u32;
+    let total_len = len + pad_len;
+    // a header is preceded by its padded length as a little-endian u32
+    if data_type == WriteDataType::Header {
+        writer.write_all(&total_len.to_le_bytes()[..])?;
+    }
+    // write the payload with `write_all`, as `write` may stop after a partial write
+    writer.write_all(data)?;
+    if pad_len > 0 {
+        writer.write_all(&vec![0u8; pad_len as usize][..])?;
+    }
+    if data_type == WriteDataType::Footer {
+        writer.write_all(&total_len.to_le_bytes()[..])?;
+    }
+    writer.flush()?;
+    Ok(total_len as usize)
+}
+
+/// Write a record batch to the writer, returning the padded (metadata, body) byte counts
+fn write_record_batch<R: Write>(
+    writer: &mut BufWriter<R>,
+    batch: &RecordBatch,
+) -> Result<(usize, usize)> {
+    let mut fbb = FlatBufferBuilder::new();
+
+    let mut nodes: Vec<ipc::FieldNode> = vec![];
+    let mut buffers: Vec<ipc::Buffer> = vec![];
+    let mut arrow_data: Vec<u8> = vec![];
+    let mut offset = 0;
+    for array in batch.columns() {
+        let array_data = array.data();
+        offset = write_array_data(
+            &array_data,
+            &mut buffers,
+            &mut arrow_data,
+            &mut nodes,
+            offset,
+            array.len(),
+            array.null_count(),
+        );
+    }
+
+    // serialize the buffer and field-node descriptors into the flatbuffer
+    let buffers = fbb.create_vector(&buffers);
+    let nodes = fbb.create_vector(&nodes);
+
+    let root = {
+        let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
+        batch_builder.add_length(batch.num_rows() as i64);
+        batch_builder.add_nodes(nodes);
+        batch_builder.add_buffers(buffers);
+        let b = batch_builder.finish();
+        b.as_union_value()
+    };
+    // wrap the record batch in an ipc::Message whose body length is that of `arrow_data`
+    let mut message = ipc::MessageBuilder::new(&mut fbb);
+    message.add_version(ipc::MetadataVersion::V4);
+    message.add_header_type(ipc::MessageHeader::RecordBatch);
+    message.add_bodyLength(arrow_data.len() as i64);
+    message.add_header(root);
+    let root = message.finish();
+    fbb.finish(root, None);
+    let meta_written =
+        write_padded_data(writer, fbb.finished_data(), WriteDataType::Body)?;
+    let arrow_data_written =
+        write_padded_data(writer, &arrow_data[..], WriteDataType::Body)?;
+    Ok((meta_written, arrow_data_written))
+}
+
+/// Append an array's validity, data and child buffers to `arrow_data`; returns the new offset
+fn write_array_data(
+    array_data: &ArrayDataRef,
+    mut buffers: &mut Vec<ipc::Buffer>,
+    mut arrow_data: &mut Vec<u8>,
+    mut nodes: &mut Vec<ipc::FieldNode>,
+    offset: i64,
+    num_rows: usize,
+    null_count: usize,
+) -> i64 {
+    let mut offset = offset;
+    nodes.push(ipc::FieldNode::new(num_rows as i64, null_count as i64));
+    // always write a validity buffer, synthesising one when the array has none
+    let null_buffer = match array_data.null_buffer() {
+        None => {
+            // create a buffer and fill it with valid bits
+            let buffer = MutableBuffer::new(num_rows);
+            let buffer = buffer.with_bitset(num_rows, true);
+            let buffer = buffer.freeze();
+            buffer
+        }
+        Some(buffer) => buffer.clone(),
+    };
+    offset = write_buffer(&null_buffer, &mut buffers, &mut arrow_data, offset);
+
+    array_data.buffers().iter().for_each(|buffer| {
+        offset = write_buffer(buffer, &mut buffers, &mut arrow_data, offset);
+    });
+
+    // recursively write out nested structures, after this array's own buffers
+    array_data.child_data().iter().for_each(|data_ref| {
+        // write the nested data (e.g list data)
+        offset = write_array_data(
+            data_ref,
+            &mut buffers,
+            &mut arrow_data,
+            &mut nodes,
+            offset,
+            data_ref.len(),
+            data_ref.null_count(),
+        );
+    });
+    offset
+}
+
+/// Append a zero-padded buffer to `arrow_data` and record its `ipc::Buffer`; returns the new offset
+fn write_buffer(
+    buffer: &Buffer,
+    buffers: &mut Vec<ipc::Buffer>,
+    arrow_data: &mut Vec<u8>,
+    offset: i64,
+) -> i64 {
+    let len = buffer.len();
+    let pad_len = pad_to_8(len as u32);
+    let total_len: i64 = (len + pad_len) as i64;
+    // the recorded length includes the zero padding appended below
+    buffers.push(ipc::Buffer::new(offset, total_len));
+    arrow_data.extend_from_slice(buffer.data());
+    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
+    offset + total_len
+}
+
+/// Return the number of padding bytes needed to bring `len` up to a multiple of 8 (0 to 7)
+fn pad_to_8(len: u32) -> usize {
+    match (len % 8) as usize {
+        0 => 0,
+        rem => 8 - rem,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use flate2::read::GzDecoder;
+
+    use crate::array::*;
+    use crate::datatypes::Field;
+    use crate::ipc::reader::*;
+    use crate::util::integration_util::*;
+    use std::env;
+    use std::fs::File;
+    use std::io::Read;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_write_file() {
+        let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
+        let values: Vec<Option<u32>> = vec![
+            Some(999),
+            None,
+            Some(235),
+            Some(123),
+            None,
+            None,
+            None,
+            None,
+            None,
+        ];
+        let array1 = UInt32Array::from(values);
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(array1) as ArrayRef],
+        )
+        .unwrap();
+        {
+            let file = File::create("target/debug/testdata/arrow.arrow_file").unwrap();
+            let mut writer = FileWriter::try_new(file, &schema).unwrap();
+
+            writer.write(&batch).unwrap();
+            // this is inside a block to test the implicit finishing of the file on `Drop`
+        }
+
+        {
+            let file =
+                File::open(format!("target/debug/testdata/{}.arrow_file", "arrow"))
+                    .unwrap();
+            let mut reader = FileReader::try_new(file).unwrap();
+            // unwrap each read so an error fails the test instead of ending the loop
+            while let Some(read_batch) = reader.next().unwrap() {
+                read_batch
+                    .columns()
+                    .iter()
+                    .zip(batch.columns())
+                    .for_each(|(a, b)| {
+                        assert_eq!(a.data_type(), b.data_type());
+                        assert_eq!(a.len(), b.len());
+                        assert_eq!(a.null_count(), b.null_count());
+                    });
+            }
+        }
+    }
+
+    #[test]
+    fn read_and_rewrite_generated_files() {
+        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+        // the test is repetitive, thus we can read all supported files at once
+        let paths = vec![
+            "generated_interval",
+            "generated_datetime",
+            "generated_nested",
+            "generated_primitive_no_batches",
+            "generated_primitive_zerolength",
+            "generated_primitive",
+        ];
+        paths.iter().for_each(|path| {
+            let file = File::open(format!(
+                "{}/arrow-ipc/integration/0.14.1/{}.arrow_file",
+                testdata, path
+            ))
+            .unwrap();
+
+            let mut reader = FileReader::try_new(file).unwrap();
+
+            // read and rewrite the file to a temp location
+            {
+                let file =
+                    File::create(format!("target/debug/testdata/{}.arrow_file", path))
+                        .unwrap();
+                let mut writer = FileWriter::try_new(file, &reader.schema()).unwrap();
+                while let Some(batch) = reader.next().unwrap() {
+                    writer.write(&batch).unwrap();
+                }
+                writer.finish().unwrap();
+            }
+
+            let file =
+                File::open(format!("target/debug/testdata/{}.arrow_file", path)).unwrap();
+            let mut reader = FileReader::try_new(file).unwrap();
+
+            // read expected JSON output
+            let arrow_json = read_gzip_json(path);
+            assert!(arrow_json.equals_reader(&mut reader));
+        });
+    }
+    /// Read gzipped JSON file
+    fn read_gzip_json(path: &str) -> ArrowJson {
+        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+        let file = File::open(format!(
+            "{}/arrow-ipc/integration/0.14.1/{}.json.gz",
+            testdata, path
+        ))
+        .unwrap();
+        let mut gz = GzDecoder::new(&file);
+        let mut s = String::new();
+        gz.read_to_string(&mut s).unwrap();
+        // convert to Arrow JSON
+        let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
+        arrow_json
+    }
+}