You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ne...@apache.org on 2019/12/29 21:16:35 UTC
[arrow] 03/03: ARROW-5182: [Rust] Arrow IPC file writer
This is an automated email from the ASF dual-hosted git repository.
nevime pushed a commit to branch ARROW-5182
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit b42fa717182616d346a90dd2cd8202e60415e782
Author: Neville Dipale <ne...@gmail.com>
AuthorDate: Sun Dec 29 23:14:03 2019 +0200
ARROW-5182: [Rust] Arrow IPC file writer
---
rust/arrow/src/ipc/convert.rs | 218 +++++++++++++++++---
rust/arrow/src/ipc/mod.rs | 3 +
rust/arrow/src/ipc/reader.rs | 11 +-
rust/arrow/src/ipc/writer.rs | 461 ++++++++++++++++++++++++++++++++++++++++++
4 files changed, 657 insertions(+), 36 deletions(-)
diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs
index e2416ce..e3d70d9 100644
--- a/rust/arrow/src/ipc/convert.rs
+++ b/rust/arrow/src/ipc/convert.rs
@@ -26,8 +26,10 @@ use flatbuffers::{
use std::collections::HashMap;
use std::sync::Arc;
+use DataType::*;
+
/// Serialize a schema in IPC format
-fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
+pub(crate) fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
let mut fbb = FlatBufferBuilder::new();
let mut fields = vec![];
@@ -73,6 +75,47 @@ fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
fbb
}
+pub(crate) fn schema_to_fb_offset<'a: 'b, 'b>(
+ mut fbb: &'a mut FlatBufferBuilder,
+ schema: &Schema,
+) -> WIPOffset<ipc::Schema<'b>> {
+ let mut fields = vec![];
+ for field in schema.fields() {
+ let fb_field_name = fbb.create_string(field.name().as_str());
+ let (ipc_type_type, ipc_type, ipc_children) =
+ get_fb_field_type(field.data_type(), &mut fbb);
+ let mut field_builder = ipc::FieldBuilder::new(&mut fbb);
+ field_builder.add_name(fb_field_name);
+ field_builder.add_type_type(ipc_type_type);
+ field_builder.add_nullable(field.is_nullable());
+ match ipc_children {
+ None => {}
+ Some(children) => field_builder.add_children(children),
+ };
+ field_builder.add_type_(ipc_type);
+ fields.push(field_builder.finish());
+ }
+
+ let mut custom_metadata = vec![];
+ for (k, v) in schema.metadata() {
+ let fb_key_name = fbb.create_string(k.as_str());
+ let fb_val_name = fbb.create_string(v.as_str());
+
+ let mut kv_builder = ipc::KeyValueBuilder::new(&mut fbb);
+ kv_builder.add_key(fb_key_name);
+ kv_builder.add_value(fb_val_name);
+ custom_metadata.push(kv_builder.finish());
+ }
+
+ let fb_field_list = fbb.create_vector(&fields);
+ let fb_metadata_list = fbb.create_vector(&custom_metadata);
+
+ let mut builder = ipc::SchemaBuilder::new(&mut fbb);
+ builder.add_fields(fb_field_list);
+ builder.add_custom_metadata(fb_metadata_list);
+ builder.finish()
+}
+
/// Convert an IPC Field to Arrow Field
impl<'a> From<ipc::Field<'a>> for Field {
fn from(field: ipc::Field) -> Field {
@@ -85,7 +128,7 @@ impl<'a> From<ipc::Field<'a>> for Field {
}
/// Deserialize a Schema table from IPC format to Schema data type
-pub fn fb_to_schema(fb: ipc::Schema) -> Schema {
+pub(crate) fn fb_to_schema(fb: ipc::Schema) -> Schema {
let mut fields: Vec<Field> = vec![];
let c_fields = fb.fields().unwrap();
let len = c_fields.len();
@@ -110,7 +153,7 @@ pub fn fb_to_schema(fb: ipc::Schema) -> Schema {
}
/// Get the Arrow data type from the flatbuffer Field table
-fn get_data_type(field: ipc::Field) -> DataType {
+pub(crate) fn get_data_type(field: ipc::Field) -> DataType {
match field.type_type() {
ipc::Type::Bool => DataType::Boolean,
ipc::Type::Int => {
@@ -236,7 +279,7 @@ fn get_data_type(field: ipc::Field) -> DataType {
}
/// Get the IPC type of a data type
-fn get_fb_field_type<'a: 'b, 'b>(
+pub(crate) fn get_fb_field_type<'a: 'b, 'b>(
data_type: &DataType,
mut fbb: &mut FlatBufferBuilder<'a>,
) -> (
@@ -244,14 +287,20 @@ fn get_fb_field_type<'a: 'b, 'b>(
WIPOffset<UnionWIPOffset>,
Option<WIPOffset<Vector<'b, ForwardsUOffset<ipc::Field<'b>>>>>,
) {
- use DataType::*;
+ // some IPC implementations expect an empty list for child data, instead of a null value.
+ // An empty field list is thus returned for primitive types
+ let empty_fields: Vec<WIPOffset<ipc::Field>> = vec![];
match data_type {
- Boolean => (
- ipc::Type::Bool,
- ipc::BoolBuilder::new(&mut fbb).finish().as_union_value(),
- None,
- ),
+ Boolean => {
+ let children = fbb.create_vector(&empty_fields[..]);
+ (
+ ipc::Type::Bool,
+ ipc::BoolBuilder::new(&mut fbb).finish().as_union_value(),
+ Some(children),
+ )
+ }
UInt8 | UInt16 | UInt32 | UInt64 => {
+ let children = fbb.create_vector(&empty_fields[..]);
let mut builder = ipc::IntBuilder::new(&mut fbb);
builder.add_is_signed(false);
match data_type {
@@ -261,9 +310,14 @@ fn get_fb_field_type<'a: 'b, 'b>(
UInt64 => builder.add_bitWidth(64),
_ => {}
};
- (ipc::Type::Int, builder.finish().as_union_value(), None)
+ (
+ ipc::Type::Int,
+ builder.finish().as_union_value(),
+ Some(children),
+ )
}
Int8 | Int16 | Int32 | Int64 => {
+ let children = fbb.create_vector(&empty_fields[..]);
let mut builder = ipc::IntBuilder::new(&mut fbb);
builder.add_is_signed(true);
match data_type {
@@ -273,9 +327,14 @@ fn get_fb_field_type<'a: 'b, 'b>(
Int64 => builder.add_bitWidth(64),
_ => {}
};
- (ipc::Type::Int, builder.finish().as_union_value(), None)
+ (
+ ipc::Type::Int,
+ builder.finish().as_union_value(),
+ Some(children),
+ )
}
Float16 | Float32 | Float64 => {
+ let children = fbb.create_vector(&empty_fields[..]);
let mut builder = ipc::FloatingPointBuilder::new(&mut fbb);
match data_type {
Float16 => builder.add_precision(ipc::Precision::HALF),
@@ -286,30 +345,57 @@ fn get_fb_field_type<'a: 'b, 'b>(
(
ipc::Type::FloatingPoint,
builder.finish().as_union_value(),
- None,
+ Some(children),
+ )
+ }
+ Binary => {
+ let children = fbb.create_vector(&empty_fields[..]);
+ (
+ ipc::Type::Binary,
+ ipc::BinaryBuilder::new(&mut fbb).finish().as_union_value(),
+ Some(children),
+ )
+ }
+ Utf8 => {
+ let children = fbb.create_vector(&empty_fields[..]);
+ (
+ ipc::Type::Utf8,
+ ipc::Utf8Builder::new(&mut fbb).finish().as_union_value(),
+ Some(children),
+ )
+ }
+ FixedSizeBinary(len) => {
+ let children = fbb.create_vector(&empty_fields[..]);
+ let mut builder = ipc::FixedSizeBinaryBuilder::new(&mut fbb);
+ builder.add_byteWidth(*len as i32);
+ (
+ ipc::Type::FixedSizeBinary,
+ builder.finish().as_union_value(),
+ Some(children),
)
}
- Binary => (
- ipc::Type::Binary,
- ipc::BinaryBuilder::new(&mut fbb).finish().as_union_value(),
- None,
- ),
- Utf8 => (
- ipc::Type::Utf8,
- ipc::Utf8Builder::new(&mut fbb).finish().as_union_value(),
- None,
- ),
Date32(_) => {
+ let children = fbb.create_vector(&empty_fields[..]);
let mut builder = ipc::DateBuilder::new(&mut fbb);
builder.add_unit(ipc::DateUnit::DAY);
- (ipc::Type::Date, builder.finish().as_union_value(), None)
+ (
+ ipc::Type::Date,
+ builder.finish().as_union_value(),
+ Some(children),
+ )
}
Date64(_) => {
+ let children = fbb.create_vector(&empty_fields[..]);
let mut builder = ipc::DateBuilder::new(&mut fbb);
builder.add_unit(ipc::DateUnit::MILLISECOND);
- (ipc::Type::Date, builder.finish().as_union_value(), None)
+ (
+ ipc::Type::Date,
+ builder.finish().as_union_value(),
+ Some(children),
+ )
}
Time32(unit) | Time64(unit) => {
+ let children = fbb.create_vector(&empty_fields[..]);
let mut builder = ipc::TimeBuilder::new(&mut fbb);
match unit {
TimeUnit::Second => {
@@ -329,9 +415,14 @@ fn get_fb_field_type<'a: 'b, 'b>(
builder.add_unit(ipc::TimeUnit::NANOSECOND);
}
}
- (ipc::Type::Time, builder.finish().as_union_value(), None)
+ (
+ ipc::Type::Time,
+ builder.finish().as_union_value(),
+ Some(children),
+ )
}
Timestamp(unit, tz) => {
+ let children = fbb.create_vector(&empty_fields[..]);
let tz = tz.clone().unwrap_or(Arc::new(String::new()));
let tz_str = fbb.create_string(tz.as_str());
let mut builder = ipc::TimestampBuilder::new(&mut fbb);
@@ -348,19 +439,25 @@ fn get_fb_field_type<'a: 'b, 'b>(
(
ipc::Type::Timestamp,
builder.finish().as_union_value(),
- None,
+ Some(children),
)
}
Interval(unit) => {
+ let children = fbb.create_vector(&empty_fields[..]);
let mut builder = ipc::IntervalBuilder::new(&mut fbb);
let interval_unit = match unit {
IntervalUnit::YearMonth => ipc::IntervalUnit::YEAR_MONTH,
IntervalUnit::DayTime => ipc::IntervalUnit::DAY_TIME,
};
builder.add_unit(interval_unit);
- (ipc::Type::Interval, builder.finish().as_union_value(), None)
+ (
+ ipc::Type::Interval,
+ builder.finish().as_union_value(),
+ Some(children),
+ )
}
Duration(unit) => {
+ let children = fbb.create_vector(&empty_fields[..]);
let mut builder = ipc::DurationBuilder::new(&mut fbb);
let time_unit = match unit {
TimeUnit::Second => ipc::TimeUnit::SECOND,
@@ -369,7 +466,11 @@ fn get_fb_field_type<'a: 'b, 'b>(
TimeUnit::Nanosecond => ipc::TimeUnit::NANOSECOND,
};
builder.add_unit(time_unit);
- (ipc::Type::Duration, builder.finish().as_union_value(), None)
+ (
+ ipc::Type::Duration,
+ builder.finish().as_union_value(),
+ Some(children),
+ )
}
List(ref list_type) => {
let inner_types = get_fb_field_type(list_type, &mut fbb);
@@ -392,6 +493,29 @@ fn get_fb_field_type<'a: 'b, 'b>(
Some(children),
)
}
+ FixedSizeList((ref list_type, len)) => {
+ let inner_types = get_fb_field_type(list_type, &mut fbb);
+ let child = ipc::Field::create(
+ &mut fbb,
+ &ipc::FieldArgs {
+ name: None,
+ nullable: false,
+ type_type: inner_types.0,
+ type_: Some(inner_types.1),
+ dictionary: None,
+ children: inner_types.2,
+ custom_metadata: None,
+ },
+ );
+ let children = fbb.create_vector(&[child]);
+ let mut builder = ipc::FixedSizeListBuilder::new(&mut fbb);
+ builder.add_listSize(*len as i32);
+ (
+ ipc::Type::FixedSizeList,
+ builder.finish().as_union_value(),
+ Some(children),
+ )
+ }
Struct(fields) => {
// struct's fields are children
let mut children = vec![];
@@ -418,7 +542,6 @@ fn get_fb_field_type<'a: 'b, 'b>(
Some(children),
)
}
- t @ _ => panic!("Unsupported Arrow Data Type {:?}", t),
}
}
@@ -533,4 +656,39 @@ mod tests {
let schema2 = fb_to_schema(ipc);
assert_eq!(schema, schema2);
}
+
+ #[test]
+ fn schema_from_bytes() {
+ // bytes of a schema generated from python (0.14.0), saved as an `ipc::Message`.
+ // the schema is: Field("field1", DataType::UInt32, false)
+ let bytes: Vec<u8> = vec![
+ 16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 3, 0,
+ 12, 0, 0, 0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20,
+ 0, 0, 0, 16, 0, 20, 0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0,
+ 0, 0, 2, 32, 0, 0, 0, 20, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0, 8, 0,
+ 4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0,
+ 0, 0, 0, 0,
+ ];
+ let ipc = ipc::get_root_as_message(&bytes[..]);
+ let schema = ipc.header_as_schema().unwrap();
+
+ // a message generated from Rust, same as the Python one
+ let bytes: Vec<u8> = vec![
+ 16, 0, 0, 0, 0, 0, 10, 0, 14, 0, 12, 0, 11, 0, 4, 0, 10, 0, 0, 0, 20, 0, 0,
+ 0, 0, 0, 0, 1, 3, 0, 10, 0, 12, 0, 0, 0, 8, 0, 4, 0, 10, 0, 0, 0, 8, 0, 0, 0,
+ 8, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 16, 0, 0, 0, 12, 0, 18, 0, 12, 0, 0, 0,
+ 11, 0, 4, 0, 12, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 2, 20, 0, 0, 0, 0, 0, 6, 0,
+ 8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49,
+ 0, 0,
+ ];
+ let ipc2 = ipc::get_root_as_message(&bytes[..]);
+ let schema2 = ipc2.header_as_schema().unwrap();
+
+ assert_eq!(schema, schema2);
+ assert_eq!(ipc.version(), ipc2.version());
+ assert_eq!(ipc.header_type(), ipc2.header_type());
+ assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
+ assert!(ipc.custom_metadata().is_none());
+ assert!(ipc2.custom_metadata().is_none());
+ }
}
diff --git a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs
index 8c3bc08..1ef653f 100644
--- a/rust/arrow/src/ipc/mod.rs
+++ b/rust/arrow/src/ipc/mod.rs
@@ -17,6 +17,7 @@
pub mod convert;
pub mod reader;
+pub mod writer;
pub mod gen;
@@ -25,3 +26,5 @@ pub use self::gen::Message::*;
pub use self::gen::Schema::*;
pub use self::gen::SparseTensor::*;
pub use self::gen::Tensor::*;
+
+static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs
index bad6319..ebfac96 100644
--- a/rust/arrow/src/ipc/reader.rs
+++ b/rust/arrow/src/ipc/reader.rs
@@ -32,8 +32,6 @@ use crate::ipc;
use crate::record_batch::{RecordBatch, RecordBatchReader};
use DataType::*;
-static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
-
/// Read a buffer based on offset and length
fn read_buffer(buf: &ipc::Buffer, a_data: &Vec<u8>) -> Buffer {
let start_offset = buf.offset() as usize;
@@ -404,14 +402,14 @@ impl<R: Read + Seek> FileReader<R> {
// check if header and footer contain correct magic bytes
let mut magic_buffer: [u8; 6] = [0; 6];
reader.read_exact(&mut magic_buffer)?;
- if magic_buffer != ARROW_MAGIC {
+ if magic_buffer != super::ARROW_MAGIC {
return Err(ArrowError::IoError(
"Arrow file does not contain correct header".to_string(),
));
}
reader.seek(SeekFrom::End(-6))?;
reader.read_exact(&mut magic_buffer)?;
- if magic_buffer != ARROW_MAGIC {
+ if magic_buffer != super::ARROW_MAGIC {
return Err(ArrowError::IoError(
"Arrow file does not contain correct footer".to_string(),
));
@@ -499,8 +497,9 @@ impl<R: Read + Seek> FileReader<R> {
read_record_batch(&buf, batch, self.schema())
}
- _ => unimplemented!(
- "reading types other than record batches not yet supported"
+ t @ _ => unimplemented!(
+ "reading type: {:?}, other than record batches not yet supported",
+ t
),
}
} else {
diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs
new file mode 100644
index 0000000..0800bd6
--- /dev/null
+++ b/rust/arrow/src/ipc/writer.rs
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Arrow IPC File and Stream Writers
+//!
+//! The `FileWriter` and `StreamWriter` have similar interfaces,
+//! however the `FileWriter` expects a writer that supports `Seek`ing
+
+use std::io::{BufWriter, Write};
+
+use flatbuffers::FlatBufferBuilder;
+
+use crate::array::ArrayDataRef;
+use crate::buffer::{Buffer, MutableBuffer};
+use crate::datatypes::*;
+use crate::error::{ArrowError, Result};
+use crate::ipc;
+use crate::record_batch::RecordBatch;
+
+pub struct FileWriter<W: Write> {
+ /// The object to write to
+ writer: BufWriter<W>,
+ /// A reference to the schema, used in validating record batches
+ schema: Schema,
+ /// The number of bytes written for the header (up to schema)
+ header_bytes: usize,
+ /// The number of bytes between each block of bytes, as an offset for random access
+ block_offsets: usize,
+ /// Dictionary blocks that will be written as part of the IPC footer
+ dictionary_blocks: Vec<ipc::Block>,
+ /// Record blocks that will be written as part of the IPC footer
+ record_blocks: Vec<ipc::Block>,
+ /// Whether the writer footer has been written, and the writer is finished
+ finished: bool,
+}
+
+impl<W: Write> FileWriter<W> {
+ /// Try create a new writer, with the schema written as part of the header
+ pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
+ let mut writer = BufWriter::new(writer);
+ // write magic to header
+ writer.write(&super::ARROW_MAGIC[..])?;
+ // create an 8-byte boundary after the header
+ writer.write(&[0, 0])?;
+ // write the schema, set the written bytes to the schema + header
+ let written = write_schema(&mut writer, schema)? + 8;
+ Ok(Self {
+ writer,
+ schema: schema.clone(),
+ header_bytes: written,
+ block_offsets: written,
+ dictionary_blocks: vec![],
+ record_blocks: vec![],
+ finished: false,
+ })
+ }
+
+ /// Write a record batch to the file
+ pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+ if self.finished {
+ return Err(ArrowError::IoError(
+ "Cannot write record batch to file writer as it is closed".to_string(),
+ ));
+ }
+ let (meta, data) = write_record_batch(&mut self.writer, batch)?;
+ // add a record block for the footer
+ self.record_blocks.push(ipc::Block::new(
+ self.block_offsets as i64,
+ (meta as i32) + 4,
+ data as i64,
+ ));
+ self.block_offsets += meta + data;
+ Ok(())
+ }
+
+ /// write footer and closing tag, then mark the writer as done
+ pub fn finish(&mut self) -> Result<()> {
+ let mut fbb = FlatBufferBuilder::new();
+ let dictionaries = fbb.create_vector(&self.dictionary_blocks);
+ let record_batches = fbb.create_vector(&self.record_blocks);
+ // TODO: this is duplicated as we otherwise mutably borrow twice
+ let schema = {
+ let mut fields = vec![];
+ for field in self.schema.fields() {
+ let fb_field_name = fbb.create_string(field.name().as_str());
+ let (ipc_type_type, ipc_type, ipc_children) =
+ ipc::convert::get_fb_field_type(field.data_type(), &mut fbb);
+ let mut field_builder = ipc::FieldBuilder::new(&mut fbb);
+ field_builder.add_name(fb_field_name);
+ field_builder.add_type_type(ipc_type_type);
+ field_builder.add_nullable(field.is_nullable());
+ match ipc_children {
+ None => {}
+ Some(children) => field_builder.add_children(children),
+ };
+ field_builder.add_type_(ipc_type);
+ fields.push(field_builder.finish());
+ }
+
+ let mut custom_metadata = vec![];
+ for (k, v) in self.schema.metadata() {
+ let fb_key_name = fbb.create_string(k.as_str());
+ let fb_val_name = fbb.create_string(v.as_str());
+
+ let mut kv_builder = ipc::KeyValueBuilder::new(&mut fbb);
+ kv_builder.add_key(fb_key_name);
+ kv_builder.add_value(fb_val_name);
+ custom_metadata.push(kv_builder.finish());
+ }
+
+ let fb_field_list = fbb.create_vector(&fields);
+ let fb_metadata_list = fbb.create_vector(&custom_metadata);
+
+ let root = {
+ let mut builder = ipc::SchemaBuilder::new(&mut fbb);
+ builder.add_fields(fb_field_list);
+ builder.add_custom_metadata(fb_metadata_list);
+ builder.finish()
+ };
+ root
+ };
+ let root = {
+ let mut footer_builder = ipc::FooterBuilder::new(&mut fbb);
+ footer_builder.add_version(ipc::MetadataVersion::V4);
+ footer_builder.add_schema(schema);
+ footer_builder.add_dictionaries(dictionaries);
+ footer_builder.add_recordBatches(record_batches);
+ footer_builder.finish()
+ };
+ fbb.finish(root, None);
+ write_padded_data(&mut self.writer, fbb.finished_data(), WriteDataType::Footer)?;
+ self.writer.write(&super::ARROW_MAGIC)?;
+ self.writer.flush()?;
+ self.finished = true;
+
+ Ok(())
+ }
+}
+
+/// Finish the file if it is not 'finished' when it goes out of scope
+impl<W: Write> Drop for FileWriter<W> {
+ fn drop(&mut self) {
+ if !self.finished {
+ self.finish().unwrap();
+ }
+ }
+}
+
+/// Convert the schema to its IPC representation, and write it to the `writer`
+fn write_schema<R: Write>(writer: &mut BufWriter<R>, schema: &Schema) -> Result<usize> {
+ let mut fbb = FlatBufferBuilder::new();
+ let schema = {
+ let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema);
+ fb.as_union_value()
+ };
+
+ let mut message = ipc::MessageBuilder::new(&mut fbb);
+ message.add_version(ipc::MetadataVersion::V4);
+ message.add_header_type(ipc::MessageHeader::Schema);
+ message.add_bodyLength(0);
+ message.add_header(schema);
+ // TODO: custom metadata
+ let data = message.finish();
+ fbb.finish(data, None);
+
+ let data = fbb.finished_data();
+ let written = write_padded_data(writer, data, WriteDataType::Header);
+
+ written
+}
+
+/// The message type being written. This determines whether to write the data length or not.
+/// Data length is written before the header, after the footer, and never for the body.
+#[derive(PartialEq)]
+enum WriteDataType {
+ Header,
+ Body,
+ Footer,
+}
+
+/// Write a slice of data to the writer, ensuring that it is padded to 8 bytes
+fn write_padded_data<R: Write>(
+ writer: &mut BufWriter<R>,
+ data: &[u8],
+ data_type: WriteDataType,
+) -> Result<usize> {
+ let len = data.len() as u32;
+ let pad_len = pad_to_8(len) as u32;
+ let total_len = len + pad_len;
+ // write data length
+ if data_type == WriteDataType::Header {
+ writer.write(&total_len.to_le_bytes()[..])?;
+ }
+ // write flatbuffer data
+ writer.write(data)?;
+ if pad_len > 0 {
+ writer.write(&vec![0u8; pad_len as usize][..])?;
+ }
+ if data_type == WriteDataType::Footer {
+ writer.write(&total_len.to_le_bytes()[..])?;
+ }
+ writer.flush()?;
+ Ok(total_len as usize)
+}
+
+/// Write a record batch to the writer
+fn write_record_batch<R: Write>(
+ writer: &mut BufWriter<R>,
+ batch: &RecordBatch,
+) -> Result<(usize, usize)> {
+ let mut fbb = FlatBufferBuilder::new();
+
+ let mut nodes: Vec<ipc::FieldNode> = vec![];
+ let mut buffers: Vec<ipc::Buffer> = vec![];
+ let mut arrow_data: Vec<u8> = vec![];
+ let mut offset = 0;
+ for array in batch.columns() {
+ let array_data = array.data();
+ offset = write_array_data(
+ &array_data,
+ &mut buffers,
+ &mut arrow_data,
+ &mut nodes,
+ offset,
+ array.len(),
+ array.null_count(),
+ );
+ }
+
+ // write data
+ let buffers = fbb.create_vector(&buffers);
+ let nodes = fbb.create_vector(&nodes);
+
+ let root = {
+ let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
+ batch_builder.add_length(batch.num_rows() as i64);
+ batch_builder.add_nodes(nodes);
+ batch_builder.add_buffers(buffers);
+ let b = batch_builder.finish();
+ b.as_union_value()
+ };
+ // create an ipc::Message
+ let mut message = ipc::MessageBuilder::new(&mut fbb);
+ message.add_version(ipc::MetadataVersion::V4);
+ message.add_header_type(ipc::MessageHeader::RecordBatch);
+ message.add_bodyLength(arrow_data.len() as i64);
+ message.add_header(root);
+ let root = message.finish();
+ fbb.finish(root, None);
+ let meta_written =
+ write_padded_data(writer, fbb.finished_data(), WriteDataType::Body)?;
+ let arrow_data_written =
+ write_padded_data(writer, &arrow_data[..], WriteDataType::Body)?;
+ Ok((meta_written, arrow_data_written))
+}
+
+/// Write array data to a vector of bytes
+fn write_array_data(
+ array_data: &ArrayDataRef,
+ mut buffers: &mut Vec<ipc::Buffer>,
+ mut arrow_data: &mut Vec<u8>,
+ mut nodes: &mut Vec<ipc::FieldNode>,
+ offset: i64,
+ num_rows: usize,
+ null_count: usize,
+) -> i64 {
+ let mut offset = offset;
+ nodes.push(ipc::FieldNode::new(num_rows as i64, null_count as i64));
+ // write null buffer if exists
+ let null_buffer = match array_data.null_buffer() {
+ None => {
+ // create a buffer and fill it with valid bits
+ let buffer = MutableBuffer::new(num_rows);
+ let buffer = buffer.with_bitset(num_rows, true);
+ let buffer = buffer.freeze();
+ buffer
+ }
+ Some(buffer) => buffer.clone(),
+ };
+ offset = write_buffer(&null_buffer, &mut buffers, &mut arrow_data, offset);
+
+ array_data.buffers().iter().for_each(|buffer| {
+ offset = write_buffer(buffer, &mut buffers, &mut arrow_data, offset);
+ });
+
+ // recursively write out nested structures
+ array_data.child_data().iter().for_each(|data_ref| {
+ // write the nested data (e.g list data)
+ offset = write_array_data(
+ data_ref,
+ &mut buffers,
+ &mut arrow_data,
+ &mut nodes,
+ offset,
+ data_ref.len(),
+ data_ref.null_count(),
+ );
+ });
+ offset
+}
+
+/// Write a buffer to a vector of bytes, and add its ipc Buffer to a vector
+fn write_buffer(
+ buffer: &Buffer,
+ buffers: &mut Vec<ipc::Buffer>,
+ arrow_data: &mut Vec<u8>,
+ offset: i64,
+) -> i64 {
+ let len = buffer.len();
+ let pad_len = pad_to_8(len as u32);
+ let total_len: i64 = (len + pad_len) as i64;
+ // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
+ buffers.push(ipc::Buffer::new(offset, total_len));
+ arrow_data.extend_from_slice(buffer.data());
+ arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
+ offset + total_len
+}
+
+/// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes
+fn pad_to_8<'a>(len: u32) -> usize {
+ match len % 8 {
+ 0 => 0 as usize,
+ v => 8 - v as usize,
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use flate2::read::GzDecoder;
+
+ use crate::array::*;
+ use crate::datatypes::Field;
+ use crate::ipc::reader::*;
+ use crate::util::integration_util::*;
+ use std::env;
+ use std::fs::File;
+ use std::io::Read;
+ use std::sync::Arc;
+
+ #[test]
+ fn test_write_file() {
+ let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
+ let values: Vec<Option<u32>> = vec![
+ Some(999),
+ None,
+ Some(235),
+ Some(123),
+ None,
+ None,
+ None,
+ None,
+ None,
+ ];
+ let array1 = UInt32Array::from(values);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(array1) as ArrayRef],
+ )
+ .unwrap();
+ {
+ let file = File::create("target/debug/testdata/arrow.arrow_file").unwrap();
+ let mut writer = FileWriter::try_new(file, &schema).unwrap();
+
+ writer.write(&batch).unwrap();
+ // this is inside a block to test the implicit finishing of the file on `Drop`
+ }
+
+ {
+ let file =
+ File::open(format!("target/debug/testdata/{}.arrow_file", "arrow"))
+ .unwrap();
+ let mut reader = FileReader::try_new(file).unwrap();
+ while let Ok(Some(read_batch)) = reader.next() {
+ read_batch
+ .columns()
+ .iter()
+ .zip(batch.columns())
+ .for_each(|(a, b)| {
+ assert_eq!(a.data_type(), b.data_type());
+ assert_eq!(a.len(), b.len());
+ assert_eq!(a.null_count(), b.null_count());
+ });
+ }
+ }
+ // panic!("intentional failure");
+ }
+
+ #[test]
+ fn read_and_rewrite_generated_files() {
+ let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec![
+ "generated_interval",
+ "generated_datetime",
+ "generated_nested",
+ "generated_primitive_no_batches",
+ "generated_primitive_zerolength",
+ "generated_primitive",
+ ];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc/integration/0.14.1/{}.arrow_file",
+ testdata, path
+ ))
+ .unwrap();
+
+ let mut reader = FileReader::try_new(file).unwrap();
+
+ // read and rewrite the file to a temp location
+ {
+ let file =
+ File::create(format!("target/debug/testdata/{}.arrow_file", path))
+ .unwrap();
+ let mut writer = FileWriter::try_new(file, &reader.schema()).unwrap();
+ while let Ok(Some(batch)) = reader.next() {
+ writer.write(&batch).unwrap();
+ }
+ writer.finish().unwrap();
+ }
+
+ let file =
+ File::open(format!("target/debug/testdata/{}.arrow_file", path)).unwrap();
+ let mut reader = FileReader::try_new(file).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(path);
+ assert!(arrow_json.equals_reader(&mut reader));
+ });
+ }
+ /// Read gzipped JSON file
+ fn read_gzip_json(path: &str) -> ArrowJson {
+ let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+ let file = File::open(format!(
+ "{}/arrow-ipc/integration/0.14.1/{}.json.gz",
+ testdata, path
+ ))
+ .unwrap();
+ let mut gz = GzDecoder::new(&file);
+ let mut s = String::new();
+ gz.read_to_string(&mut s).unwrap();
+ // convert to Arrow JSON
+ let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
+ arrow_json
+ }
+}