You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/06 08:31:27 UTC
[arrow-rs] branch master updated: Split out arrow-ipc (#3022)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new deb64554f Split out arrow-ipc (#3022)
deb64554f is described below
commit deb64554f9d25afa044248293b31ab7c26f0e42f
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sun Nov 6 21:31:22 2022 +1300
Split out arrow-ipc (#3022)
* Split out arrow-ipc
* RAT
* Fix doc
* Tweak required-features
* Clippy
* Fix feature flags
---
.github/workflows/arrow.yml | 5 +
.github/workflows/arrow_flight.yml | 1 +
.github/workflows/dev_pr/labeler.yml | 1 +
.github/workflows/integration.yml | 1 +
.github/workflows/miri.yaml | 1 +
.github/workflows/parquet.yml | 1 +
Cargo.toml | 1 +
arrow-array/src/lib.rs | 2 +-
arrow-array/src/record_batch.rs | 18 ++
arrow-ipc/CONTRIBUTING.md | 37 +++
arrow-ipc/Cargo.toml | 51 ++++
{arrow => arrow-ipc}/regen.sh | 0
.../codec.rs => arrow-ipc/src/compression.rs | 133 ++++++---
{arrow/src/ipc => arrow-ipc/src}/convert.rs | 294 +++++++++----------
{arrow/src/ipc => arrow-ipc/src}/gen/File.rs | 2 +-
{arrow/src/ipc => arrow-ipc/src}/gen/Message.rs | 8 +-
{arrow/src/ipc => arrow-ipc/src}/gen/Schema.rs | 0
.../src/ipc => arrow-ipc/src}/gen/SparseTensor.rs | 4 +-
{arrow/src/ipc => arrow-ipc/src}/gen/Tensor.rs | 2 +-
{arrow/src/ipc => arrow-ipc/src}/gen/mod.rs | 0
arrow/src/ipc/mod.rs => arrow-ipc/src/lib.rs | 2 +
{arrow/src/ipc => arrow-ipc/src}/reader.rs | 160 ++++++-----
{arrow/src/ipc => arrow-ipc/src}/writer.rs | 310 +++++++++------------
arrow/CONTRIBUTING.md | 17 --
arrow/Cargo.toml | 12 +-
arrow/src/ipc/compression/mod.rs | 26 --
arrow/src/ipc/compression/stub.rs | 63 -----
arrow/src/lib.rs | 25 +-
arrow/tests/ipc_integration.rs | 61 ++++
dev/release/README.md | 2 +
30 files changed, 648 insertions(+), 592 deletions(-)
diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml
index 9ae72dd00..d930086ef 100644
--- a/.github/workflows/arrow.yml
+++ b/.github/workflows/arrow.yml
@@ -33,6 +33,7 @@ on:
- arrow-schema/**
- arrow-select/**
- arrow-integration-test/**
+ - arrow-ipc/**
- .github/**
jobs:
@@ -61,6 +62,8 @@ jobs:
run: cargo test -p arrow-select --all-features
- name: Test arrow-cast with all features
run: cargo test -p arrow-cast --all-features
+ - name: Test arrow-ipc with all features
+ run: cargo test -p arrow-ipc --all-features
- name: Test arrow-integration-test with all features
run: cargo test -p arrow-integration-test --all-features
- name: Test arrow with default features
@@ -169,5 +172,7 @@ jobs:
run: cargo clippy -p arrow-select --all-targets --all-features -- -D warnings
- name: Clippy arrow-cast with all features
run: cargo clippy -p arrow-cast --all-targets --all-features -- -D warnings
+ - name: Clippy arrow-ipc with all features
+ run: cargo clippy -p arrow-ipc --all-targets --all-features -- -D warnings
- name: Clippy arrow
run: cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi,ipc_compression,dyn_cmp_dict,dyn_arith_dict,chrono-tz --all-targets -- -D warnings
diff --git a/.github/workflows/arrow_flight.yml b/.github/workflows/arrow_flight.yml
index 9621c9e69..ded4f5a67 100644
--- a/.github/workflows/arrow_flight.yml
+++ b/.github/workflows/arrow_flight.yml
@@ -35,6 +35,7 @@ on:
- arrow-schema/**
- arrow-select/**
- arrow-flight/**
+ - arrow-ipc/**
- .github/**
jobs:
diff --git a/.github/workflows/dev_pr/labeler.yml b/.github/workflows/dev_pr/labeler.yml
index 3a0073004..17ebf54de 100644
--- a/.github/workflows/dev_pr/labeler.yml
+++ b/.github/workflows/dev_pr/labeler.yml
@@ -23,6 +23,7 @@ arrow:
- arrow-data/**/*
- arrow-schema/**/*
- arrow-select/**/*
+ - arrow-ipc/**/*
arrow-flight:
- arrow-flight/**/*
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index c2c0a79e6..8566230ea 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -31,6 +31,7 @@ on:
- arrow-data/**
- arrow-schema/**
- arrow-select/**
+ - arrow-ipc/**
- arrow-pyarrow-integration-testing/**
- arrow-integration-test/**
- arrow-integration-testing/**
diff --git a/.github/workflows/miri.yaml b/.github/workflows/miri.yaml
index 241b4f0b4..2e85c9dd9 100644
--- a/.github/workflows/miri.yaml
+++ b/.github/workflows/miri.yaml
@@ -31,6 +31,7 @@ on:
- arrow-data/**
- arrow-schema/**
- arrow-select/**
+ - arrow-ipc/**
- .github/**
jobs:
diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index 5a7beadfd..b369ef69b 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -34,6 +34,7 @@ on:
- arrow-data/**
- arrow-schema/**
- arrow-select/**
+ - arrow-ipc/**
- parquet/**
- .github/**
diff --git a/Cargo.toml b/Cargo.toml
index d8fa5b923..0ab4853c6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,6 +25,7 @@ members = [
"arrow-flight",
"arrow-integration-test",
"arrow-integration-testing",
+ "arrow-ipc",
"arrow-schema",
"arrow-select",
"parquet",
diff --git a/arrow-array/src/lib.rs b/arrow-array/src/lib.rs
index 5c86978dc..15267d308 100644
--- a/arrow-array/src/lib.rs
+++ b/arrow-array/src/lib.rs
@@ -162,7 +162,7 @@ pub mod array;
pub use array::*;
mod record_batch;
-pub use record_batch::{RecordBatch, RecordBatchOptions};
+pub use record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
mod arithmetic;
pub use arithmetic::ArrowNativeTypeOp;
diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index e613a38bb..6f2385fa9 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -22,6 +22,24 @@ use crate::{new_empty_array, Array, ArrayRef, StructArray};
use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
use std::sync::Arc;
+/// Trait for types that can read `RecordBatch`'s.
+pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch, ArrowError>> {
+ /// Returns the schema of this `RecordBatchReader`.
+ ///
+ /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this
+ /// reader should have the same schema as returned from this method.
+ fn schema(&self) -> SchemaRef;
+
+ /// Reads the next `RecordBatch`.
+ #[deprecated(
+ since = "2.0.0",
+ note = "This method is deprecated in favour of `next` from the trait Iterator."
+ )]
+ fn next_batch(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+ self.next().transpose()
+ }
+}
+
/// A two-dimensional batch of column-oriented data with a defined
/// [schema](arrow_schema::Schema).
///
diff --git a/arrow-ipc/CONTRIBUTING.md b/arrow-ipc/CONTRIBUTING.md
new file mode 100644
index 000000000..5e14760f1
--- /dev/null
+++ b/arrow-ipc/CONTRIBUTING.md
@@ -0,0 +1,37 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+## Developer's guide
+
+### IPC
+
+The expected `flatc` version is 1.12.0+, built by `regen.sh` from [flatbuffers](https://github.com/google/flatbuffers)
+master at a fixed commit ID.
+
+The IPC flatbuffer code was generated by running this command from the root of this crate (the `arrow-ipc` directory, where `regen.sh` lives):
+
+```bash
+./regen.sh
+```
+
+The above script will run the `flatc` compiler and perform some adjustments to the source code:
+
+- Replace `type__` with `type_`
+- Remove `org::apache::arrow::flatbuffers` namespace
+- Add includes to each generated file
diff --git a/arrow-ipc/Cargo.toml b/arrow-ipc/Cargo.toml
new file mode 100644
index 000000000..52ad5fe2e
--- /dev/null
+++ b/arrow-ipc/Cargo.toml
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "arrow-ipc"
+version = "26.0.0"
+description = "Support for the Arrow IPC format"
+homepage = "https://github.com/apache/arrow-rs"
+repository = "https://github.com/apache/arrow-rs"
+authors = ["Apache Arrow <de...@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = ["arrow"]
+include = [
+ "benches/*.rs",
+ "src/**/*.rs",
+ "Cargo.toml",
+]
+edition = "2021"
+rust-version = "1.62"
+
+[lib]
+name = "arrow_ipc"
+path = "src/lib.rs"
+bench = false
+
+[dependencies]
+arrow-array = { version = "26.0.0", path = "../arrow-array" }
+arrow-buffer = { version = "26.0.0", path = "../arrow-buffer" }
+arrow-cast = { version = "26.0.0", path = "../arrow-cast" }
+arrow-data = { version = "26.0.0", path = "../arrow-data" }
+arrow-schema = { version = "26.0.0", path = "../arrow-schema" }
+flatbuffers = { version = "22.9.2", default-features = false, features = ["thiserror"] }
+lz4 = { version = "1.23", default-features = false, optional = true }
+zstd = { version = "0.11.1", default-features = false, optional = true }
+
+[dev-dependencies]
+tempfile = "3.3"
diff --git a/arrow/regen.sh b/arrow-ipc/regen.sh
similarity index 100%
rename from arrow/regen.sh
rename to arrow-ipc/regen.sh
diff --git a/arrow/src/ipc/compression/codec.rs b/arrow-ipc/src/compression.rs
similarity index 67%
rename from arrow/src/ipc/compression/codec.rs
rename to arrow-ipc/src/compression.rs
index 58ba8cb86..6349ac232 100644
--- a/arrow/src/ipc/compression/codec.rs
+++ b/arrow-ipc/src/compression.rs
@@ -15,10 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use crate::buffer::Buffer;
-use crate::error::{ArrowError, Result};
-use crate::ipc::CompressionType;
-use std::io::{Read, Write};
+use crate::CompressionType;
+use arrow_buffer::Buffer;
+use arrow_schema::ArrowError;
const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
const LENGTH_OF_PREFIX_DATA: i64 = 8;
@@ -33,7 +32,7 @@ pub enum CompressionCodec {
impl TryFrom<CompressionType> for CompressionCodec {
type Error = ArrowError;
- fn try_from(compression_type: CompressionType) -> Result<Self> {
+ fn try_from(compression_type: CompressionType) -> Result<Self, ArrowError> {
match compression_type {
CompressionType::ZSTD => Ok(CompressionCodec::Zstd),
CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame),
@@ -60,7 +59,7 @@ impl CompressionCodec {
&self,
input: &[u8],
output: &mut Vec<u8>,
- ) -> Result<usize> {
+ ) -> Result<usize, ArrowError> {
let uncompressed_data_len = input.len();
let original_output_len = output.len();
@@ -92,7 +91,10 @@ impl CompressionCodec {
/// [8 bytes]: uncompressed length
/// [remaining bytes]: compressed data stream
/// ```
- pub(crate) fn decompress_to_buffer(&self, input: &Buffer) -> Result<Buffer> {
+ pub(crate) fn decompress_to_buffer(
+ &self,
+ input: &Buffer,
+ ) -> Result<Buffer, ArrowError> {
// read the first 8 bytes to determine if the data is
// compressed
let decompressed_length = read_uncompressed_size(input);
@@ -115,50 +117,89 @@ impl CompressionCodec {
/// Compress the data in input buffer and write to output buffer
/// using the specified compression
- fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
+ fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<(), ArrowError> {
match self {
- CompressionCodec::Lz4Frame => {
- let mut encoder = lz4::EncoderBuilder::new().build(output)?;
- encoder.write_all(input)?;
- match encoder.finish().1 {
- Ok(_) => Ok(()),
- Err(e) => Err(e.into()),
- }
- }
- CompressionCodec::Zstd => {
- let mut encoder = zstd::Encoder::new(output, 0)?;
- encoder.write_all(input)?;
- match encoder.finish() {
- Ok(_) => Ok(()),
- Err(e) => Err(e.into()),
- }
- }
+ CompressionCodec::Lz4Frame => compress_lz4(input, output),
+ CompressionCodec::Zstd => compress_zstd(input, output),
}
}
/// Decompress the data in input buffer and write to output buffer
/// using the specified compression
- fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<usize> {
- let result: Result<usize> = match self {
- CompressionCodec::Lz4Frame => {
- let mut decoder = lz4::Decoder::new(input)?;
- match decoder.read_to_end(output) {
- Ok(size) => Ok(size),
- Err(e) => Err(e.into()),
- }
- }
- CompressionCodec::Zstd => {
- let mut decoder = zstd::Decoder::new(input)?;
- match decoder.read_to_end(output) {
- Ok(size) => Ok(size),
- Err(e) => Err(e.into()),
- }
- }
- };
- result
+ fn decompress(
+ &self,
+ input: &[u8],
+ output: &mut Vec<u8>,
+ ) -> Result<usize, ArrowError> {
+ match self {
+ CompressionCodec::Lz4Frame => decompress_lz4(input, output),
+ CompressionCodec::Zstd => decompress_zstd(input, output),
+ }
}
}
+#[cfg(feature = "lz4")]
+fn compress_lz4(input: &[u8], output: &mut Vec<u8>) -> Result<(), ArrowError> {
+ use std::io::Write;
+ let mut encoder = lz4::EncoderBuilder::new().build(output)?;
+ encoder.write_all(input)?;
+ encoder.finish().1?;
+ Ok(())
+}
+
+#[cfg(not(feature = "lz4"))]
+#[allow(clippy::ptr_arg)]
+fn compress_lz4(_input: &[u8], _output: &mut Vec<u8>) -> Result<(), ArrowError> {
+ Err(ArrowError::InvalidArgumentError(
+ "lz4 IPC compression requires the lz4 feature".to_string(),
+ ))
+}
+
+#[cfg(feature = "lz4")]
+fn decompress_lz4(input: &[u8], output: &mut Vec<u8>) -> Result<usize, ArrowError> {
+ use std::io::Read;
+ Ok(lz4::Decoder::new(input)?.read_to_end(output)?)
+}
+
+#[cfg(not(feature = "lz4"))]
+#[allow(clippy::ptr_arg)]
+fn decompress_lz4(_input: &[u8], _output: &mut Vec<u8>) -> Result<usize, ArrowError> {
+ Err(ArrowError::InvalidArgumentError(
+ "lz4 IPC decompression requires the lz4 feature".to_string(),
+ ))
+}
+
+#[cfg(feature = "zstd")]
+fn compress_zstd(input: &[u8], output: &mut Vec<u8>) -> Result<(), ArrowError> {
+ use std::io::Write;
+ let mut encoder = zstd::Encoder::new(output, 0)?;
+ encoder.write_all(input)?;
+ encoder.finish()?;
+ Ok(())
+}
+
+#[cfg(not(feature = "zstd"))]
+#[allow(clippy::ptr_arg)]
+fn compress_zstd(_input: &[u8], _output: &mut Vec<u8>) -> Result<(), ArrowError> {
+ Err(ArrowError::InvalidArgumentError(
+ "zstd IPC compression requires the zstd feature".to_string(),
+ ))
+}
+
+#[cfg(feature = "zstd")]
+fn decompress_zstd(input: &[u8], output: &mut Vec<u8>) -> Result<usize, ArrowError> {
+ use std::io::Read;
+ Ok(zstd::Decoder::new(input)?.read_to_end(output)?)
+}
+
+#[cfg(not(feature = "zstd"))]
+#[allow(clippy::ptr_arg)]
+fn decompress_zstd(_input: &[u8], _output: &mut Vec<u8>) -> Result<usize, ArrowError> {
+ Err(ArrowError::InvalidArgumentError(
+ "zstd IPC decompression requires the zstd feature".to_string(),
+ ))
+}
+
/// Get the uncompressed length
/// Notes:
/// LENGTH_NO_COMPRESSED_DATA: indicate that the data that follows is not compressed
@@ -173,12 +214,11 @@ fn read_uncompressed_size(buffer: &[u8]) -> i64 {
#[cfg(test)]
mod tests {
- use super::*;
-
#[test]
+ #[cfg(feature = "lz4")]
fn test_lz4_compression() {
let input_bytes = "hello lz4".as_bytes();
- let codec: CompressionCodec = CompressionCodec::Lz4Frame;
+ let codec = super::CompressionCodec::Lz4Frame;
let mut output_bytes: Vec<u8> = Vec::new();
codec.compress(input_bytes, &mut output_bytes).unwrap();
let mut result_output_bytes: Vec<u8> = Vec::new();
@@ -189,9 +229,10 @@ mod tests {
}
#[test]
+ #[cfg(feature = "zstd")]
fn test_zstd_compression() {
let input_bytes = "hello zstd".as_bytes();
- let codec: CompressionCodec = CompressionCodec::Zstd;
+ let codec = super::CompressionCodec::Zstd;
let mut output_bytes: Vec<u8> = Vec::new();
codec.compress(input_bytes, &mut output_bytes).unwrap();
let mut result_output_bytes: Vec<u8> = Vec::new();
diff --git a/arrow/src/ipc/convert.rs b/arrow-ipc/src/convert.rs
similarity index 81%
rename from arrow/src/ipc/convert.rs
rename to arrow-ipc/src/convert.rs
index 0f5d246bc..8d01c58b6 100644
--- a/arrow/src/ipc/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -17,16 +17,13 @@
//! Utilities for converting between IPC types and native Arrow types
-use crate::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode};
-use crate::error::{ArrowError, Result};
-use crate::ipc;
-
+use arrow_schema::*;
use flatbuffers::{
FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset,
};
use std::collections::{BTreeMap, HashMap};
-use crate::ipc::{size_prefixed_root_as_message, CONTINUATION_MARKER};
+use crate::{size_prefixed_root_as_message, CONTINUATION_MARKER};
use DataType::*;
/// Serialize a schema in IPC format
@@ -43,7 +40,7 @@ pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
pub fn schema_to_fb_offset<'a>(
fbb: &mut FlatBufferBuilder<'a>,
schema: &Schema,
-) -> WIPOffset<ipc::Schema<'a>> {
+) -> WIPOffset<crate::Schema<'a>> {
let mut fields = vec![];
for field in schema.fields() {
let fb_field = build_field(fbb, field);
@@ -55,7 +52,7 @@ pub fn schema_to_fb_offset<'a>(
let fb_key_name = fbb.create_string(k.as_str());
let fb_val_name = fbb.create_string(v.as_str());
- let mut kv_builder = ipc::KeyValueBuilder::new(fbb);
+ let mut kv_builder = crate::KeyValueBuilder::new(fbb);
kv_builder.add_key(fb_key_name);
kv_builder.add_value(fb_val_name);
custom_metadata.push(kv_builder.finish());
@@ -64,15 +61,15 @@ pub fn schema_to_fb_offset<'a>(
let fb_field_list = fbb.create_vector(&fields);
let fb_metadata_list = fbb.create_vector(&custom_metadata);
- let mut builder = ipc::SchemaBuilder::new(fbb);
+ let mut builder = crate::SchemaBuilder::new(fbb);
builder.add_fields(fb_field_list);
builder.add_custom_metadata(fb_metadata_list);
builder.finish()
}
/// Convert an IPC Field to Arrow Field
-impl<'a> From<ipc::Field<'a>> for Field {
- fn from(field: ipc::Field) -> Field {
+impl<'a> From<crate::Field<'a>> for Field {
+ fn from(field: crate::Field) -> Field {
let arrow_field = if let Some(dictionary) = field.dictionary() {
Field::new_dict(
field.name().unwrap(),
@@ -105,14 +102,14 @@ impl<'a> From<ipc::Field<'a>> for Field {
}
/// Deserialize a Schema table from flat buffer format to Schema data type
-pub fn fb_to_schema(fb: ipc::Schema) -> Schema {
+pub fn fb_to_schema(fb: crate::Schema) -> Schema {
let mut fields: Vec<Field> = vec![];
let c_fields = fb.fields().unwrap();
let len = c_fields.len();
for i in 0..len {
- let c_field: ipc::Field = c_fields.get(i);
+ let c_field: crate::Field = c_fields.get(i);
match c_field.type_type() {
- ipc::Type::Decimal if fb.endianness() == ipc::Endianness::Big => {
+ crate::Type::Decimal if fb.endianness() == crate::Endianness::Big => {
unimplemented!("Big Endian is not supported for Decimal!")
}
_ => (),
@@ -138,8 +135,8 @@ pub fn fb_to_schema(fb: ipc::Schema) -> Schema {
}
/// Try deserialize flat buffer format bytes into a schema
-pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema> {
- if let Ok(ipc) = ipc::root_as_message(bytes) {
+pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowError> {
+ if let Ok(ipc) = crate::root_as_message(bytes) {
if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
Ok(schema)
} else {
@@ -155,7 +152,7 @@ pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema> {
}
/// Try deserialize the IPC format bytes into a schema
-pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema> {
+pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema, ArrowError> {
// There are two protocol types: https://issues.apache.org/jira/browse/ARROW-6313
// The original protocal is:
// 4 bytes - the byte length of the payload
@@ -200,7 +197,7 @@ pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema> {
}
/// Get the Arrow data type from the flatbuffer Field table
-pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataType {
+pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> DataType {
if let Some(dictionary) = field.dictionary() {
if may_be_dictionary {
let int = dictionary.indexType().unwrap();
@@ -223,9 +220,9 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
}
match field.type_type() {
- ipc::Type::Null => DataType::Null,
- ipc::Type::Bool => DataType::Boolean,
- ipc::Type::Int => {
+ crate::Type::Null => DataType::Null,
+ crate::Type::Bool => DataType::Boolean,
+ crate::Type::Int => {
let int = field.type_as_int().unwrap();
match (int.bitWidth(), int.is_signed()) {
(8, true) => DataType::Int8,
@@ -242,103 +239,109 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
),
}
}
- ipc::Type::Binary => DataType::Binary,
- ipc::Type::LargeBinary => DataType::LargeBinary,
- ipc::Type::Utf8 => DataType::Utf8,
- ipc::Type::LargeUtf8 => DataType::LargeUtf8,
- ipc::Type::FixedSizeBinary => {
+ crate::Type::Binary => DataType::Binary,
+ crate::Type::LargeBinary => DataType::LargeBinary,
+ crate::Type::Utf8 => DataType::Utf8,
+ crate::Type::LargeUtf8 => DataType::LargeUtf8,
+ crate::Type::FixedSizeBinary => {
let fsb = field.type_as_fixed_size_binary().unwrap();
DataType::FixedSizeBinary(fsb.byteWidth())
}
- ipc::Type::FloatingPoint => {
+ crate::Type::FloatingPoint => {
let float = field.type_as_floating_point().unwrap();
match float.precision() {
- ipc::Precision::HALF => DataType::Float16,
- ipc::Precision::SINGLE => DataType::Float32,
- ipc::Precision::DOUBLE => DataType::Float64,
+ crate::Precision::HALF => DataType::Float16,
+ crate::Precision::SINGLE => DataType::Float32,
+ crate::Precision::DOUBLE => DataType::Float64,
z => panic!("FloatingPoint type with precision of {:?} not supported", z),
}
}
- ipc::Type::Date => {
+ crate::Type::Date => {
let date = field.type_as_date().unwrap();
match date.unit() {
- ipc::DateUnit::DAY => DataType::Date32,
- ipc::DateUnit::MILLISECOND => DataType::Date64,
+ crate::DateUnit::DAY => DataType::Date32,
+ crate::DateUnit::MILLISECOND => DataType::Date64,
z => panic!("Date type with unit of {:?} not supported", z),
}
}
- ipc::Type::Time => {
+ crate::Type::Time => {
let time = field.type_as_time().unwrap();
match (time.bitWidth(), time.unit()) {
- (32, ipc::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
- (32, ipc::TimeUnit::MILLISECOND) => {
+ (32, crate::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
+ (32, crate::TimeUnit::MILLISECOND) => {
DataType::Time32(TimeUnit::Millisecond)
}
- (64, ipc::TimeUnit::MICROSECOND) => {
+ (64, crate::TimeUnit::MICROSECOND) => {
DataType::Time64(TimeUnit::Microsecond)
}
- (64, ipc::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
+ (64, crate::TimeUnit::NANOSECOND) => {
+ DataType::Time64(TimeUnit::Nanosecond)
+ }
z => panic!(
"Time type with bit width of {} and unit of {:?} not supported",
z.0, z.1
),
}
}
- ipc::Type::Timestamp => {
+ crate::Type::Timestamp => {
let timestamp = field.type_as_timestamp().unwrap();
let timezone: Option<String> = timestamp.timezone().map(|tz| tz.to_string());
match timestamp.unit() {
- ipc::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone),
- ipc::TimeUnit::MILLISECOND => {
+ crate::TimeUnit::SECOND => {
+ DataType::Timestamp(TimeUnit::Second, timezone)
+ }
+ crate::TimeUnit::MILLISECOND => {
DataType::Timestamp(TimeUnit::Millisecond, timezone)
}
- ipc::TimeUnit::MICROSECOND => {
+ crate::TimeUnit::MICROSECOND => {
DataType::Timestamp(TimeUnit::Microsecond, timezone)
}
- ipc::TimeUnit::NANOSECOND => {
+ crate::TimeUnit::NANOSECOND => {
DataType::Timestamp(TimeUnit::Nanosecond, timezone)
}
z => panic!("Timestamp type with unit of {:?} not supported", z),
}
}
- ipc::Type::Interval => {
+ crate::Type::Interval => {
let interval = field.type_as_interval().unwrap();
match interval.unit() {
- ipc::IntervalUnit::YEAR_MONTH => {
+ crate::IntervalUnit::YEAR_MONTH => {
DataType::Interval(IntervalUnit::YearMonth)
}
- ipc::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime),
- ipc::IntervalUnit::MONTH_DAY_NANO => {
+ crate::IntervalUnit::DAY_TIME => {
+ DataType::Interval(IntervalUnit::DayTime)
+ }
+ crate::IntervalUnit::MONTH_DAY_NANO => {
DataType::Interval(IntervalUnit::MonthDayNano)
}
z => panic!("Interval type with unit of {:?} unsupported", z),
}
}
- ipc::Type::Duration => {
+ crate::Type::Duration => {
let duration = field.type_as_duration().unwrap();
match duration.unit() {
- ipc::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
- ipc::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
- ipc::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
- ipc::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
+ crate::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
+ crate::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
+ crate::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
+ crate::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
z => panic!("Duration type with unit of {:?} unsupported", z),
}
}
- ipc::Type::List => {
+ crate::Type::List => {
let children = field.children().unwrap();
if children.len() != 1 {
panic!("expect a list to have one child")
}
DataType::List(Box::new(children.get(0).into()))
}
- ipc::Type::LargeList => {
+ crate::Type::LargeList => {
let children = field.children().unwrap();
if children.len() != 1 {
panic!("expect a large list to have one child")
}
DataType::LargeList(Box::new(children.get(0).into()))
}
- ipc::Type::FixedSizeList => {
+ crate::Type::FixedSizeList => {
let children = field.children().unwrap();
if children.len() != 1 {
panic!("expect a list to have one child")
@@ -346,7 +349,7 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
let fsl = field.type_as_fixed_size_list().unwrap();
DataType::FixedSizeList(Box::new(children.get(0).into()), fsl.listSize())
}
- ipc::Type::Struct_ => {
+ crate::Type::Struct_ => {
let mut fields = vec![];
if let Some(children) = field.children() {
for i in 0..children.len() {
@@ -356,7 +359,7 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
DataType::Struct(fields)
}
- ipc::Type::Map => {
+ crate::Type::Map => {
let map = field.type_as_map().unwrap();
let children = field.children().unwrap();
if children.len() != 1 {
@@ -364,7 +367,7 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
}
DataType::Map(Box::new(children.get(0).into()), map.keysSorted())
}
- ipc::Type::Decimal => {
+ crate::Type::Decimal => {
let fsb = field.type_as_decimal().unwrap();
let bit_width = fsb.bitWidth();
if bit_width == 128 {
@@ -381,12 +384,12 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
panic!("Unexpected decimal bit width {}", bit_width)
}
}
- ipc::Type::Union => {
+ crate::Type::Union => {
let union = field.type_as_union().unwrap();
let union_mode = match union.mode() {
- ipc::UnionMode::Dense => UnionMode::Dense,
- ipc::UnionMode::Sparse => UnionMode::Sparse,
+ crate::UnionMode::Dense => UnionMode::Dense,
+ crate::UnionMode::Sparse => UnionMode::Sparse,
mode => panic!("Unexpected union mode: {:?}", mode),
};
@@ -409,27 +412,27 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
}
pub(crate) struct FBFieldType<'b> {
- pub(crate) type_type: ipc::Type,
+ pub(crate) type_type: crate::Type,
pub(crate) type_: WIPOffset<UnionWIPOffset>,
- pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<ipc::Field<'b>>>>>,
+ pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<crate::Field<'b>>>>>,
}
/// Create an IPC Field from an Arrow Field
pub(crate) fn build_field<'a>(
fbb: &mut FlatBufferBuilder<'a>,
field: &Field,
-) -> WIPOffset<ipc::Field<'a>> {
+) -> WIPOffset<crate::Field<'a>> {
// Optional custom metadata.
let mut fb_metadata = None;
if let Some(metadata) = field.metadata() {
if !metadata.is_empty() {
let mut kv_vec = vec![];
for (k, v) in metadata {
- let kv_args = ipc::KeyValueArgs {
+ let kv_args = crate::KeyValueArgs {
key: Some(fbb.create_string(k.as_str())),
value: Some(fbb.create_string(v.as_str())),
};
- let kv_offset = ipc::KeyValue::create(fbb, &kv_args);
+ let kv_offset = crate::KeyValue::create(fbb, &kv_args);
kv_vec.push(kv_offset);
}
fb_metadata = Some(fbb.create_vector(&kv_vec));
@@ -454,7 +457,7 @@ pub(crate) fn build_field<'a>(
None
};
- let mut field_builder = ipc::FieldBuilder::new(fbb);
+ let mut field_builder = crate::FieldBuilder::new(fbb);
field_builder.add_name(fb_field_name);
if let Some(dictionary) = fb_dictionary {
field_builder.add_dictionary(dictionary)
@@ -481,21 +484,21 @@ pub(crate) fn get_fb_field_type<'a>(
) -> FBFieldType<'a> {
// some IPC implementations expect an empty list for child data, instead of a null value.
// An empty field list is thus returned for primitive types
- let empty_fields: Vec<WIPOffset<ipc::Field>> = vec![];
+ let empty_fields: Vec<WIPOffset<crate::Field>> = vec![];
match data_type {
Null => FBFieldType {
- type_type: ipc::Type::Null,
- type_: ipc::NullBuilder::new(fbb).finish().as_union_value(),
+ type_type: crate::Type::Null,
+ type_: crate::NullBuilder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
Boolean => FBFieldType {
- type_type: ipc::Type::Bool,
- type_: ipc::BoolBuilder::new(fbb).finish().as_union_value(),
+ type_type: crate::Type::Bool,
+ type_: crate::BoolBuilder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
UInt8 | UInt16 | UInt32 | UInt64 => {
let children = fbb.create_vector(&empty_fields[..]);
- let mut builder = ipc::IntBuilder::new(fbb);
+ let mut builder = crate::IntBuilder::new(fbb);
builder.add_is_signed(false);
match data_type {
UInt8 => builder.add_bitWidth(8),
@@ -505,14 +508,14 @@ pub(crate) fn get_fb_field_type<'a>(
_ => {}
};
FBFieldType {
- type_type: ipc::Type::Int,
+ type_type: crate::Type::Int,
type_: builder.finish().as_union_value(),
children: Some(children),
}
}
Int8 | Int16 | Int32 | Int64 => {
let children = fbb.create_vector(&empty_fields[..]);
- let mut builder = ipc::IntBuilder::new(fbb);
+ let mut builder = crate::IntBuilder::new(fbb);
builder.add_is_signed(true);
match data_type {
Int8 => builder.add_bitWidth(8),
@@ -522,95 +525,97 @@ pub(crate) fn get_fb_field_type<'a>(
_ => {}
};
FBFieldType {
- type_type: ipc::Type::Int,
+ type_type: crate::Type::Int,
type_: builder.finish().as_union_value(),
children: Some(children),
}
}
Float16 | Float32 | Float64 => {
let children = fbb.create_vector(&empty_fields[..]);
- let mut builder = ipc::FloatingPointBuilder::new(fbb);
+ let mut builder = crate::FloatingPointBuilder::new(fbb);
match data_type {
- Float16 => builder.add_precision(ipc::Precision::HALF),
- Float32 => builder.add_precision(ipc::Precision::SINGLE),
- Float64 => builder.add_precision(ipc::Precision::DOUBLE),
+ Float16 => builder.add_precision(crate::Precision::HALF),
+ Float32 => builder.add_precision(crate::Precision::SINGLE),
+ Float64 => builder.add_precision(crate::Precision::DOUBLE),
_ => {}
};
FBFieldType {
- type_type: ipc::Type::FloatingPoint,
+ type_type: crate::Type::FloatingPoint,
type_: builder.finish().as_union_value(),
children: Some(children),
}
}
Binary => FBFieldType {
- type_type: ipc::Type::Binary,
- type_: ipc::BinaryBuilder::new(fbb).finish().as_union_value(),
+ type_type: crate::Type::Binary,
+ type_: crate::BinaryBuilder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
LargeBinary => FBFieldType {
- type_type: ipc::Type::LargeBinary,
- type_: ipc::LargeBinaryBuilder::new(fbb).finish().as_union_value(),
+ type_type: crate::Type::LargeBinary,
+ type_: crate::LargeBinaryBuilder::new(fbb)
+ .finish()
+ .as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
Utf8 => FBFieldType {
- type_type: ipc::Type::Utf8,
- type_: ipc::Utf8Builder::new(fbb).finish().as_union_value(),
+ type_type: crate::Type::Utf8,
+ type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
LargeUtf8 => FBFieldType {
- type_type: ipc::Type::LargeUtf8,
- type_: ipc::LargeUtf8Builder::new(fbb).finish().as_union_value(),
+ type_type: crate::Type::LargeUtf8,
+ type_: crate::LargeUtf8Builder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
FixedSizeBinary(len) => {
- let mut builder = ipc::FixedSizeBinaryBuilder::new(fbb);
+ let mut builder = crate::FixedSizeBinaryBuilder::new(fbb);
builder.add_byteWidth(*len as i32);
FBFieldType {
- type_type: ipc::Type::FixedSizeBinary,
+ type_type: crate::Type::FixedSizeBinary,
type_: builder.finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
}
}
Date32 => {
- let mut builder = ipc::DateBuilder::new(fbb);
- builder.add_unit(ipc::DateUnit::DAY);
+ let mut builder = crate::DateBuilder::new(fbb);
+ builder.add_unit(crate::DateUnit::DAY);
FBFieldType {
- type_type: ipc::Type::Date,
+ type_type: crate::Type::Date,
type_: builder.finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
}
}
Date64 => {
- let mut builder = ipc::DateBuilder::new(fbb);
- builder.add_unit(ipc::DateUnit::MILLISECOND);
+ let mut builder = crate::DateBuilder::new(fbb);
+ builder.add_unit(crate::DateUnit::MILLISECOND);
FBFieldType {
- type_type: ipc::Type::Date,
+ type_type: crate::Type::Date,
type_: builder.finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
}
}
Time32(unit) | Time64(unit) => {
- let mut builder = ipc::TimeBuilder::new(fbb);
+ let mut builder = crate::TimeBuilder::new(fbb);
match unit {
TimeUnit::Second => {
builder.add_bitWidth(32);
- builder.add_unit(ipc::TimeUnit::SECOND);
+ builder.add_unit(crate::TimeUnit::SECOND);
}
TimeUnit::Millisecond => {
builder.add_bitWidth(32);
- builder.add_unit(ipc::TimeUnit::MILLISECOND);
+ builder.add_unit(crate::TimeUnit::MILLISECOND);
}
TimeUnit::Microsecond => {
builder.add_bitWidth(64);
- builder.add_unit(ipc::TimeUnit::MICROSECOND);
+ builder.add_unit(crate::TimeUnit::MICROSECOND);
}
TimeUnit::Nanosecond => {
builder.add_bitWidth(64);
- builder.add_unit(ipc::TimeUnit::NANOSECOND);
+ builder.add_unit(crate::TimeUnit::NANOSECOND);
}
}
FBFieldType {
- type_type: ipc::Type::Time,
+ type_type: crate::Type::Time,
type_: builder.finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
}
@@ -618,48 +623,48 @@ pub(crate) fn get_fb_field_type<'a>(
Timestamp(unit, tz) => {
let tz = tz.clone().unwrap_or_default();
let tz_str = fbb.create_string(tz.as_str());
- let mut builder = ipc::TimestampBuilder::new(fbb);
+ let mut builder = crate::TimestampBuilder::new(fbb);
let time_unit = match unit {
- TimeUnit::Second => ipc::TimeUnit::SECOND,
- TimeUnit::Millisecond => ipc::TimeUnit::MILLISECOND,
- TimeUnit::Microsecond => ipc::TimeUnit::MICROSECOND,
- TimeUnit::Nanosecond => ipc::TimeUnit::NANOSECOND,
+ TimeUnit::Second => crate::TimeUnit::SECOND,
+ TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
+ TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
+ TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
};
builder.add_unit(time_unit);
if !tz.is_empty() {
builder.add_timezone(tz_str);
}
FBFieldType {
- type_type: ipc::Type::Timestamp,
+ type_type: crate::Type::Timestamp,
type_: builder.finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
}
}
Interval(unit) => {
- let mut builder = ipc::IntervalBuilder::new(fbb);
+ let mut builder = crate::IntervalBuilder::new(fbb);
let interval_unit = match unit {
- IntervalUnit::YearMonth => ipc::IntervalUnit::YEAR_MONTH,
- IntervalUnit::DayTime => ipc::IntervalUnit::DAY_TIME,
- IntervalUnit::MonthDayNano => ipc::IntervalUnit::MONTH_DAY_NANO,
+ IntervalUnit::YearMonth => crate::IntervalUnit::YEAR_MONTH,
+ IntervalUnit::DayTime => crate::IntervalUnit::DAY_TIME,
+ IntervalUnit::MonthDayNano => crate::IntervalUnit::MONTH_DAY_NANO,
};
builder.add_unit(interval_unit);
FBFieldType {
- type_type: ipc::Type::Interval,
+ type_type: crate::Type::Interval,
type_: builder.finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
}
}
Duration(unit) => {
- let mut builder = ipc::DurationBuilder::new(fbb);
+ let mut builder = crate::DurationBuilder::new(fbb);
let time_unit = match unit {
- TimeUnit::Second => ipc::TimeUnit::SECOND,
- TimeUnit::Millisecond => ipc::TimeUnit::MILLISECOND,
- TimeUnit::Microsecond => ipc::TimeUnit::MICROSECOND,
- TimeUnit::Nanosecond => ipc::TimeUnit::NANOSECOND,
+ TimeUnit::Second => crate::TimeUnit::SECOND,
+ TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
+ TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
+ TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
};
builder.add_unit(time_unit);
FBFieldType {
- type_type: ipc::Type::Duration,
+ type_type: crate::Type::Duration,
type_: builder.finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
}
@@ -667,25 +672,25 @@ pub(crate) fn get_fb_field_type<'a>(
List(ref list_type) => {
let child = build_field(fbb, list_type);
FBFieldType {
- type_type: ipc::Type::List,
- type_: ipc::ListBuilder::new(fbb).finish().as_union_value(),
+ type_type: crate::Type::List,
+ type_: crate::ListBuilder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&[child])),
}
}
LargeList(ref list_type) => {
let child = build_field(fbb, list_type);
FBFieldType {
- type_type: ipc::Type::LargeList,
- type_: ipc::LargeListBuilder::new(fbb).finish().as_union_value(),
+ type_type: crate::Type::LargeList,
+ type_: crate::LargeListBuilder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&[child])),
}
}
FixedSizeList(ref list_type, len) => {
let child = build_field(fbb, list_type);
- let mut builder = ipc::FixedSizeListBuilder::new(fbb);
+ let mut builder = crate::FixedSizeListBuilder::new(fbb);
builder.add_listSize(*len as i32);
FBFieldType {
- type_type: ipc::Type::FixedSizeList,
+ type_type: crate::Type::FixedSizeList,
type_: builder.finish().as_union_value(),
children: Some(fbb.create_vector(&[child])),
}
@@ -697,17 +702,17 @@ pub(crate) fn get_fb_field_type<'a>(
children.push(build_field(fbb, field));
}
FBFieldType {
- type_type: ipc::Type::Struct_,
- type_: ipc::Struct_Builder::new(fbb).finish().as_union_value(),
+ type_type: crate::Type::Struct_,
+ type_: crate::Struct_Builder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&children[..])),
}
}
Map(map_field, keys_sorted) => {
let child = build_field(fbb, map_field);
- let mut field_type = ipc::MapBuilder::new(fbb);
+ let mut field_type = crate::MapBuilder::new(fbb);
field_type.add_keysSorted(*keys_sorted);
FBFieldType {
- type_type: ipc::Type::Map,
+ type_type: crate::Type::Map,
type_: field_type.finish().as_union_value(),
children: Some(fbb.create_vector(&[child])),
}
@@ -719,23 +724,23 @@ pub(crate) fn get_fb_field_type<'a>(
get_fb_field_type(value_type, fbb)
}
Decimal128(precision, scale) => {
- let mut builder = ipc::DecimalBuilder::new(fbb);
+ let mut builder = crate::DecimalBuilder::new(fbb);
builder.add_precision(*precision as i32);
builder.add_scale(*scale as i32);
builder.add_bitWidth(128);
FBFieldType {
- type_type: ipc::Type::Decimal,
+ type_type: crate::Type::Decimal,
type_: builder.finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
}
}
Decimal256(precision, scale) => {
- let mut builder = ipc::DecimalBuilder::new(fbb);
+ let mut builder = crate::DecimalBuilder::new(fbb);
builder.add_precision(*precision as i32);
builder.add_scale(*scale as i32);
builder.add_bitWidth(256);
FBFieldType {
- type_type: ipc::Type::Decimal,
+ type_type: crate::Type::Decimal,
type_: builder.finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
}
@@ -747,18 +752,18 @@ pub(crate) fn get_fb_field_type<'a>(
}
let union_mode = match mode {
- UnionMode::Sparse => ipc::UnionMode::Sparse,
- UnionMode::Dense => ipc::UnionMode::Dense,
+ UnionMode::Sparse => crate::UnionMode::Sparse,
+ UnionMode::Dense => crate::UnionMode::Dense,
};
let fbb_type_ids = fbb
.create_vector(&type_ids.iter().map(|t| *t as i32).collect::<Vec<_>>());
- let mut builder = ipc::UnionBuilder::new(fbb);
+ let mut builder = crate::UnionBuilder::new(fbb);
builder.add_mode(union_mode);
builder.add_typeIds(fbb_type_ids);
FBFieldType {
- type_type: ipc::Type::Union,
+ type_type: crate::Type::Union,
type_: builder.finish().as_union_value(),
children: Some(fbb.create_vector(&children[..])),
}
@@ -772,10 +777,10 @@ pub(crate) fn get_fb_dictionary<'a>(
dict_id: i64,
dict_is_ordered: bool,
fbb: &mut FlatBufferBuilder<'a>,
-) -> WIPOffset<ipc::DictionaryEncoding<'a>> {
+) -> WIPOffset<crate::DictionaryEncoding<'a>> {
// We assume that the dictionary index type (as an integer) has already been
// validated elsewhere, and can safely assume we are dealing with integers
- let mut index_builder = ipc::IntBuilder::new(fbb);
+ let mut index_builder = crate::IntBuilder::new(fbb);
match *index_type {
Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true),
@@ -793,7 +798,7 @@ pub(crate) fn get_fb_dictionary<'a>(
let index_builder = index_builder.finish();
- let mut builder = ipc::DictionaryEncodingBuilder::new(fbb);
+ let mut builder = crate::DictionaryEncodingBuilder::new(fbb);
builder.add_id(dict_id);
builder.add_indexType(index_builder);
builder.add_isOrdered(dict_is_ordered);
@@ -804,7 +809,6 @@ pub(crate) fn get_fb_dictionary<'a>(
#[cfg(test)]
mod tests {
use super::*;
- use crate::datatypes::{DataType, Field, Schema, UnionMode};
#[test]
fn convert_schema_round_trip() {
@@ -1024,14 +1028,14 @@ mod tests {
let fb = schema_to_fb(&schema);
// read back fields
- let ipc = ipc::root_as_schema(fb.finished_data()).unwrap();
+ let ipc = crate::root_as_schema(fb.finished_data()).unwrap();
let schema2 = fb_to_schema(ipc);
assert_eq!(schema, schema2);
}
#[test]
fn schema_from_bytes() {
- // bytes of a schema generated from python (0.14.0), saved as an `ipc::Message`.
+ // bytes of a schema generated from python (0.14.0), saved as an `crate::Message`.
// the schema is: Field("field1", DataType::UInt32, false)
let bytes: Vec<u8> = vec![
16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 3, 0,
@@ -1041,7 +1045,7 @@ mod tests {
4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0,
0, 0, 0, 0,
];
- let ipc = ipc::root_as_message(&bytes[..]).unwrap();
+ let ipc = crate::root_as_message(&bytes[..]).unwrap();
let schema = ipc.header_as_schema().unwrap();
// a message generated from Rust, same as the Python one
@@ -1053,7 +1057,7 @@ mod tests {
8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49,
0, 0,
];
- let ipc2 = ipc::root_as_message(&bytes[..]).unwrap();
+ let ipc2 = crate::root_as_message(&bytes[..]).unwrap();
let schema2 = ipc.header_as_schema().unwrap();
assert_eq!(schema, schema2);
diff --git a/arrow/src/ipc/gen/File.rs b/arrow-ipc/src/gen/File.rs
similarity index 99%
rename from arrow/src/ipc/gen/File.rs
rename to arrow-ipc/src/gen/File.rs
index 9aafe910b..0e9427813 100644
--- a/arrow/src/ipc/gen/File.rs
+++ b/arrow-ipc/src/gen/File.rs
@@ -18,7 +18,7 @@
#![allow(dead_code)]
#![allow(unused_imports)]
-use crate::ipc::gen::Schema::*;
+use crate::gen::Schema::*;
use flatbuffers::EndianScalar;
use std::{cmp::Ordering, mem};
// automatically generated by the FlatBuffers compiler, do not modify
diff --git a/arrow/src/ipc/gen/Message.rs b/arrow-ipc/src/gen/Message.rs
similarity index 99%
rename from arrow/src/ipc/gen/Message.rs
rename to arrow-ipc/src/gen/Message.rs
index d4b3a57f1..2b9f79766 100644
--- a/arrow/src/ipc/gen/Message.rs
+++ b/arrow-ipc/src/gen/Message.rs
@@ -18,9 +18,9 @@
#![allow(dead_code)]
#![allow(unused_imports)]
-use crate::ipc::gen::Schema::*;
-use crate::ipc::gen::SparseTensor::*;
-use crate::ipc::gen::Tensor::*;
+use crate::gen::Schema::*;
+use crate::gen::SparseTensor::*;
+use crate::gen::Tensor::*;
use flatbuffers::EndianScalar;
use std::{cmp::Ordering, mem};
// automatically generated by the FlatBuffers compiler, do not modify
@@ -340,7 +340,7 @@ pub struct MessageHeaderUnionTableOffset {}
/// Metadata about a field at some level of a nested type tree (but not
/// its children).
///
-/// For example, a List<Int16> with values `[[1, 2, 3], null, [4], [5, 6], null]`
+/// For example, a `List<Int16>` with values `[[1, 2, 3], null, [4], [5, 6], null]`
/// would have {length: 5, null_count: 2} for its List node, and {length: 6,
/// null_count: 0} for its Int16 node, as separate FieldNode structs
// struct FieldNode, aligned to 8
diff --git a/arrow/src/ipc/gen/Schema.rs b/arrow-ipc/src/gen/Schema.rs
similarity index 100%
rename from arrow/src/ipc/gen/Schema.rs
rename to arrow-ipc/src/gen/Schema.rs
diff --git a/arrow/src/ipc/gen/SparseTensor.rs b/arrow-ipc/src/gen/SparseTensor.rs
similarity index 99%
rename from arrow/src/ipc/gen/SparseTensor.rs
rename to arrow-ipc/src/gen/SparseTensor.rs
index 317831c59..c5e06c30e 100644
--- a/arrow/src/ipc/gen/SparseTensor.rs
+++ b/arrow-ipc/src/gen/SparseTensor.rs
@@ -18,8 +18,8 @@
#![allow(dead_code)]
#![allow(unused_imports)]
-use crate::ipc::gen::Schema::*;
-use crate::ipc::gen::Tensor::*;
+use crate::gen::Schema::*;
+use crate::gen::Tensor::*;
use flatbuffers::EndianScalar;
use std::{cmp::Ordering, mem};
// automatically generated by the FlatBuffers compiler, do not modify
diff --git a/arrow/src/ipc/gen/Tensor.rs b/arrow-ipc/src/gen/Tensor.rs
similarity index 99%
rename from arrow/src/ipc/gen/Tensor.rs
rename to arrow-ipc/src/gen/Tensor.rs
index f22ff23c9..954ebd290 100644
--- a/arrow/src/ipc/gen/Tensor.rs
+++ b/arrow-ipc/src/gen/Tensor.rs
@@ -18,7 +18,7 @@
#![allow(dead_code)]
#![allow(unused_imports)]
-use crate::ipc::gen::Schema::*;
+use crate::gen::Schema::*;
use flatbuffers::EndianScalar;
use std::{cmp::Ordering, mem};
// automatically generated by the FlatBuffers compiler, do not modify
diff --git a/arrow/src/ipc/gen/mod.rs b/arrow-ipc/src/gen/mod.rs
similarity index 100%
rename from arrow/src/ipc/gen/mod.rs
rename to arrow-ipc/src/gen/mod.rs
diff --git a/arrow/src/ipc/mod.rs b/arrow-ipc/src/lib.rs
similarity index 97%
rename from arrow/src/ipc/mod.rs
rename to arrow-ipc/src/lib.rs
index 2b30e7220..38217957d 100644
--- a/arrow/src/ipc/mod.rs
+++ b/arrow-ipc/src/lib.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+//! Support for the Arrow IPC format
+
// TODO: (vcq): Protobuf codegen is not generating Debug impls.
#![allow(missing_debug_implementations)]
diff --git a/arrow/src/ipc/reader.rs b/arrow-ipc/src/reader.rs
similarity index 93%
rename from arrow/src/ipc/reader.rs
rename to arrow-ipc/src/reader.rs
index 1f2824b34..0165c775d 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -26,16 +26,14 @@ use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
-use crate::array::*;
-use crate::buffer::{Buffer, MutableBuffer};
-use crate::compute::cast;
-use crate::datatypes::{DataType, Field, IntervalUnit, Schema, SchemaRef, UnionMode};
-use crate::error::{ArrowError, Result};
-use crate::ipc;
-use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
-
-use crate::ipc::compression::CompressionCodec;
-use ipc::CONTINUATION_MARKER;
+use arrow_array::*;
+use arrow_buffer::{Buffer, MutableBuffer};
+use arrow_cast::cast;
+use arrow_data::ArrayData;
+use arrow_schema::*;
+
+use crate::compression::CompressionCodec;
+use crate::CONTINUATION_MARKER;
use DataType::*;
/// Read a buffer based on offset and length
@@ -48,10 +46,10 @@ use DataType::*;
/// follows is not compressed, which can be useful for cases where
/// compression does not yield appreciable savings.
fn read_buffer(
- buf: &ipc::Buffer,
+ buf: &crate::Buffer,
a_data: &Buffer,
compression_codec: &Option<CompressionCodec>,
-) -> Result<Buffer> {
+) -> Result<Buffer, ArrowError> {
let start_offset = buf.offset() as usize;
let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
// corner case: empty buffer
@@ -72,16 +70,16 @@ fn read_buffer(
/// - cast the 64-bit array to the appropriate data type
#[allow(clippy::too_many_arguments)]
fn create_array(
- nodes: flatbuffers::Vector<'_, ipc::FieldNode>,
+ nodes: flatbuffers::Vector<'_, crate::FieldNode>,
field: &Field,
data: &Buffer,
- buffers: flatbuffers::Vector<'_, ipc::Buffer>,
+ buffers: flatbuffers::Vector<'_, crate::Buffer>,
dictionaries_by_id: &HashMap<i64, ArrayRef>,
mut node_index: usize,
mut buffer_index: usize,
compression_codec: &Option<CompressionCodec>,
- metadata: &ipc::MetadataVersion,
-) -> Result<(ArrayRef, usize, usize)> {
+ metadata: &crate::MetadataVersion,
+) -> Result<(ArrayRef, usize, usize), ArrowError> {
let data_type = field.data_type();
let array = match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => {
@@ -232,7 +230,7 @@ fn create_array(
// In V4, union types has validity bitmap
// In V5 and later, union types have no validity bitmap
- if metadata < &ipc::MetadataVersion::V5 {
+ if metadata < &crate::MetadataVersion::V5 {
read_buffer(buffers.get(buffer_index), data, compression_codec)?;
buffer_index += 1;
}
@@ -323,7 +321,7 @@ fn skip_field(
data_type: &DataType,
mut node_index: usize,
mut buffer_index: usize,
-) -> Result<(usize, usize)> {
+) -> Result<(usize, usize), ArrowError> {
match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => {
node_index += 1;
@@ -396,7 +394,7 @@ fn skip_field(
/// Reads the correct number of buffers based on data type and null_count, and creates a
/// primitive array ref
fn create_primitive_array(
- field_node: &ipc::FieldNode,
+ field_node: &crate::FieldNode,
data_type: &DataType,
buffers: &[Buffer],
) -> ArrayRef {
@@ -536,7 +534,7 @@ fn get_aligned_buffer<T>(buffer: &Buffer, length: usize) -> Buffer {
/// Reads the correct number of buffers based on list type and null_count, and creates a
/// list array ref
fn create_list_array(
- field_node: &ipc::FieldNode,
+ field_node: &crate::FieldNode,
data_type: &DataType,
buffers: &[Buffer],
child_array: ArrayRef,
@@ -564,7 +562,7 @@ fn create_list_array(
/// Reads the correct number of buffers based on list type and null_count, and creates a
/// list array ref
fn create_dictionary_array(
- field_node: &ipc::FieldNode,
+ field_node: &crate::FieldNode,
data_type: &DataType,
buffers: &[Buffer],
value_array: ArrayRef,
@@ -583,15 +581,15 @@ fn create_dictionary_array(
}
}
-/// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema`
+/// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the `Schema`
pub fn read_record_batch(
buf: &Buffer,
- batch: ipc::RecordBatch,
+ batch: crate::RecordBatch,
schema: SchemaRef,
dictionaries_by_id: &HashMap<i64, ArrayRef>,
projection: Option<&[usize]>,
- metadata: &ipc::MetadataVersion,
-) -> Result<RecordBatch> {
+ metadata: &crate::MetadataVersion,
+) -> Result<RecordBatch, ArrowError> {
let buffers = batch.buffers().ok_or_else(|| {
ArrowError::IoError("Unable to get buffers from IPC RecordBatch".to_string())
})?;
@@ -669,11 +667,11 @@ pub fn read_record_batch(
/// updating the `dictionaries_by_id` with the resulting dictionary
pub fn read_dictionary(
buf: &Buffer,
- batch: ipc::DictionaryBatch,
+ batch: crate::DictionaryBatch,
schema: &Schema,
dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
- metadata: &ipc::MetadataVersion,
-) -> Result<()> {
+ metadata: &crate::MetadataVersion,
+) -> Result<(), ArrowError> {
if batch.isDelta() {
return Err(ArrowError::IoError(
"delta dictionary batches not supported".to_string(),
@@ -732,7 +730,7 @@ pub struct FileReader<R: Read + Seek> {
/// The blocks in the file
///
/// A block indicates the regions in the file to read to get data
- blocks: Vec<ipc::Block>,
+ blocks: Vec<crate::Block>,
/// A counter to keep track of the current block that should be read
current_block: usize,
@@ -746,7 +744,7 @@ pub struct FileReader<R: Read + Seek> {
dictionaries_by_id: HashMap<i64, ArrayRef>,
/// Metadata version
- metadata_version: ipc::MetadataVersion,
+ metadata_version: crate::MetadataVersion,
/// Optional projection and projected_schema
projection: Option<(Vec<usize>, Schema)>,
@@ -772,7 +770,10 @@ impl<R: Read + Seek> FileReader<R> {
///
/// Returns errors if the file does not meet the Arrow Format header and footer
/// requirements
- pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self> {
+ pub fn try_new(
+ reader: R,
+ projection: Option<Vec<usize>>,
+ ) -> Result<Self, ArrowError> {
let mut reader = BufReader::new(reader);
// check if header and footer contain correct magic bytes
let mut magic_buffer: [u8; 6] = [0; 6];
@@ -800,7 +801,7 @@ impl<R: Read + Seek> FileReader<R> {
reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
reader.read_exact(&mut footer_data)?;
- let footer = ipc::root_as_footer(&footer_data[..]).map_err(|err| {
+ let footer = crate::root_as_footer(&footer_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as footer: {:?}", err))
})?;
@@ -813,7 +814,7 @@ impl<R: Read + Seek> FileReader<R> {
let total_blocks = blocks.len();
let ipc_schema = footer.schema().unwrap();
- let schema = ipc::convert::fb_to_schema(ipc_schema);
+ let schema = crate::convert::fb_to_schema(ipc_schema);
// Create an array of optional dictionary value arrays, one per field.
let mut dictionaries_by_id = HashMap::new();
@@ -831,7 +832,7 @@ impl<R: Read + Seek> FileReader<R> {
reader.read_exact(&mut block_data)?;
- let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
+ let message = crate::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::IoError(format!(
"Unable to get root as message: {:?}",
err
@@ -839,7 +840,7 @@ impl<R: Read + Seek> FileReader<R> {
})?;
match message.header_type() {
- ipc::MessageHeader::DictionaryBatch => {
+ crate::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().unwrap();
// read the block that makes up the dictionary batch into a buffer
@@ -900,7 +901,7 @@ impl<R: Read + Seek> FileReader<R> {
/// Read a specific record batch
///
/// Sets the current block to the index, allowing random reads
- pub fn set_index(&mut self, index: usize) -> Result<()> {
+ pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
if index >= self.total_blocks {
Err(ArrowError::IoError(format!(
"Cannot set batch to index {} from {} total batches",
@@ -912,7 +913,7 @@ impl<R: Read + Seek> FileReader<R> {
}
}
- fn maybe_next(&mut self) -> Result<Option<RecordBatch>> {
+ fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
let block = self.blocks[self.current_block];
self.current_block += 1;
@@ -928,12 +929,12 @@ impl<R: Read + Seek> FileReader<R> {
let mut block_data = vec![0; meta_len as usize];
self.reader.read_exact(&mut block_data)?;
- let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
+ let message = crate::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as footer: {:?}", err))
})?;
// some old test data's footer metadata is not set, so we account for that
- if self.metadata_version != ipc::MetadataVersion::V1
+ if self.metadata_version != crate::MetadataVersion::V1
&& message.version() != self.metadata_version
{
return Err(ArrowError::IoError(
@@ -942,10 +943,10 @@ impl<R: Read + Seek> FileReader<R> {
}
match message.header_type() {
- ipc::MessageHeader::Schema => Err(ArrowError::IoError(
+ crate::MessageHeader::Schema => Err(ArrowError::IoError(
"Not expecting a schema when messages are read".to_string(),
)),
- ipc::MessageHeader::RecordBatch => {
+ crate::MessageHeader::RecordBatch => {
let batch = message.header_as_record_batch().ok_or_else(|| {
ArrowError::IoError(
"Unable to read IPC message as record batch".to_string(),
@@ -968,7 +969,7 @@ impl<R: Read + Seek> FileReader<R> {
).map(Some)
}
- ipc::MessageHeader::NONE => {
+ crate::MessageHeader::NONE => {
Ok(None)
}
t => Err(ArrowError::IoError(format!(
@@ -979,7 +980,7 @@ impl<R: Read + Seek> FileReader<R> {
}
impl<R: Read + Seek> Iterator for FileReader<R> {
- type Item = Result<RecordBatch>;
+ type Item = Result<RecordBatch, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
// get current block
@@ -1037,7 +1038,10 @@ impl<R: Read> StreamReader<R> {
/// The first message in the stream is the schema, the reader will fail if it does not
/// encounter a schema.
/// To check if the reader is done, use `is_finished(self)`
- pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self> {
+ pub fn try_new(
+ reader: R,
+ projection: Option<Vec<usize>>,
+ ) -> Result<Self, ArrowError> {
let mut reader = BufReader::new(reader);
// determine metadata length
let mut meta_size: [u8; 4] = [0; 4];
@@ -1054,14 +1058,14 @@ impl<R: Read> StreamReader<R> {
let mut meta_buffer = vec![0; meta_len as usize];
reader.read_exact(&mut meta_buffer)?;
- let message = ipc::root_as_message(meta_buffer.as_slice()).map_err(|err| {
+ let message = crate::root_as_message(meta_buffer.as_slice()).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as message: {:?}", err))
})?;
// message header is a Schema, so read it
- let ipc_schema: ipc::Schema = message.header_as_schema().ok_or_else(|| {
+ let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| {
ArrowError::IoError("Unable to read IPC message as schema".to_string())
})?;
- let schema = ipc::convert::fb_to_schema(ipc_schema);
+ let schema = crate::convert::fb_to_schema(ipc_schema);
// Create an array of optional dictionary value arrays, one per field.
let dictionaries_by_id = HashMap::new();
@@ -1092,7 +1096,7 @@ impl<R: Read> StreamReader<R> {
self.finished
}
- fn maybe_next(&mut self) -> Result<Option<RecordBatch>> {
+ fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
if self.finished {
return Ok(None);
}
@@ -1133,15 +1137,15 @@ impl<R: Read> StreamReader<R> {
self.reader.read_exact(&mut meta_buffer)?;
let vecs = &meta_buffer.to_vec();
- let message = ipc::root_as_message(vecs).map_err(|err| {
+ let message = crate::root_as_message(vecs).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as message: {:?}", err))
})?;
match message.header_type() {
- ipc::MessageHeader::Schema => Err(ArrowError::IoError(
+ crate::MessageHeader::Schema => Err(ArrowError::IoError(
"Not expecting a schema when messages are read".to_string(),
)),
- ipc::MessageHeader::RecordBatch => {
+ crate::MessageHeader::RecordBatch => {
let batch = message.header_as_record_batch().ok_or_else(|| {
ArrowError::IoError(
"Unable to read IPC message as record batch".to_string(),
@@ -1153,7 +1157,7 @@ impl<R: Read> StreamReader<R> {
read_record_batch(&buf.into(), batch, self.schema(), &self.dictionaries_by_id, self.projection.as_ref().map(|x| x.0.as_ref()), &message.version()).map(Some)
}
- ipc::MessageHeader::DictionaryBatch => {
+ crate::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().ok_or_else(|| {
ArrowError::IoError(
"Unable to read IPC message as dictionary batch".to_string(),
@@ -1170,7 +1174,7 @@ impl<R: Read> StreamReader<R> {
// read the next message until we encounter a RecordBatch
self.maybe_next()
}
- ipc::MessageHeader::NONE => {
+ crate::MessageHeader::NONE => {
Ok(None)
}
t => Err(ArrowError::IoError(
@@ -1181,7 +1185,7 @@ impl<R: Read> StreamReader<R> {
}
impl<R: Read> Iterator for StreamReader<R> {
- type Item = Result<RecordBatch>;
+ type Item = Result<RecordBatch, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
self.maybe_next().transpose()
@@ -1198,10 +1202,10 @@ impl<R: Read> RecordBatchReader for StreamReader<R> {
mod tests {
use super::*;
- use std::fs::File;
-
- use crate::datatypes;
- use crate::datatypes::{ArrowNativeType, Float64Type, Int32Type, Int8Type};
+ use arrow_array::builder::UnionBuilder;
+ use arrow_array::types::*;
+ use arrow_buffer::ArrowNativeType;
+ use arrow_data::ArrayDataBuilder;
fn create_test_projection_schema() -> Schema {
// define field types
@@ -1347,7 +1351,8 @@ mod tests {
// write record batch in IPC format
let mut buf = Vec::new();
{
- let mut writer = ipc::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
+ let mut writer =
+ crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
}
@@ -1382,15 +1387,18 @@ mod tests {
];
let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
// create stream writer
- let file = File::create("target/debug/testdata/float.stream").unwrap();
+ let mut file = tempfile::tempfile().unwrap();
let mut stream_writer =
- crate::ipc::writer::StreamWriter::try_new(file, &schema).unwrap();
+ crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
stream_writer.write(&batch).unwrap();
stream_writer.finish().unwrap();
+ drop(stream_writer);
+
+ file.rewind().unwrap();
+
// read stream back
- let file = File::open("target/debug/testdata/float.stream").unwrap();
- let reader = StreamReader::try_new(file, None).unwrap();
+ let reader = StreamReader::try_new(&mut file, None).unwrap();
reader.for_each(|batch| {
let batch = batch.unwrap();
@@ -1414,7 +1422,7 @@ mod tests {
);
});
- let file = File::open("target/debug/testdata/float.stream").unwrap();
+ file.rewind().unwrap();
// Read with projection
let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();
@@ -1430,33 +1438,33 @@ mod tests {
fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
let mut buf = Vec::new();
let mut writer =
- ipc::writer::FileWriter::try_new(&mut buf, &rb.schema()).unwrap();
+ crate::writer::FileWriter::try_new(&mut buf, &rb.schema()).unwrap();
writer.write(rb).unwrap();
writer.finish().unwrap();
drop(writer);
let mut reader =
- ipc::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
+ crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
reader.next().unwrap().unwrap()
}
fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
let mut buf = Vec::new();
let mut writer =
- ipc::writer::StreamWriter::try_new(&mut buf, &rb.schema()).unwrap();
+ crate::writer::StreamWriter::try_new(&mut buf, &rb.schema()).unwrap();
writer.write(rb).unwrap();
writer.finish().unwrap();
drop(writer);
let mut reader =
- ipc::reader::StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
+ crate::reader::StreamReader::try_new(std::io::Cursor::new(buf), None)
+ .unwrap();
reader.next().unwrap().unwrap()
}
#[test]
fn test_roundtrip_nested_dict() {
- let inner: DictionaryArray<datatypes::Int32Type> =
- vec!["a", "b", "a"].into_iter().collect();
+ let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
let array = Arc::new(inner) as ArrayRef;
@@ -1477,11 +1485,11 @@ mod tests {
}
fn check_union_with_builder(mut builder: UnionBuilder) {
- builder.append::<datatypes::Int32Type>("a", 1).unwrap();
- builder.append_null::<datatypes::Int32Type>("a").unwrap();
- builder.append::<datatypes::Float64Type>("c", 3.0).unwrap();
- builder.append::<datatypes::Int32Type>("a", 4).unwrap();
- builder.append::<datatypes::Int64Type>("d", 11).unwrap();
+ builder.append::<Int32Type>("a", 1).unwrap();
+ builder.append_null::<Int32Type>("a").unwrap();
+ builder.append::<Float64Type>("c", 3.0).unwrap();
+ builder.append::<Int32Type>("a", 4).unwrap();
+ builder.append::<Int64Type>("d", 11).unwrap();
let union = builder.build().unwrap();
let schema = Arc::new(Schema::new(vec![Field::new(
@@ -1521,7 +1529,7 @@ mod tests {
let dict = Arc::new(
xs.clone()
.into_iter()
- .collect::<DictionaryArray<datatypes::Int8Type>>(),
+ .collect::<DictionaryArray<Int8Type>>(),
);
let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
let struct_array = StructArray::from(vec![
diff --git a/arrow/src/ipc/writer.rs b/arrow-ipc/src/writer.rs
similarity index 87%
rename from arrow/src/ipc/writer.rs
rename to arrow-ipc/src/writer.rs
index 4f40574ab..44f32f0cb 100644
--- a/arrow/src/ipc/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -26,21 +26,16 @@ use std::io::{BufWriter, Write};
use flatbuffers::FlatBufferBuilder;
-use crate::array::{
- as_large_list_array, as_list_array, as_map_array, as_struct_array, as_union_array,
- layout, make_array, Array, ArrayData, ArrayRef, BinaryArray, BufferBuilder,
- BufferSpec, FixedSizeListArray, GenericBinaryArray, GenericStringArray,
- LargeBinaryArray, LargeStringArray, OffsetSizeTrait, StringArray,
-};
-use crate::buffer::{Buffer, MutableBuffer};
-use crate::datatypes::*;
-use crate::error::{ArrowError, Result};
-use crate::ipc;
-use crate::record_batch::RecordBatch;
-use crate::util::bit_util;
-
-use crate::ipc::compression::CompressionCodec;
-use ipc::CONTINUATION_MARKER;
+use arrow_array::builder::BufferBuilder;
+use arrow_array::cast::*;
+use arrow_array::*;
+use arrow_buffer::bit_util;
+use arrow_buffer::{Buffer, MutableBuffer};
+use arrow_data::{layout, ArrayData, BufferSpec};
+use arrow_schema::*;
+
+use crate::compression::CompressionCodec;
+use crate::CONTINUATION_MARKER;
/// IPC write options used to control the behaviour of the writer
#[derive(Debug, Clone)]
@@ -58,24 +53,25 @@ pub struct IpcWriteOptions {
///
/// version 2.0.0: V4, with legacy format enabled
/// version 4.0.0: V5
- metadata_version: ipc::MetadataVersion,
- /// Compression, if desired. Only supported when `ipc_compression`
- /// feature is enabled
- batch_compression_type: Option<ipc::CompressionType>,
+ metadata_version: crate::MetadataVersion,
+ /// Compression, if desired. Will result in a runtime error
+ /// if the corresponding feature is not enabled
+ batch_compression_type: Option<crate::CompressionType>,
}
impl IpcWriteOptions {
- /// Configures compression when writing IPC files. Requires the
- /// `ipc_compression` feature of the crate to be activated.
- #[cfg(feature = "ipc_compression")]
+ /// Configures compression when writing IPC files.
+ ///
+ /// Will result in a runtime error if the corresponding feature
+ /// is not enabled
pub fn try_with_compression(
mut self,
- batch_compression_type: Option<ipc::CompressionType>,
- ) -> Result<Self> {
+ batch_compression_type: Option<crate::CompressionType>,
+ ) -> Result<Self, ArrowError> {
self.batch_compression_type = batch_compression_type;
if self.batch_compression_type.is_some()
- && self.metadata_version < ipc::MetadataVersion::V5
+ && self.metadata_version < crate::MetadataVersion::V5
{
return Err(ArrowError::InvalidArgumentError(
"Compression only supported in metadata v5 and above".to_string(),
@@ -87,26 +83,26 @@ impl IpcWriteOptions {
pub fn try_new(
alignment: usize,
write_legacy_ipc_format: bool,
- metadata_version: ipc::MetadataVersion,
- ) -> Result<Self> {
+ metadata_version: crate::MetadataVersion,
+ ) -> Result<Self, ArrowError> {
if alignment == 0 || alignment % 8 != 0 {
return Err(ArrowError::InvalidArgumentError(
"Alignment should be greater than 0 and be a multiple of 8".to_string(),
));
}
match metadata_version {
- ipc::MetadataVersion::V1
- | ipc::MetadataVersion::V2
- | ipc::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
+ crate::MetadataVersion::V1
+ | crate::MetadataVersion::V2
+ | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
"Writing IPC metadata version 3 and lower not supported".to_string(),
)),
- ipc::MetadataVersion::V4 => Ok(Self {
+ crate::MetadataVersion::V4 => Ok(Self {
alignment,
write_legacy_ipc_format,
metadata_version,
batch_compression_type: None,
}),
- ipc::MetadataVersion::V5 => {
+ crate::MetadataVersion::V5 => {
if write_legacy_ipc_format {
Err(ArrowError::InvalidArgumentError(
"Legacy IPC format only supported on metadata version 4"
@@ -122,7 +118,7 @@ impl IpcWriteOptions {
}
}
z => Err(ArrowError::InvalidArgumentError(format!(
- "Unsupported ipc::MetadataVersion {:?}",
+ "Unsupported crate::MetadataVersion {:?}",
z
))),
}
@@ -134,7 +130,7 @@ impl Default for IpcWriteOptions {
Self {
alignment: 64,
write_legacy_ipc_format: false,
- metadata_version: ipc::MetadataVersion::V5,
+ metadata_version: crate::MetadataVersion::V5,
batch_compression_type: None,
}
}
@@ -151,13 +147,13 @@ impl IpcDataGenerator {
) -> EncodedData {
let mut fbb = FlatBufferBuilder::new();
let schema = {
- let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema);
+ let fb = crate::convert::schema_to_fb_offset(&mut fbb, schema);
fb.as_union_value()
};
- let mut message = ipc::MessageBuilder::new(&mut fbb);
+ let mut message = crate::MessageBuilder::new(&mut fbb);
message.add_version(write_options.metadata_version);
- message.add_header_type(ipc::MessageHeader::Schema);
+ message.add_header_type(crate::MessageHeader::Schema);
message.add_bodyLength(0);
message.add_header(schema);
// TODO: custom metadata
@@ -177,7 +173,7 @@ impl IpcDataGenerator {
encoded_dictionaries: &mut Vec<EncodedData>,
dictionary_tracker: &mut DictionaryTracker,
write_options: &IpcWriteOptions,
- ) -> Result<()> {
+ ) -> Result<(), ArrowError> {
match column.data_type() {
DataType::Struct(fields) => {
let s = as_struct_array(column);
@@ -281,7 +277,7 @@ impl IpcDataGenerator {
encoded_dictionaries: &mut Vec<EncodedData>,
dictionary_tracker: &mut DictionaryTracker,
write_options: &IpcWriteOptions,
- ) -> Result<()> {
+ ) -> Result<(), ArrowError> {
match column.data_type() {
DataType::Dictionary(_key_type, _value_type) => {
let dict_id = field
@@ -325,7 +321,7 @@ impl IpcDataGenerator {
batch: &RecordBatch,
dictionary_tracker: &mut DictionaryTracker,
write_options: &IpcWriteOptions,
- ) -> Result<(Vec<EncodedData>, EncodedData)> {
+ ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
let schema = batch.schema();
let mut encoded_dictionaries = Vec::with_capacity(schema.all_fields().len());
@@ -344,17 +340,17 @@ impl IpcDataGenerator {
Ok((encoded_dictionaries, encoded_message))
}
- /// Write a `RecordBatch` into two sets of bytes, one for the header (ipc::Message) and the
+ /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the
/// other for the batch's data
fn record_batch_to_bytes(
&self,
batch: &RecordBatch,
write_options: &IpcWriteOptions,
- ) -> Result<EncodedData> {
+ ) -> Result<EncodedData, ArrowError> {
let mut fbb = FlatBufferBuilder::new();
- let mut nodes: Vec<ipc::FieldNode> = vec![];
- let mut buffers: Vec<ipc::Buffer> = vec![];
+ let mut nodes: Vec<crate::FieldNode> = vec![];
+ let mut buffers: Vec<crate::Buffer> = vec![];
let mut arrow_data: Vec<u8> = vec![];
let mut offset = 0;
@@ -362,8 +358,8 @@ impl IpcDataGenerator {
let batch_compression_type = write_options.batch_compression_type;
let compression = batch_compression_type.map(|batch_compression_type| {
- let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
- c.add_method(ipc::BodyCompressionMethod::BUFFER);
+ let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
+ c.add_method(crate::BodyCompressionMethod::BUFFER);
c.add_codec(batch_compression_type);
c.finish()
});
@@ -394,7 +390,7 @@ impl IpcDataGenerator {
let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);
let root = {
- let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
+ let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
batch_builder.add_length(batch.num_rows() as i64);
batch_builder.add_nodes(nodes);
batch_builder.add_buffers(buffers);
@@ -404,10 +400,10 @@ impl IpcDataGenerator {
let b = batch_builder.finish();
b.as_union_value()
};
- // create an ipc::Message
- let mut message = ipc::MessageBuilder::new(&mut fbb);
+        // create a crate::Message
+ let mut message = crate::MessageBuilder::new(&mut fbb);
message.add_version(write_options.metadata_version);
- message.add_header_type(ipc::MessageHeader::RecordBatch);
+ message.add_header_type(crate::MessageHeader::RecordBatch);
message.add_bodyLength(arrow_data.len() as i64);
message.add_header(root);
let root = message.finish();
@@ -420,26 +416,26 @@ impl IpcDataGenerator {
})
}
- /// Write dictionary values into two sets of bytes, one for the header (ipc::Message) and the
+ /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
/// other for the data
fn dictionary_batch_to_bytes(
&self,
dict_id: i64,
array_data: &ArrayData,
write_options: &IpcWriteOptions,
- ) -> Result<EncodedData> {
+ ) -> Result<EncodedData, ArrowError> {
let mut fbb = FlatBufferBuilder::new();
- let mut nodes: Vec<ipc::FieldNode> = vec![];
- let mut buffers: Vec<ipc::Buffer> = vec![];
+ let mut nodes: Vec<crate::FieldNode> = vec![];
+ let mut buffers: Vec<crate::Buffer> = vec![];
let mut arrow_data: Vec<u8> = vec![];
// get the type of compression
let batch_compression_type = write_options.batch_compression_type;
let compression = batch_compression_type.map(|batch_compression_type| {
- let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
- c.add_method(ipc::BodyCompressionMethod::BUFFER);
+ let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
+ c.add_method(crate::BodyCompressionMethod::BUFFER);
c.add_codec(batch_compression_type);
c.finish()
});
@@ -470,7 +466,7 @@ impl IpcDataGenerator {
let nodes = fbb.create_vector(&nodes);
let root = {
- let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
+ let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
batch_builder.add_length(array_data.len() as i64);
batch_builder.add_nodes(nodes);
batch_builder.add_buffers(buffers);
@@ -481,16 +477,16 @@ impl IpcDataGenerator {
};
let root = {
- let mut batch_builder = ipc::DictionaryBatchBuilder::new(&mut fbb);
+ let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
batch_builder.add_id(dict_id);
batch_builder.add_data(root);
batch_builder.finish().as_union_value()
};
let root = {
- let mut message_builder = ipc::MessageBuilder::new(&mut fbb);
+ let mut message_builder = crate::MessageBuilder::new(&mut fbb);
message_builder.add_version(write_options.metadata_version);
- message_builder.add_header_type(ipc::MessageHeader::DictionaryBatch);
+ message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
message_builder.add_bodyLength(arrow_data.len() as i64);
message_builder.add_header(root);
message_builder.finish()
@@ -531,7 +527,11 @@ impl DictionaryTracker {
/// * If the tracker has not been configured to error on replacement or this dictionary
/// has never been seen before, return `Ok(true)` to indicate that the dictionary was just
/// inserted.
- pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool> {
+ pub fn insert(
+ &mut self,
+ dict_id: i64,
+ column: &ArrayRef,
+ ) -> Result<bool, ArrowError> {
let dict_data = column.data();
let dict_values = &dict_data.child_data()[0];
@@ -565,9 +565,9 @@ pub struct FileWriter<W: Write> {
/// The number of bytes between each block of bytes, as an offset for random access
block_offsets: usize,
/// Dictionary blocks that will be written as part of the IPC footer
- dictionary_blocks: Vec<ipc::Block>,
+ dictionary_blocks: Vec<crate::Block>,
/// Record blocks that will be written as part of the IPC footer
- record_blocks: Vec<ipc::Block>,
+ record_blocks: Vec<crate::Block>,
/// Whether the writer footer has been written, and the writer is finished
finished: bool,
/// Keeps track of dictionaries that have been written
@@ -578,7 +578,7 @@ pub struct FileWriter<W: Write> {
impl<W: Write> FileWriter<W> {
/// Try create a new writer, with the schema written as part of the header
- pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
+ pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
let write_options = IpcWriteOptions::default();
Self::try_new_with_options(writer, schema, write_options)
}
@@ -588,7 +588,7 @@ impl<W: Write> FileWriter<W> {
writer: W,
schema: &Schema,
write_options: IpcWriteOptions,
- ) -> Result<Self> {
+ ) -> Result<Self, ArrowError> {
let data_gen = IpcDataGenerator::default();
let mut writer = BufWriter::new(writer);
// write magic to header aligned on 8 byte boundary
@@ -613,7 +613,7 @@ impl<W: Write> FileWriter<W> {
}
/// Write a record batch to the file
- pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+ pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
if self.finished {
return Err(ArrowError::IoError(
"Cannot write record batch to file writer as it is closed".to_string(),
@@ -631,7 +631,7 @@ impl<W: Write> FileWriter<W> {
write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
let block =
- ipc::Block::new(self.block_offsets as i64, meta as i32, data as i64);
+ crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
self.dictionary_blocks.push(block);
self.block_offsets += meta + data;
}
@@ -639,7 +639,7 @@ impl<W: Write> FileWriter<W> {
let (meta, data) =
write_message(&mut self.writer, encoded_message, &self.write_options)?;
// add a record block for the footer
- let block = ipc::Block::new(
+ let block = crate::Block::new(
self.block_offsets as i64,
meta as i32, // TODO: is this still applicable?
data as i64,
@@ -650,7 +650,7 @@ impl<W: Write> FileWriter<W> {
}
/// Write footer and closing tag, then mark the writer as done
- pub fn finish(&mut self) -> Result<()> {
+ pub fn finish(&mut self) -> Result<(), ArrowError> {
if self.finished {
return Err(ArrowError::IoError(
"Cannot write footer to file writer as it is closed".to_string(),
@@ -663,10 +663,10 @@ impl<W: Write> FileWriter<W> {
let mut fbb = FlatBufferBuilder::new();
let dictionaries = fbb.create_vector(&self.dictionary_blocks);
let record_batches = fbb.create_vector(&self.record_blocks);
- let schema = ipc::convert::schema_to_fb_offset(&mut fbb, &self.schema);
+ let schema = crate::convert::schema_to_fb_offset(&mut fbb, &self.schema);
let root = {
- let mut footer_builder = ipc::FooterBuilder::new(&mut fbb);
+ let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
footer_builder.add_version(self.write_options.metadata_version);
footer_builder.add_schema(schema);
footer_builder.add_dictionaries(dictionaries);
@@ -690,7 +690,7 @@ impl<W: Write> FileWriter<W> {
///
/// The buffer is flushed and the FileWriter is finished before returning the
/// writer.
- pub fn into_inner(mut self) -> Result<W> {
+ pub fn into_inner(mut self) -> Result<W, ArrowError> {
if !self.finished {
self.finish()?;
}
@@ -713,7 +713,7 @@ pub struct StreamWriter<W: Write> {
impl<W: Write> StreamWriter<W> {
/// Try create a new writer, with the schema written as part of the header
- pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
+ pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
let write_options = IpcWriteOptions::default();
Self::try_new_with_options(writer, schema, write_options)
}
@@ -722,7 +722,7 @@ impl<W: Write> StreamWriter<W> {
writer: W,
schema: &Schema,
write_options: IpcWriteOptions,
- ) -> Result<Self> {
+ ) -> Result<Self, ArrowError> {
let data_gen = IpcDataGenerator::default();
let mut writer = BufWriter::new(writer);
// write the schema, set the written bytes to the schema
@@ -738,7 +738,7 @@ impl<W: Write> StreamWriter<W> {
}
/// Write a record batch to the stream
- pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+ pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
if self.finished {
return Err(ArrowError::IoError(
"Cannot write record batch to stream writer as it is closed".to_string(),
@@ -759,7 +759,7 @@ impl<W: Write> StreamWriter<W> {
}
/// Write continuation bytes, and mark the stream as done
- pub fn finish(&mut self) -> Result<()> {
+ pub fn finish(&mut self) -> Result<(), ArrowError> {
if self.finished {
return Err(ArrowError::IoError(
"Cannot write footer to stream writer as it is closed".to_string(),
@@ -787,10 +787,9 @@ impl<W: Write> StreamWriter<W> {
/// # Example
///
/// ```
- /// # use arrow::datatypes::Schema;
- /// # use arrow::ipc::writer::{StreamWriter, IpcWriteOptions};
- /// # use arrow::ipc::MetadataVersion;
- /// # use arrow::error::ArrowError;
+ /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
+ /// # use arrow_ipc::MetadataVersion;
+ /// # use arrow_schema::{ArrowError, Schema};
/// # fn main() -> Result<(), ArrowError> {
/// // The result we expect from an empty schema
/// let expected = vec![
@@ -815,7 +814,7 @@ impl<W: Write> StreamWriter<W> {
/// # Ok(())
/// # }
/// ```
- pub fn into_inner(mut self) -> Result<W> {
+ pub fn into_inner(mut self) -> Result<W, ArrowError> {
if !self.finished {
self.finish()?;
}
@@ -823,9 +822,9 @@ impl<W: Write> StreamWriter<W> {
}
}
-/// Stores the encoded data, which is an ipc::Message, and optional Arrow data
+/// Stores the encoded data, which is a crate::Message, and optional Arrow data
pub struct EncodedData {
- /// An encoded ipc::Message
+ /// An encoded crate::Message
pub ipc_message: Vec<u8>,
/// Arrow buffers to be written, should be an empty vec for schema messages
pub arrow_data: Vec<u8>,
@@ -835,7 +834,7 @@ pub fn write_message<W: Write>(
mut writer: W,
encoded: EncodedData,
write_options: &IpcWriteOptions,
-) -> Result<(usize, usize)> {
+) -> Result<(usize, usize), ArrowError> {
let arrow_data_len = encoded.arrow_data.len();
if arrow_data_len % 8 != 0 {
return Err(ArrowError::MemoryError(
@@ -877,7 +876,7 @@ pub fn write_message<W: Write>(
Ok((aligned_size, body_len))
}
-fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize> {
+fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize, ArrowError> {
let len = data.len() as u32;
let pad_len = pad_to_8(len) as u32;
let total_len = len + pad_len;
@@ -898,17 +897,17 @@ fn write_continuation<W: Write>(
mut writer: W,
write_options: &IpcWriteOptions,
total_len: i32,
-) -> Result<usize> {
+) -> Result<usize, ArrowError> {
let mut written = 8;
// the version of the writer determines whether continuation markers should be added
match write_options.metadata_version {
- ipc::MetadataVersion::V1
- | ipc::MetadataVersion::V2
- | ipc::MetadataVersion::V3 => {
+ crate::MetadataVersion::V1
+ | crate::MetadataVersion::V2
+ | crate::MetadataVersion::V3 => {
unreachable!("Options with the metadata version cannot be created")
}
- ipc::MetadataVersion::V4 => {
+ crate::MetadataVersion::V4 => {
if !write_options.write_legacy_ipc_format {
// v0.15.0 format
writer.write_all(&CONTINUATION_MARKER)?;
@@ -916,12 +915,12 @@ fn write_continuation<W: Write>(
}
writer.write_all(&total_len.to_le_bytes()[..])?;
}
- ipc::MetadataVersion::V5 => {
+ crate::MetadataVersion::V5 => {
// write continuation marker and message length
writer.write_all(&CONTINUATION_MARKER)?;
writer.write_all(&total_len.to_le_bytes()[..])?;
}
- z => panic!("Unsupported ipc::MetadataVersion {:?}", z),
+ z => panic!("Unsupported crate::MetadataVersion {:?}", z),
};
writer.flush()?;
@@ -932,7 +931,7 @@ fn write_continuation<W: Write>(
/// In V4, null types have no validity bitmap
/// In V5 and later, null and union types have no validity bitmap
fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
- if write_options.metadata_version < ipc::MetadataVersion::V5 {
+ if write_options.metadata_version < crate::MetadataVersion::V5 {
!matches!(data_type, DataType::Null)
} else {
!matches!(data_type, DataType::Null | DataType::Union(_, _, _))
@@ -1053,22 +1052,22 @@ fn get_buffer_offset<OffsetSize: OffsetSizeTrait>(array_data: &ArrayData) -> Off
#[allow(clippy::too_many_arguments)]
fn write_array_data(
array_data: &ArrayData,
- buffers: &mut Vec<ipc::Buffer>,
+ buffers: &mut Vec<crate::Buffer>,
arrow_data: &mut Vec<u8>,
- nodes: &mut Vec<ipc::FieldNode>,
+ nodes: &mut Vec<crate::FieldNode>,
offset: i64,
num_rows: usize,
null_count: usize,
compression_codec: &Option<CompressionCodec>,
write_options: &IpcWriteOptions,
-) -> Result<i64> {
+) -> Result<i64, ArrowError> {
let mut offset = offset;
if !matches!(array_data.data_type(), DataType::Null) {
- nodes.push(ipc::FieldNode::new(num_rows as i64, null_count as i64));
+ nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
} else {
// NullArray's null_count equals to len, but the `null_count` passed in is from ArrayData
// where null_count is always 0.
- nodes.push(ipc::FieldNode::new(num_rows as i64, num_rows as i64));
+ nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
}
if has_validity_bitmap(array_data.data_type(), write_options) {
// write null buffer if exists
@@ -1219,7 +1218,7 @@ fn write_array_data(
}
/// Write a buffer into `arrow_data`, a vector of bytes, and adds its
-/// [`ipc::Buffer`] to `buffers`. Returns the new offset in `arrow_data`
+/// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data`
///
///
/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
@@ -1231,12 +1230,12 @@ fn write_array_data(
/// follows is not compressed, which can be useful for cases where
/// compression does not yield appreciable savings.
fn write_buffer(
- buffer: &[u8], // input
- buffers: &mut Vec<ipc::Buffer>, // output buffer descriptors
- arrow_data: &mut Vec<u8>, // output stream
- offset: i64, // current output stream offset
+ buffer: &[u8], // input
+ buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
+ arrow_data: &mut Vec<u8>, // output stream
+ offset: i64, // current output stream offset
compression_codec: &Option<CompressionCodec>,
-) -> Result<i64> {
+) -> Result<i64, ArrowError> {
let len: i64 = match compression_codec {
Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
None => {
@@ -1253,7 +1252,7 @@ fn write_buffer(
})?;
// make new index entry
- buffers.push(ipc::Buffer::new(offset, len));
+ buffers.push(crate::Buffer::new(offset, len));
// padding and make offset 8 bytes aligned
let pad_len = pad_to_8(len as u32) as i64;
arrow_data.extend_from_slice(&vec![0u8; pad_len as usize][..]);
@@ -1271,18 +1270,18 @@ fn pad_to_8(len: u32) -> usize {
mod tests {
use super::*;
- use std::fs::File;
use std::io::Seek;
use std::sync::Arc;
- use ipc::MetadataVersion;
+ use crate::MetadataVersion;
- use crate::array::*;
- use crate::datatypes::Field;
- use crate::ipc::reader::*;
+ use crate::reader::*;
+ use arrow_array::builder::UnionBuilder;
+ use arrow_array::types::*;
+ use arrow_schema::DataType;
#[test]
- #[cfg(feature = "ipc_compression")]
+ #[cfg(feature = "lz4")]
fn test_write_empty_record_batch_lz4_compression() {
let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
let values: Vec<Option<i32>> = vec![];
@@ -1295,9 +1294,9 @@ mod tests {
{
let write_option =
- IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+ IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
.unwrap()
- .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
+ .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
.unwrap();
let mut writer =
@@ -1335,7 +1334,7 @@ mod tests {
}
#[test]
- #[cfg(feature = "ipc_compression")]
+ #[cfg(feature = "lz4")]
fn test_write_file_with_lz4_compression() {
let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
@@ -1347,9 +1346,9 @@ mod tests {
let mut file = tempfile::tempfile().unwrap();
{
let write_option =
- IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+ IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
.unwrap()
- .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
+ .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
.unwrap();
let mut writer =
@@ -1387,7 +1386,7 @@ mod tests {
}
#[test]
- #[cfg(feature = "ipc_compression")]
+ #[cfg(feature = "zstd")]
fn test_write_file_with_zstd_compression() {
let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
@@ -1398,9 +1397,9 @@ mod tests {
let mut file = tempfile::tempfile().unwrap();
{
let write_option =
- IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+ IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
.unwrap()
- .try_with_compression(Some(ipc::CompressionType::ZSTD))
+ .try_with_compression(Some(crate::CompressionType::ZSTD))
.unwrap();
let mut writer =
@@ -1482,7 +1481,7 @@ mod tests {
}
}
- fn write_null_file(options: IpcWriteOptions, suffix: &str) {
+ fn write_null_file(options: IpcWriteOptions) {
let schema = Schema::new(vec![
Field::new("nulls", DataType::Null, true),
Field::new("int32s", DataType::Int32, false),
@@ -1503,18 +1502,18 @@ mod tests {
],
)
.unwrap();
- let file_name = format!("target/debug/testdata/nulls_{}.arrow_file", suffix);
+ let mut file = tempfile::tempfile().unwrap();
{
- let file = File::create(&file_name).unwrap();
let mut writer =
- FileWriter::try_new_with_options(file, &schema, options).unwrap();
+ FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
}
+ file.rewind().unwrap();
+
{
- let file = File::open(&file_name).unwrap();
let reader = FileReader::try_new(file, None).unwrap();
reader.for_each(|maybe_batch| {
maybe_batch
@@ -1532,33 +1531,19 @@ mod tests {
}
#[test]
fn test_write_null_file_v4() {
- write_null_file(
- IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap(),
- "v4_a8",
- );
- write_null_file(
- IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap(),
- "v4_a8l",
- );
+ write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
+ write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
write_null_file(
IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap(),
- "v4_a64",
- );
- write_null_file(
- IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap(),
- "v4_a64l",
);
+ write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
}
#[test]
fn test_write_null_file_v5() {
- write_null_file(
- IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap(),
- "v5_a8",
- );
+ write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
write_null_file(
IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap(),
- "v5_a64",
);
}
@@ -1626,45 +1611,6 @@ mod tests {
assert!(dict_tracker.written.contains_key(&2));
}
- #[test]
- fn read_union_017() {
- let testdata = crate::util::test_util::arrow_test_data();
- let data_file = File::open(format!(
- "{}/arrow-ipc-stream/integration/0.17.1/generated_union.stream",
- testdata,
- ))
- .unwrap();
-
- let reader = StreamReader::try_new(data_file, None).unwrap();
-
- let mut file = tempfile::tempfile().unwrap();
- // read and rewrite the stream to a temp location
- {
- let mut writer = StreamWriter::try_new(&mut file, &reader.schema()).unwrap();
- reader.for_each(|batch| {
- writer.write(&batch.unwrap()).unwrap();
- });
- writer.finish().unwrap();
- }
- file.rewind().unwrap();
-
- // Compare original file and rewrote file
- let rewrite_reader = StreamReader::try_new(file, None).unwrap();
-
- let data_file = File::open(format!(
- "{}/arrow-ipc-stream/integration/0.17.1/generated_union.stream",
- testdata,
- ))
- .unwrap();
- let reader = StreamReader::try_new(data_file, None).unwrap();
-
- reader.into_iter().zip(rewrite_reader.into_iter()).for_each(
- |(batch1, batch2)| {
- assert_eq!(batch1.unwrap(), batch2.unwrap());
- },
- );
- }
-
fn write_union_file(options: IpcWriteOptions) {
let schema = Schema::new(vec![Field::new(
"union",
@@ -1739,7 +1685,7 @@ mod tests {
fn deserialize(bytes: Vec<u8>) -> RecordBatch {
let mut stream_reader =
- ipc::reader::StreamReader::try_new(std::io::Cursor::new(bytes), None)
+ crate::reader::StreamReader::try_new(std::io::Cursor::new(bytes), None)
.unwrap();
stream_reader.next().unwrap().unwrap()
}
diff --git a/arrow/CONTRIBUTING.md b/arrow/CONTRIBUTING.md
index bbf309d4d..5b84bc2d3 100644
--- a/arrow/CONTRIBUTING.md
+++ b/arrow/CONTRIBUTING.md
@@ -26,23 +26,6 @@ Rust [README.md](../README.md).
Please refer to [lib.rs](src/lib.rs) for an introduction to this
specific crate and its current functionality.
-## IPC
-
-The expected flatc version is 1.12.0+, built from [flatbuffers](https://github.com/google/flatbuffers)
-master at fixed commit ID, by regen.sh.
-
-The IPC flatbuffer code was generated by running this command from the root of the project:
-
-```bash
-./regen.sh
-```
-
-The above script will run the `flatc` compiler and perform some adjustments to the source code:
-
-- Replace `type__` with `type_`
-- Remove `org::apache::arrow::flatbuffers` namespace
-- Add includes to each generated file
-
## Guidelines in usage of `unsafe`
[`unsafe`](https://doc.rust-lang.org/book/ch19-01-unsafe-rust.html) has a high maintenance cost because debugging and testing it is difficult, time consuming, often requires external tools (e.g. `valgrind`), and requires a higher-than-usual attention to details. Undefined behavior is particularly difficult to identify and test, and usage of `unsafe` is the [primary cause of undefined behavior](https://doc.rust-lang.org/reference/behavior-considered-undefined.html) in a program written in Rust.
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index 5749f6799..6c30df6bd 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -50,6 +50,7 @@ arrow-data = { version = "26.0.0", path = "../arrow-data" }
arrow-schema = { version = "26.0.0", path = "../arrow-schema" }
arrow-array = { version = "26.0.0", path = "../arrow-array" }
arrow-select = { version = "26.0.0", path = "../arrow-select" }
+arrow-ipc = { version = "26.0.0", path = "../arrow-ipc", optional = true }
serde_json = { version = "1.0", default-features = false, features = ["std"], optional = true }
indexmap = { version = "1.9", default-features = false, features = ["std"] }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
@@ -60,25 +61,22 @@ csv_crate = { version = "1.1", default-features = false, optional = true, packag
regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] }
regex-syntax = { version = "0.6.27", default-features = false, features = ["unicode"] }
lazy_static = { version = "1.4", default-features = false }
-lz4 = { version = "1.23", default-features = false, optional = true }
packed_simd = { version = "0.3", default-features = false, optional = true, package = "packed_simd_2" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
-flatbuffers = { version = "22.9.2", default-features = false, features = ["thiserror"], optional = true }
comfy-table = { version = "6.0", optional = true, default-features = false }
pyo3 = { version = "0.17", default-features = false, optional = true }
lexical-core = { version = "^0.8", default-features = false, features = ["write-integers", "write-floats", "parse-integers", "parse-floats"] }
multiversion = { version = "0.6.1", default-features = false }
bitflags = { version = "1.2.1", default-features = false }
-zstd = { version = "0.11.1", default-features = false, optional = true }
[package.metadata.docs.rs]
features = ["prettyprint", "ipc_compression", "dyn_cmp_dict", "ffi", "pyarrow"]
[features]
default = ["csv", "ipc", "json"]
-ipc_compression = ["ipc", "zstd", "lz4"]
+ipc_compression = ["ipc", "arrow-ipc/lz4", "arrow-ipc/zstd"]
csv = ["csv_crate"]
-ipc = ["flatbuffers"]
+ipc = ["arrow-ipc"]
json = ["serde_json"]
simd = ["packed_simd"]
prettyprint = ["comfy-table"]
@@ -265,3 +263,7 @@ required-features = ["test_utils"]
name = "lexsort"
harness = false
required-features = ["test_utils"]
+
+[[test]]
+name = "ipc_integration"
+required-features = ["test_utils", "ipc"]
diff --git a/arrow/src/ipc/compression/mod.rs b/arrow/src/ipc/compression/mod.rs
deleted file mode 100644
index 666fa6d86..000000000
--- a/arrow/src/ipc/compression/mod.rs
+++ /dev/null
@@ -1,26 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#[cfg(feature = "ipc_compression")]
-mod codec;
-#[cfg(feature = "ipc_compression")]
-pub(crate) use codec::CompressionCodec;
-
-#[cfg(not(feature = "ipc_compression"))]
-mod stub;
-#[cfg(not(feature = "ipc_compression"))]
-pub(crate) use stub::CompressionCodec;
diff --git a/arrow/src/ipc/compression/stub.rs b/arrow/src/ipc/compression/stub.rs
deleted file mode 100644
index 6240f084b..000000000
--- a/arrow/src/ipc/compression/stub.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Stubs that implement the same interface as the ipc_compression
-//! codec module, but always errors.
-
-use crate::buffer::Buffer;
-use crate::error::{ArrowError, Result};
-use crate::ipc::CompressionType;
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum CompressionCodec {}
-
-impl TryFrom<CompressionCodec> for CompressionType {
- type Error = ArrowError;
- fn try_from(codec: CompressionCodec) -> Result<Self> {
- Err(ArrowError::InvalidArgumentError(
- format!("codec type {:?} not supported because arrow was not compiled with the ipc_compression feature", codec)))
- }
-}
-
-impl TryFrom<CompressionType> for CompressionCodec {
- type Error = ArrowError;
-
- fn try_from(compression_type: CompressionType) -> Result<Self> {
- Err(ArrowError::InvalidArgumentError(
- format!("compression type {:?} not supported because arrow was not compiled with the ipc_compression feature", compression_type))
- )
- }
-}
-
-impl CompressionCodec {
- #[allow(clippy::ptr_arg)]
- pub(crate) fn compress_to_vec(
- &self,
- _input: &[u8],
- _output: &mut Vec<u8>,
- ) -> Result<usize> {
- Err(ArrowError::InvalidArgumentError(
- "compression not supported because arrow was not compiled with the ipc_compression feature".to_string()
- ))
- }
-
- pub(crate) fn decompress_to_buffer(&self, _input: &[u8]) -> Result<Buffer> {
- Err(ArrowError::InvalidArgumentError(
- "decompression not supported because arrow was not compiled with the ipc_compression feature".to_string()
- ))
- }
-}
diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs
index 0081856f3..b2fa30d26 100644
--- a/arrow/src/lib.rs
+++ b/arrow/src/lib.rs
@@ -314,35 +314,14 @@ pub mod ffi;
#[cfg(feature = "ffi")]
pub mod ffi_stream;
#[cfg(feature = "ipc")]
-pub mod ipc;
+pub use arrow_ipc as ipc;
#[cfg(feature = "serde_json")]
pub mod json;
#[cfg(feature = "pyarrow")]
pub mod pyarrow;
pub mod record_batch {
- pub use arrow_array::{RecordBatch, RecordBatchOptions};
- use arrow_schema::{ArrowError, SchemaRef};
-
- /// Trait for types that can read `RecordBatch`'s.
- pub trait RecordBatchReader:
- Iterator<Item = Result<RecordBatch, ArrowError>>
- {
- /// Returns the schema of this `RecordBatchReader`.
- ///
- /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this
- /// reader should have the same schema as returned from this method.
- fn schema(&self) -> SchemaRef;
-
- /// Reads the next `RecordBatch`.
- #[deprecated(
- since = "2.0.0",
- note = "This method is deprecated in favour of `next` from the trait Iterator."
- )]
- fn next_batch(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
- self.next().transpose()
- }
- }
+ pub use arrow_array::{RecordBatch, RecordBatchOptions, RecordBatchReader};
}
pub mod row;
pub use arrow_array::temporal_conversions;
diff --git a/arrow/tests/ipc_integration.rs b/arrow/tests/ipc_integration.rs
new file mode 100644
index 000000000..abaa238ba
--- /dev/null
+++ b/arrow/tests/ipc_integration.rs
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_ipc::reader::StreamReader;
+use arrow_ipc::writer::StreamWriter;
+use std::fs::File;
+use std::io::Seek;
+
+/// Reads the 0.17.1 union integration stream, rewrites it with
+/// `StreamWriter`, and checks that the rewritten stream decodes to the
+/// same batches as the original.
+#[test]
+fn read_union_017() {
+    let testdata = arrow::util::test_util::arrow_test_data();
+    let path = format!(
+        "{}/arrow-ipc-stream/integration/0.17.1/generated_union.stream",
+        testdata,
+    );
+
+    // Read the original stream and rewrite it to a temporary file
+    let reader = StreamReader::try_new(File::open(&path).unwrap(), None).unwrap();
+    let mut file = tempfile::tempfile().unwrap();
+    {
+        let mut writer = StreamWriter::try_new(&mut file, &reader.schema()).unwrap();
+        for batch in reader {
+            writer.write(&batch.unwrap()).unwrap();
+        }
+        writer.finish().unwrap();
+    }
+    file.rewind().unwrap();
+
+    // Decode both the original and the rewritten stream in full
+    let original: Vec<_> = StreamReader::try_new(File::open(&path).unwrap(), None)
+        .unwrap()
+        .collect::<Result<_, _>>()
+        .unwrap();
+    let rewritten: Vec<_> = StreamReader::try_new(file, None)
+        .unwrap()
+        .collect::<Result<_, _>>()
+        .unwrap();
+
+    // Compare whole vectors rather than zipping two iterators: `zip` stops at
+    // the shorter side, so a rewritten stream with missing batches (or no
+    // batches at all) would otherwise pass silently.
+    assert!(!original.is_empty(), "test stream contains no batches");
+    assert_eq!(original, rewritten);
+}
diff --git a/dev/release/README.md b/dev/release/README.md
index a12e07f8e..093e1c4c2 100644
--- a/dev/release/README.md
+++ b/dev/release/README.md
@@ -257,6 +257,8 @@ Rust Arrow Crates:
(cd arrow-data && cargo publish)
(cd arrow-array && cargo publish)
(cd arrow-select && cargo publish)
+(cd arrow-cast && cargo publish)
+(cd arrow-ipc && cargo publish)
(cd arrow && cargo publish)
(cd arrow-flight && cargo publish)
(cd parquet && cargo publish)