You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2020/09/16 00:00:49 UTC

[arrow] branch master updated: ARROW-9848: [Rust] Implement 0.15 IPC alignment

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 75f804e  ARROW-9848: [Rust] Implement 0.15 IPC alignment
75f804e is described below

commit 75f804efbfe367175fef5a2238d9cd2d30ed3afe
Author: Neville Dipale <ne...@gmail.com>
AuthorDate: Tue Sep 15 18:00:13 2020 -0600

    ARROW-9848: [Rust] Implement 0.15 IPC alignment
    
    Release 0.15.0 changed the buffer alignment by adding a continuation marker to IPC messages.
    The previous behaviour was then marked as legacy, although both formats work under V4 of the IPC metadata version.
    
    This change brings the Rust implementation in line with the other language implementations; as a result, more integration tests now pass.
    
    This change is applied on top of the clippy changes, and can be reviewed/merged after the relevant clippy PRs.
    
    Closes #8174 from nevi-me/ARROW-9848-on-clippy
    
    Authored-by: Neville Dipale <ne...@gmail.com>
    Signed-off-by: Andy Grove <an...@nvidia.com>
---
 dev/archery/archery/integration/datagen.py |  14 +-
 rust/arrow-flight/src/utils.rs             |  15 +-
 rust/arrow/src/ipc/convert.rs              |   2 +-
 rust/arrow/src/ipc/mod.rs                  |   1 +
 rust/arrow/src/ipc/reader.rs               |  77 +++++---
 rust/arrow/src/ipc/writer.rs               | 304 +++++++++++++++++++++++------
 6 files changed, 306 insertions(+), 107 deletions(-)

diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py
index 69f463b..32ecbb6 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -1493,23 +1493,18 @@ def get_generated_json_files(tempdir=None):
 
     file_objs = [
         generate_primitive_case([], name='primitive_no_batches'),
-        generate_primitive_case([17, 20], name='primitive')
-        .skip_category('Rust'),
-        generate_primitive_case([0, 0, 0], name='primitive_zerolength')
-        .skip_category('Rust'),
+        generate_primitive_case([17, 20], name='primitive'),
+        generate_primitive_case([0, 0, 0], name='primitive_zerolength'),
 
         generate_primitive_large_offsets_case([17, 20])
         .skip_category('Go')
-        .skip_category('JS')
-        .skip_category('Rust'),
+        .skip_category('JS'),
 
         generate_null_case([10, 0])
-        .skip_category('Rust')
         .skip_category('JS')   # TODO(ARROW-7900)
         .skip_category('Go'),  # TODO(ARROW-7901)
 
         generate_null_trivial_case([0, 0])
-        .skip_category('Rust')
         .skip_category('JS')   # TODO(ARROW-7900)
         .skip_category('Go'),  # TODO(ARROW-7901)
 
@@ -1517,8 +1512,7 @@ def get_generated_json_files(tempdir=None):
         .skip_category('Go')  # TODO(ARROW-7948): Decimal + Go
         .skip_category('Rust'),
 
-        generate_datetime_case()
-        .skip_category('Rust'),
+        generate_datetime_case(),
 
         generate_interval_case()
         .skip_category('JS')  # TODO(ARROW-5239): Intervals + JS
diff --git a/rust/arrow-flight/src/utils.rs b/rust/arrow-flight/src/utils.rs
index aa93cbf..c28e39b 100644
--- a/rust/arrow-flight/src/utils.rs
+++ b/rust/arrow-flight/src/utils.rs
@@ -29,12 +29,13 @@ use arrow::record_batch::RecordBatch;
 /// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes
 impl From<&RecordBatch> for FlightData {
     fn from(batch: &RecordBatch) -> Self {
-        let (header, body) = writer::record_batch_to_bytes(batch);
+        let options = writer::IpcWriteOptions::default();
+        let data = writer::record_batch_to_bytes(batch, &options);
         Self {
             flight_descriptor: None,
             app_metadata: vec![],
-            data_header: header,
-            data_body: body,
+            data_header: data.ipc_message,
+            data_body: data.arrow_data,
         }
     }
 }
@@ -42,8 +43,9 @@ impl From<&RecordBatch> for FlightData {
 /// Convert a `Schema` to `SchemaResult` by converting to an IPC message
 impl From<&Schema> for SchemaResult {
     fn from(schema: &Schema) -> Self {
+        let options = writer::IpcWriteOptions::default();
         Self {
-            schema: writer::schema_to_bytes(schema),
+            schema: writer::schema_to_bytes(schema, &options).ipc_message,
         }
     }
 }
@@ -51,11 +53,12 @@ impl From<&Schema> for SchemaResult {
 /// Convert a `Schema` to `FlightData` by converting to an IPC message
 impl From<&Schema> for FlightData {
     fn from(schema: &Schema) -> Self {
-        let schema = writer::schema_to_bytes(schema);
+        let options = writer::IpcWriteOptions::default();
+        let schema = writer::schema_to_bytes(schema, &options);
         Self {
             flight_descriptor: None,
             app_metadata: vec![],
-            data_header: schema,
+            data_header: schema.ipc_message,
             data_body: vec![],
         }
     }
diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs
index 1788a24..7a5795d 100644
--- a/rust/arrow/src/ipc/convert.rs
+++ b/rust/arrow/src/ipc/convert.rs
@@ -345,7 +345,7 @@ pub(crate) fn get_fb_field_type<'a: 'b, 'b>(
         Null => FBFieldType {
             type_type: ipc::Type::Null,
             type_: ipc::NullBuilder::new(fbb).finish().as_union_value(),
-            children: None,
+            children: Some(fbb.create_vector(&empty_fields[..])),
         },
         Boolean => FBFieldType {
             type_type: ipc::Type::Bool,
diff --git a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs
index 5f7442d..cba8fb2 100644
--- a/rust/arrow/src/ipc/mod.rs
+++ b/rust/arrow/src/ipc/mod.rs
@@ -36,3 +36,4 @@ pub use self::gen::SparseTensor::*;
 pub use self::gen::Tensor::*;
 
 static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
+static CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs
index c39c737..af0b4e6 100644
--- a/rust/arrow/src/ipc/reader.rs
+++ b/rust/arrow/src/ipc/reader.rs
@@ -31,9 +31,9 @@ use crate::datatypes::{DataType, Field, IntervalUnit, Schema, SchemaRef};
 use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchReader};
-use DataType::*;
 
-const CONTINUATION_MARKER: u32 = 0xffff_ffff;
+use ipc::CONTINUATION_MARKER;
+use DataType::*;
 
 /// Read a buffer based on offset and length
 fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
@@ -482,6 +482,9 @@ pub struct FileReader<R: Read + Seek> {
     ///
     /// Dictionaries may be appended to in the streaming format.
     dictionaries_by_field: Vec<Option<ArrayRef>>,
+
+    /// Metadata version
+    metadata_version: ipc::MetadataVersion,
 }
 
 impl<R: Read + Seek> FileReader<R> {
@@ -506,12 +509,11 @@ impl<R: Read + Seek> FileReader<R> {
                 "Arrow file does not contain correct footer".to_string(),
             ));
         }
-
-        // what does the footer contain?
+        // read footer length
         let mut footer_size: [u8; 4] = [0; 4];
         reader.seek(SeekFrom::End(-10))?;
         reader.read_exact(&mut footer_size)?;
-        let footer_len = u32::from_le_bytes(footer_size);
+        let footer_len = i32::from_le_bytes(footer_size);
 
         // read footer
         let mut footer_data = vec![0; footer_len as usize];
@@ -534,6 +536,7 @@ impl<R: Read + Seek> FileReader<R> {
         let mut dictionaries_by_field = vec![None; schema.fields().len()];
         for block in footer.dictionaries().unwrap() {
             // read length from end of offset
+            // TODO: ARROW-9848: dictionary metadata has not been tested
             let meta_len = block.metaDataLength() - 4;
 
             let mut block_data = vec![0; meta_len as usize];
@@ -554,15 +557,21 @@ impl<R: Read + Seek> FileReader<R> {
                     reader.read_exact(&mut buf)?;
 
                     if batch.isDelta() {
-                        panic!("delta dictionary batches not supported");
+                        return Err(ArrowError::IoError(
+                            "delta dictionary batches not supported".to_string(),
+                        ));
                     }
 
                     let id = batch.id();
 
                     // As the dictionary batch does not contain the type of the
                     // values array, we need to retieve this from the schema.
-                    let first_field = find_dictionary_field(&ipc_schema, id)
-                        .expect("dictionary id not found in shchema");
+                    let first_field =
+                        find_dictionary_field(&ipc_schema, id).ok_or_else(|| {
+                            ArrowError::InvalidArgumentError(
+                                "dictionary id not found in schema".to_string(),
+                            )
+                        })?;
 
                     // Get an array representing this dictionary's values.
                     let dictionary_values: ArrayRef =
@@ -589,7 +598,11 @@ impl<R: Read + Seek> FileReader<R> {
                             }
                             _ => None,
                         }
-                        .expect("dictionary id not found in schema");
+                        .ok_or_else(|| {
+                            ArrowError::InvalidArgumentError(
+                                "dictionary id not found in schema".to_string(),
+                            )
+                        })?;
 
                     // for all fields with this dictionary id, update the dictionaries vector
                     // in the reader. Note that a dictionary batch may be shared between many fields.
@@ -606,7 +619,11 @@ impl<R: Read + Seek> FileReader<R> {
                         }
                     }
                 }
-                _ => panic!("Expecting DictionaryBatch in dictionary blocks."),
+                _ => {
+                    return Err(ArrowError::IoError(
+                        "Expecting DictionaryBatch in dictionary blocks.".to_string(),
+                    ))
+                }
             };
         }
 
@@ -617,6 +634,7 @@ impl<R: Read + Seek> FileReader<R> {
             current_block: 0,
             total_blocks,
             dictionaries_by_field,
+            metadata_version: footer.version(),
         })
     }
 
@@ -657,16 +675,31 @@ impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
             let block = self.blocks[self.current_block];
             self.current_block += 1;
 
-            // read length from end of offset
-            let meta_len = block.metaDataLength() - 4;
+            // read length
+            self.reader.seek(SeekFrom::Start(block.offset() as u64))?;
+            let mut meta_buf = [0; 4];
+            self.reader.read_exact(&mut meta_buf)?;
+            if meta_buf == CONTINUATION_MARKER {
+                // continuation marker encountered, read message next
+                self.reader.read_exact(&mut meta_buf)?;
+            }
+            let meta_len = i32::from_le_bytes(meta_buf);
 
             let mut block_data = vec![0; meta_len as usize];
-            self.reader
-                .seek(SeekFrom::Start(block.offset() as u64 + 4))?;
             self.reader.read_exact(&mut block_data)?;
 
             let message = ipc::get_root_as_message(&block_data[..]);
 
+            // some old test data's footer metadata is not set, so we account for that
+            if self.metadata_version != ipc::MetadataVersion::V1
+                && message.version() != self.metadata_version
+            {
+                return Err(ArrowError::IoError(
+                    "Could not read IPC message as metadata versions mismatch"
+                        .to_string(),
+                ));
+            }
+
             match message.header_type() {
                 ipc::MessageHeader::Schema => Err(ArrowError::IoError(
                     "Not expecting a schema when messages are read".to_string(),
@@ -733,16 +766,12 @@ impl<R: Read> StreamReader<R> {
         let mut meta_size: [u8; 4] = [0; 4];
         reader.read_exact(&mut meta_size)?;
         let meta_len = {
-            let meta_len = u32::from_le_bytes(meta_size);
-
             // If a continuation marker is encountered, skip over it and read
             // the size from the next four bytes.
-            if meta_len == CONTINUATION_MARKER {
+            if meta_size == CONTINUATION_MARKER {
                 reader.read_exact(&mut meta_size)?;
-                u32::from_le_bytes(meta_size)
-            } else {
-                meta_len
             }
+            i32::from_le_bytes(meta_size)
         };
 
         let mut meta_buffer = vec![0; meta_len as usize];
@@ -806,16 +835,12 @@ impl<R: Read> RecordBatchReader for StreamReader<R> {
         }
 
         let meta_len = {
-            let meta_len = u32::from_le_bytes(meta_size);
-
             // If a continuation marker is encountered, skip over it and read
             // the size from the next four bytes.
-            if meta_len == CONTINUATION_MARKER {
+            if meta_size == CONTINUATION_MARKER {
                 self.reader.read_exact(&mut meta_size)?;
-                u32::from_le_bytes(meta_size)
-            } else {
-                meta_len
             }
+            i32::from_le_bytes(meta_size)
         };
 
         if meta_len == 0 {
diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs
index 2212143..effbc71 100644
--- a/rust/arrow/src/ipc/writer.rs
+++ b/rust/arrow/src/ipc/writer.rs
@@ -32,9 +32,76 @@ use crate::ipc;
 use crate::record_batch::RecordBatch;
 use crate::util::bit_util;
 
+use ipc::CONTINUATION_MARKER;
+
+/// IPC write options used to control the behaviour of the writer
+#[derive(Debug)]
+pub struct IpcWriteOptions {
+    /// Write padding after memory buffers to this multiple of bytes.
+    /// Generally 8 or 64, defaults to 8
+    alignment: usize,
+    /// The legacy format is for releases before 0.15.0, and uses metadata V4
+    write_legacy_ipc_format: bool,
+    /// The metadata version to write. The Rust IPC writer supports V4+
+    metadata_version: ipc::MetadataVersion,
+}
+
+impl IpcWriteOptions {
+    /// Try create IpcWriteOptions, checking for incompatible settings
+    pub fn try_new(
+        alignment: usize,
+        write_legacy_ipc_format: bool,
+        metadata_version: ipc::MetadataVersion,
+    ) -> Result<Self> {
+        if alignment == 0 || alignment % 8 != 0 {
+            return Err(ArrowError::InvalidArgumentError(
+                "Alignment should be greater than 0 and be a multiple of 8".to_string(),
+            ));
+        }
+        match metadata_version {
+            ipc::MetadataVersion::V1
+            | ipc::MetadataVersion::V2
+            | ipc::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
+                "Writing IPC metadata version 3 and lower not supported".to_string(),
+            )),
+            ipc::MetadataVersion::V4 => Ok(Self {
+                alignment,
+                write_legacy_ipc_format,
+                metadata_version,
+            }),
+            ipc::MetadataVersion::V5 => {
+                if write_legacy_ipc_format {
+                    Err(ArrowError::InvalidArgumentError(
+                        "Legacy IPC format only supported on metadata version 4"
+                            .to_string(),
+                    ))
+                } else {
+                    Ok(Self {
+                        alignment,
+                        write_legacy_ipc_format,
+                        metadata_version,
+                    })
+                }
+            }
+        }
+    }
+}
+
+impl Default for IpcWriteOptions {
+    fn default() -> Self {
+        Self {
+            alignment: 8,
+            write_legacy_ipc_format: true,
+            metadata_version: ipc::MetadataVersion::V4,
+        }
+    }
+}
+
 pub struct FileWriter<W: Write> {
     /// The object to write to
     writer: BufWriter<W>,
+    /// IPC write options
+    write_options: IpcWriteOptions,
     /// A reference to the schema, used in validating record batches
     schema: Schema,
     /// The number of bytes between each block of bytes, as an offset for random access
@@ -50,17 +117,29 @@ pub struct FileWriter<W: Write> {
 impl<W: Write> FileWriter<W> {
     /// Try create a new writer, with the schema written as part of the header
     pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
+        let write_options = IpcWriteOptions::default();
+        Self::try_new_with_options(writer, schema, write_options)
+    }
+
+    /// Try create a new writer with IpcWriteOptions
+    pub fn try_new_with_options(
+        writer: W,
+        schema: &Schema,
+        write_options: IpcWriteOptions,
+    ) -> Result<Self> {
         let mut writer = BufWriter::new(writer);
         // write magic to header
         writer.write_all(&super::ARROW_MAGIC[..])?;
         // create an 8-byte boundary after the header
         writer.write_all(&[0, 0])?;
         // write the schema, set the written bytes to the schema + header
-        let written = write_schema(&mut writer, schema)? + 8;
+        let message = Message::Schema(schema, &write_options);
+        let (meta, data) = write_message(&mut writer, &message, &write_options)?;
         Ok(Self {
             writer,
+            write_options,
             schema: schema.clone(),
-            block_offsets: written,
+            block_offsets: meta + data + 8,
             dictionary_blocks: vec![],
             record_blocks: vec![],
             finished: false,
@@ -74,19 +153,25 @@ impl<W: Write> FileWriter<W> {
                 "Cannot write record batch to file writer as it is closed".to_string(),
             ));
         }
-        let (meta, data) = write_record_batch(&mut self.writer, batch, false)?;
+        let message = Message::RecordBatch(batch, &self.write_options);
+        let (meta, data) =
+            write_message(&mut self.writer, &message, &self.write_options)?;
         // add a record block for the footer
-        self.record_blocks.push(ipc::Block::new(
+        let block = ipc::Block::new(
             self.block_offsets as i64,
-            (meta as i32) + 4,
+            meta as i32, // TODO: is this still applicable?
             data as i64,
-        ));
+        );
+        self.record_blocks.push(block);
         self.block_offsets += meta + data;
         Ok(())
     }
 
     /// Write footer and closing tag, then mark the writer as done
     pub fn finish(&mut self) -> Result<()> {
+        // write EOS
+        write_continuation(&mut self.writer, &self.write_options, 0)?;
+
         let mut fbb = FlatBufferBuilder::new();
         let dictionaries = fbb.create_vector(&self.dictionary_blocks);
         let record_batches = fbb.create_vector(&self.record_blocks);
@@ -130,14 +215,17 @@ impl<W: Write> FileWriter<W> {
         };
         let root = {
             let mut footer_builder = ipc::FooterBuilder::new(&mut fbb);
-            footer_builder.add_version(ipc::MetadataVersion::V4);
+            footer_builder.add_version(self.write_options.metadata_version);
             footer_builder.add_schema(schema);
             footer_builder.add_dictionaries(dictionaries);
             footer_builder.add_recordBatches(record_batches);
             footer_builder.finish()
         };
         fbb.finish(root, None);
-        write_padded_data(&mut self.writer, fbb.finished_data(), WriteDataType::Footer)?;
+        let footer_data = fbb.finished_data();
+        self.writer.write_all(footer_data)?;
+        self.writer
+            .write_all(&(footer_data.len() as i32).to_le_bytes())?;
         self.writer.write_all(&super::ARROW_MAGIC)?;
         self.writer.flush()?;
         self.finished = true;
@@ -158,6 +246,8 @@ impl<W: Write> Drop for FileWriter<W> {
 pub struct StreamWriter<W: Write> {
     /// The object to write to
     writer: BufWriter<W>,
+    /// IPC write options
+    write_options: IpcWriteOptions,
     /// A reference to the schema, used in validating record batches
     schema: Schema,
     /// Whether the writer footer has been written, and the writer is finished
@@ -167,11 +257,22 @@ pub struct StreamWriter<W: Write> {
 impl<W: Write> StreamWriter<W> {
     /// Try create a new writer, with the schema written as part of the header
     pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
+        let write_options = IpcWriteOptions::default();
+        Self::try_new_with_options(writer, schema, write_options)
+    }
+
+    pub fn try_new_with_options(
+        writer: W,
+        schema: &Schema,
+        write_options: IpcWriteOptions,
+    ) -> Result<Self> {
         let mut writer = BufWriter::new(writer);
         // write the schema, set the written bytes to the schema
-        write_schema(&mut writer, schema)?;
+        let message = Message::Schema(schema, &write_options);
+        write_message(&mut writer, &message, &write_options)?;
         Ok(Self {
             writer,
+            write_options,
             schema: schema.clone(),
             finished: false,
         })
@@ -184,15 +285,14 @@ impl<W: Write> StreamWriter<W> {
                 "Cannot write record batch to stream writer as it is closed".to_string(),
             ));
         }
-        write_record_batch(&mut self.writer, batch, true)?;
+        let message = Message::RecordBatch(batch, &self.write_options);
+        write_message(&mut self.writer, &message, &self.write_options)?;
         Ok(())
     }
 
     /// Write continuation bytes, and mark the stream as done
     pub fn finish(&mut self) -> Result<()> {
-        self.writer.write_all(&[255u8, 255, 255, 255])?;
-        self.writer.write_all(&[0u8, 0, 0, 0])?;
-        self.writer.flush()?;
+        write_continuation(&mut self.writer, &self.write_options, 0)?;
 
         self.finished = true;
 
@@ -209,7 +309,15 @@ impl<W: Write> Drop for StreamWriter<W> {
     }
 }
 
-pub fn schema_to_bytes(schema: &Schema) -> Vec<u8> {
+/// Stores the encoded data, which is an ipc::Message, and optional Arrow data
+pub struct EncodedData {
+    /// An encoded ipc::Message
+    pub ipc_message: Vec<u8>,
+    /// Arrow buffers to be written, should be an empty vec for schema messages
+    pub arrow_data: Vec<u8>,
+}
+
+pub fn schema_to_bytes(schema: &Schema, write_options: &IpcWriteOptions) -> EncodedData {
     let mut fbb = FlatBufferBuilder::new();
     let schema = {
         let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema);
@@ -217,7 +325,7 @@ pub fn schema_to_bytes(schema: &Schema) -> Vec<u8> {
     };
 
     let mut message = ipc::MessageBuilder::new(&mut fbb);
-    message.add_version(ipc::MetadataVersion::V4);
+    message.add_version(write_options.metadata_version);
     message.add_header_type(ipc::MessageHeader::Schema);
     message.add_bodyLength(0);
     message.add_header(schema);
@@ -226,51 +334,101 @@ pub fn schema_to_bytes(schema: &Schema) -> Vec<u8> {
     fbb.finish(data, None);
 
     let data = fbb.finished_data();
-    data.to_vec()
+    EncodedData {
+        ipc_message: data.to_vec(),
+        arrow_data: vec![],
+    }
 }
 
-/// Convert the schema to its IPC representation, and write it to the `writer`
-fn write_schema<R: Write>(writer: &mut BufWriter<R>, schema: &Schema) -> Result<usize> {
-    let data = schema_to_bytes(schema);
-    write_padded_data(writer, &data[..], WriteDataType::Header)
+enum Message<'a> {
+    Schema(&'a Schema, &'a IpcWriteOptions),
+    RecordBatch(&'a RecordBatch, &'a IpcWriteOptions),
+    DictionaryBatch(&'a IpcWriteOptions),
 }
 
-/// The message type being written. This determines whether to write the data length or not.
-/// Data length is written before the header, after the footer, and never for the body.
-#[derive(PartialEq)]
-enum WriteDataType {
-    Header,
-    Body,
-    Footer,
+impl<'a> Message<'a> {
+    /// Encode message to a ipc::Message and return data as bytes
+    fn encode(&'a self) -> EncodedData {
+        match self {
+            Message::Schema(schema, options) => schema_to_bytes(*schema, *options),
+            Message::RecordBatch(batch, options) => {
+                record_batch_to_bytes(*batch, *options)
+            }
+            Message::DictionaryBatch(_) => {
+                unimplemented!("Writing dictionary batches not implemented")
+            }
+        }
+    }
 }
 
-/// Write a slice of data to the writer, ensuring that it is padded to 8 bytes
-fn write_padded_data<R: Write>(
-    writer: &mut BufWriter<R>,
-    data: &[u8],
-    data_type: WriteDataType,
-) -> Result<usize> {
+/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
+fn write_message<W: Write>(
+    mut writer: &mut BufWriter<W>,
+    message: &Message,
+    write_options: &IpcWriteOptions,
+) -> Result<(usize, usize)> {
+    let encoded = message.encode();
+    let arrow_data_len = encoded.arrow_data.len();
+    if arrow_data_len % 8 != 0 {
+        return Err(ArrowError::MemoryError(
+            "Arrow data not aligned".to_string(),
+        ));
+    }
+
+    let a = write_options.alignment - 1;
+    let buffer = encoded.ipc_message;
+    let flatbuf_size = buffer.len();
+    let prefix_size = if write_options.write_legacy_ipc_format {
+        4
+    } else {
+        8
+    };
+    let aligned_size = (flatbuf_size + prefix_size + a) & !a;
+    let padding_bytes = aligned_size - flatbuf_size - prefix_size;
+
+    write_continuation(
+        &mut writer,
+        &write_options,
+        (aligned_size - prefix_size) as i32,
+    )?;
+
+    // write the flatbuf
+    if flatbuf_size > 0 {
+        writer.write_all(&buffer)?;
+    }
+    // write padding
+    writer.write_all(&vec![0; padding_bytes])?;
+
+    // write arrow data
+    let body_len = if arrow_data_len > 0 {
+        write_body_buffers(&mut writer, &encoded.arrow_data)?
+    } else {
+        0
+    };
+
+    Ok((aligned_size, body_len))
+}
+
+fn write_body_buffers<W: Write>(writer: &mut BufWriter<W>, data: &[u8]) -> Result<usize> {
     let len = data.len() as u32;
     let pad_len = pad_to_8(len) as u32;
     let total_len = len + pad_len;
-    // write data length
-    if data_type == WriteDataType::Header {
-        writer.write_all(&total_len.to_le_bytes()[..])?;
-    }
-    // write flatbuffer data
+
+    // write body buffer
     writer.write_all(data)?;
     if pad_len > 0 {
         writer.write_all(&vec![0u8; pad_len as usize][..])?;
     }
-    if data_type == WriteDataType::Footer {
-        writer.write_all(&total_len.to_le_bytes()[..])?;
-    }
+
     writer.flush()?;
     Ok(total_len as usize)
 }
 
 /// Write a `RecordBatch` into a tuple of bytes, one for the header (ipc::Message) and the other for the batch's data
-pub fn record_batch_to_bytes(batch: &RecordBatch) -> (Vec<u8>, Vec<u8>) {
+pub fn record_batch_to_bytes(
+    batch: &RecordBatch,
+    write_options: &IpcWriteOptions,
+) -> EncodedData {
     let mut fbb = FlatBufferBuilder::new();
 
     let mut nodes: Vec<ipc::FieldNode> = vec![];
@@ -304,7 +462,7 @@ pub fn record_batch_to_bytes(batch: &RecordBatch) -> (Vec<u8>, Vec<u8>) {
     };
     // create an ipc::Message
     let mut message = ipc::MessageBuilder::new(&mut fbb);
-    message.add_version(ipc::MetadataVersion::V4);
+    message.add_version(write_options.metadata_version);
     message.add_header_type(ipc::MessageHeader::RecordBatch);
     message.add_bodyLength(arrow_data.len() as i64);
     message.add_header(root);
@@ -312,26 +470,46 @@ pub fn record_batch_to_bytes(batch: &RecordBatch) -> (Vec<u8>, Vec<u8>) {
     fbb.finish(root, None);
     let finished_data = fbb.finished_data();
 
-    (finished_data.to_vec(), arrow_data)
+    EncodedData {
+        ipc_message: finished_data.to_vec(),
+        arrow_data,
+    }
 }
 
 /// Write a record batch to the writer, writing the message size before the message
 /// if the record batch is being written to a stream
-fn write_record_batch<R: Write>(
-    writer: &mut BufWriter<R>,
-    batch: &RecordBatch,
-    is_stream: bool,
-) -> Result<(usize, usize)> {
-    let (meta_data, arrow_data) = record_batch_to_bytes(batch);
-    // write the length of data if writing to stream
-    if is_stream {
-        let total_len: u32 = meta_data.len() as u32;
-        writer.write_all(&total_len.to_le_bytes()[..])?;
-    }
-    let meta_written = write_padded_data(writer, &meta_data[..], WriteDataType::Body)?;
-    let arrow_data_written =
-        write_padded_data(writer, &arrow_data[..], WriteDataType::Body)?;
-    Ok((meta_written, arrow_data_written))
+fn write_continuation<W: Write>(
+    writer: &mut BufWriter<W>,
+    write_options: &IpcWriteOptions,
+    total_len: i32,
+) -> Result<usize> {
+    let mut written = 8;
+
+    // the version of the writer determines whether continuation markers should be added
+    match write_options.metadata_version {
+        ipc::MetadataVersion::V1
+        | ipc::MetadataVersion::V2
+        | ipc::MetadataVersion::V3 => {
+            unreachable!("Options with the metadata version cannot be created")
+        }
+        ipc::MetadataVersion::V4 => {
+            if !write_options.write_legacy_ipc_format {
+                // v0.15.0 format
+                writer.write_all(&CONTINUATION_MARKER)?;
+                written = 4;
+            }
+            writer.write_all(&total_len.to_le_bytes()[..])?;
+        }
+        ipc::MetadataVersion::V5 => {
+            // write continuation marker and message length
+            writer.write_all(&CONTINUATION_MARKER)?;
+            writer.write_all(&total_len.to_le_bytes()[..])?;
+        }
+    };
+
+    writer.flush()?;
+
+    Ok(written)
 }
 
 /// Write array data to a vector of bytes
@@ -383,7 +561,7 @@ fn write_array_data(
     offset
 }
 
-/// Write a buffer to a vector of bytes, and add its ipc Buffer to a vector
+/// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
 fn write_buffer(
     buffer: &Buffer,
     buffers: &mut Vec<ipc::Buffer>,
@@ -401,11 +579,9 @@ fn write_buffer(
 }
 
 /// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes
+#[inline]
 fn pad_to_8(len: u32) -> usize {
-    match len % 8 {
-        0 => 0 as usize,
-        v => 8 - v as usize,
-    }
+    (((len + 7) & !7) - len) as usize
 }
 
 #[cfg(test)]