Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/14 07:07:22 UTC
[arrow-rs] branch master updated: support compression for IPC with revamped feature flags (#2369)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 5e27d9390 support compression for IPC with revamped feature flags (#2369)
5e27d9390 is described below
commit 5e27d93904660e4f91e94636e6840823c6fb7d4f
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sun Aug 14 03:07:17 2022 -0400
support compression for IPC with revamped feature flags (#2369)
* support compression for IPC
* fix conflict
* edit toml
* fix clippy and format
* format doc
* format code
* add padding for tail of each buffer
* try to fix the arrow lz4 and zstd
* add lz4, zstd for compression cfg
* add cfg for ipc_compression
* address comments
* Rework ipc_compression feature flags and plumb through errors
* fixup flags in reader
* Make stub interface
* Compiles without ipc_compression support
* Fix tests
* Clean up writing
* use uniform flag syntax
* fix flags
* Rename for clarity
* fix compilation
* Add ipc_compression tests to CI
* fix: clippy
* fix merge conflicts
* Add note in doc
* Remove redundant dev dependencies
* improve variable name
* Apply suggestions from code review
Co-authored-by: Kun Liu <li...@apache.org>
Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
* improve comment in stub.rs
* Fix for new clippy
* Clean up clippy
* Clean up header writing
* fmt
Co-authored-by: liukun4515 <li...@apache.org>
Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
.github/workflows/arrow.yml | 6 +-
arrow/Cargo.toml | 3 +
arrow/README.md | 3 +-
arrow/src/ipc/compression/codec.rs | 205 ++++++++++++++
arrow/src/ipc/{ => compression}/mod.rs | 30 +-
arrow/src/ipc/compression/stub.rs | 63 +++++
arrow/src/ipc/mod.rs | 2 +
arrow/src/ipc/reader.rs | 199 +++++++++++--
arrow/src/ipc/writer.rs | 490 ++++++++++++++++++++++++++++++---
9 files changed, 911 insertions(+), 90 deletions(-)
diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml
index 16cd9a9a0..fcc4d2c37 100644
--- a/.github/workflows/arrow.yml
+++ b/.github/workflows/arrow.yml
@@ -51,9 +51,9 @@ jobs:
- name: Test
run: |
cargo test -p arrow
- - name: Test --features=force_validate,prettyprint,ffi
+ - name: Test --features=force_validate,prettyprint,ipc_compression,ffi
run: |
- cargo test -p arrow --features=force_validate,prettyprint,ffi
+ cargo test -p arrow --features=force_validate,prettyprint,ipc_compression,ffi
- name: Run examples
run: |
# Test arrow examples
@@ -172,4 +172,4 @@ jobs:
rustup component add clippy
- name: Run clippy
run: |
- cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi --all-targets -- -D warnings
+ cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi,ipc_compression --all-targets -- -D warnings
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index dbc606ad2..bebaadcbc 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -56,6 +56,7 @@ csv_crate = { version = "1.1", default-features = false, optional = true, packag
regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] }
regex-syntax = { version = "0.6.27", default-features = false, features = ["unicode"] }
lazy_static = { version = "1.4", default-features = false }
+lz4 = { version = "1.23", default-features = false, optional = true }
packed_simd = { version = "0.3", default-features = false, optional = true, package = "packed_simd_2" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.6", default-features = false, optional = true }
@@ -66,9 +67,11 @@ pyo3 = { version = "0.16", default-features = false, optional = true }
lexical-core = { version = "^0.8", default-features = false, features = ["write-integers", "write-floats", "parse-integers", "parse-floats"] }
multiversion = { version = "0.6.1", default-features = false }
bitflags = { version = "1.2.1", default-features = false }
+zstd = { version = "0.11.1", default-features = false, optional = true }
[features]
default = ["csv", "ipc"]
+ipc_compression = ["ipc", "zstd", "lz4"]
csv = ["csv_crate"]
ipc = ["flatbuffers"]
simd = ["packed_simd"]
diff --git a/arrow/README.md b/arrow/README.md
index f7ccb9696..5e20a4253 100644
--- a/arrow/README.md
+++ b/arrow/README.md
@@ -42,7 +42,8 @@ However, for historical reasons, this crate uses versions with major numbers gre
The `arrow` crate provides the following features which may be enabled in your `Cargo.toml`:
- `csv` (default) - support for reading and writing Arrow arrays to/from csv files
-- `ipc` (default) - support for the [arrow-flight](https://crates.io/crates/arrow-flight) IPC and wire format
+- `ipc` (default) - support for reading [Arrow IPC Format](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc), also used as the wire protocol in [arrow-flight](https://crates.io/crates/arrow-flight)
+- `ipc_compression` - Enables reading and writing compressed IPC streams (also enables `ipc`)
- `prettyprint` - support for formatting record batches as textual columns
- `js` - support for building arrow for WebAssembly / JavaScript
- `simd` - (_Requires Nightly Rust_) Use alternate hand optimized
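(Editorial note: a minimal end-to-end sketch of the new feature, assuming `ipc_compression` is enabled in `Cargo.toml`. The APIs — `IpcWriteOptions::try_with_compression`, `FileWriter::try_new_with_options`, `FileReader::try_new` — are the ones introduced or exercised in this diff; the file path and column values are illustrative.)

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::ipc::{self, reader::FileReader, writer::{FileWriter, IpcWriteOptions}};
use arrow::record_batch::RecordBatch;

fn main() -> Result<()> {
    let schema = Schema::new(vec![Field::new("f", DataType::Int32, true)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
    )?;

    // Compression requires metadata V5 and the ipc_compression feature
    let options = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)?
        .try_with_compression(Some(ipc::CompressionType::ZSTD))?;

    let file = File::create("/tmp/compressed.arrow_file")?; // illustrative path
    let mut writer = FileWriter::try_new_with_options(file, &schema, options)?;
    writer.write(&batch)?;
    writer.finish()?;

    // Reading back needs no special options: compression is detected
    // from the record batch metadata
    let file = File::open("/tmp/compressed.arrow_file")?;
    let reader = FileReader::try_new(file, None)?;
    for batch in reader {
        println!("{} rows", batch?.num_rows());
    }
    Ok(())
}
```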
diff --git a/arrow/src/ipc/compression/codec.rs b/arrow/src/ipc/compression/codec.rs
new file mode 100644
index 000000000..9d870fc22
--- /dev/null
+++ b/arrow/src/ipc/compression/codec.rs
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::buffer::Buffer;
+use crate::error::{ArrowError, Result};
+use crate::ipc::CompressionType;
+use std::io::{Read, Write};
+
+const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
+const LENGTH_OF_PREFIX_DATA: i64 = 8;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+/// Represents compressing an IPC stream using a particular compression algorithm
+pub enum CompressionCodec {
+ Lz4Frame,
+ Zstd,
+}
+
+impl TryFrom<CompressionType> for CompressionCodec {
+ type Error = ArrowError;
+
+ fn try_from(compression_type: CompressionType) -> Result<Self> {
+ match compression_type {
+ CompressionType::ZSTD => Ok(CompressionCodec::Zstd),
+ CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame),
+ other_type => Err(ArrowError::NotYetImplemented(format!(
+ "compression type {:?} not supported ",
+ other_type
+ ))),
+ }
+ }
+}
+
+impl CompressionCodec {
+ /// Compresses the data in `input` using the specified compression
+ /// mechanism and appends it to `output`.
+ ///
+ /// Returns the number of bytes written to the stream.
+ ///
+ /// Writes this format to output:
+ /// ```text
+ /// [8 bytes]: uncompressed length
+ /// [remaining bytes]: compressed data stream
+ /// ```
+ pub(crate) fn compress_to_vec(
+ &self,
+ input: &[u8],
+ output: &mut Vec<u8>,
+ ) -> Result<usize> {
+ let uncompressed_data_len = input.len();
+ let original_output_len = output.len();
+
+ if input.is_empty() {
+ // empty input, nothing to do
+ } else {
+ // write compressed data directly into the output buffer
+ output.extend_from_slice(&uncompressed_data_len.to_le_bytes());
+ self.compress(input, output)?;
+
+ let compression_len = output.len();
+ if compression_len > uncompressed_data_len {
+ // the compressed output was larger than the
+ // uncompressed input, so store the uncompressed
+ // data instead, with length -1 to signal that
+ // the data is not compressed
+ output.truncate(original_output_len);
+ output.extend_from_slice(&LENGTH_NO_COMPRESSED_DATA.to_le_bytes());
+ output.extend_from_slice(input);
+ }
+ }
+ Ok(output.len() - original_output_len)
+ }
+
+ /// Decompresses the input into a [`Buffer`]
+ ///
+ /// The input should look like:
+ /// ```text
+ /// [8 bytes]: uncompressed length
+ /// [remaining bytes]: compressed data stream
+ /// ```
+ pub(crate) fn decompress_to_buffer(&self, input: &[u8]) -> Result<Buffer> {
+ // read the first 8 bytes to determine if the data is
+ // compressed
+ let decompressed_length = read_uncompressed_size(input);
+ let buffer = if decompressed_length == 0 {
+ // empty
+ let empty = Vec::<u8>::new();
+ Buffer::from(empty)
+ } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+ // no compression
+ let data = &input[(LENGTH_OF_PREFIX_DATA as usize)..];
+ Buffer::from(data)
+ } else {
+ // decompress data using the codec
+ let mut uncompressed_buffer =
+ Vec::with_capacity(decompressed_length as usize);
+ let input_data = &input[(LENGTH_OF_PREFIX_DATA as usize)..];
+ self.decompress(input_data, &mut uncompressed_buffer)?;
+ Buffer::from(uncompressed_buffer)
+ };
+ Ok(buffer)
+ }
+
+ /// Compress the data in input buffer and write to output buffer
+ /// using the specified compression
+ fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
+ match self {
+ CompressionCodec::Lz4Frame => {
+ let mut encoder = lz4::EncoderBuilder::new().build(output)?;
+ encoder.write_all(input)?;
+ match encoder.finish().1 {
+ Ok(_) => Ok(()),
+ Err(e) => Err(e.into()),
+ }
+ }
+ CompressionCodec::Zstd => {
+ let mut encoder = zstd::Encoder::new(output, 0)?;
+ encoder.write_all(input)?;
+ match encoder.finish() {
+ Ok(_) => Ok(()),
+ Err(e) => Err(e.into()),
+ }
+ }
+ }
+ }
+
+ /// Decompress the data in input buffer and write to output buffer
+ /// using the specified compression
+ fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<usize> {
+ let result: Result<usize> = match self {
+ CompressionCodec::Lz4Frame => {
+ let mut decoder = lz4::Decoder::new(input)?;
+ match decoder.read_to_end(output) {
+ Ok(size) => Ok(size),
+ Err(e) => Err(e.into()),
+ }
+ }
+ CompressionCodec::Zstd => {
+ let mut decoder = zstd::Decoder::new(input)?;
+ match decoder.read_to_end(output) {
+ Ok(size) => Ok(size),
+ Err(e) => Err(e.into()),
+ }
+ }
+ };
+ result
+ }
+}
+
+/// Get the uncompressed length
+/// Notes:
+/// LENGTH_NO_COMPRESSED_DATA: indicates that the data that follows is not compressed
+/// 0: indicates that there is no data
+/// positive number: indicates the uncompressed length of the data that follows
+#[inline]
+fn read_uncompressed_size(buffer: &[u8]) -> i64 {
+ let len_buffer = &buffer[0..8];
+ // 64-bit little-endian signed integer
+ i64::from_le_bytes(len_buffer.try_into().unwrap())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_lz4_compression() {
+ let input_bytes = "hello lz4".as_bytes();
+ let codec: CompressionCodec = CompressionCodec::Lz4Frame;
+ let mut output_bytes: Vec<u8> = Vec::new();
+ codec.compress(input_bytes, &mut output_bytes).unwrap();
+ let mut result_output_bytes: Vec<u8> = Vec::new();
+ codec
+ .decompress(output_bytes.as_slice(), &mut result_output_bytes)
+ .unwrap();
+ assert_eq!(input_bytes, result_output_bytes.as_slice());
+ }
+
+ #[test]
+ fn test_zstd_compression() {
+ let input_bytes = "hello zstd".as_bytes();
+ let codec: CompressionCodec = CompressionCodec::Zstd;
+ let mut output_bytes: Vec<u8> = Vec::new();
+ codec.compress(input_bytes, &mut output_bytes).unwrap();
+ let mut result_output_bytes: Vec<u8> = Vec::new();
+ codec
+ .decompress(output_bytes.as_slice(), &mut result_output_bytes)
+ .unwrap();
+ assert_eq!(input_bytes, result_output_bytes.as_slice());
+ }
+}
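(Editorial note: the per-buffer framing implemented by `compress_to_vec` and `decompress_to_buffer` can be sketched standalone. This mirrors the layout described in the doc comments above; it is not the crate code itself.)

```rust
use std::convert::TryInto;

// Marker used in the 8-byte length prefix when the payload is stored raw
const LENGTH_NO_COMPRESSED_DATA: i64 = -1;

// Frame `input` without compression, as the writer does when the codec
// would not shrink the data: a -1 length prefix, then the raw bytes.
fn frame_uncompressed(input: &[u8], output: &mut Vec<u8>) {
    output.extend_from_slice(&LENGTH_NO_COMPRESSED_DATA.to_le_bytes());
    output.extend_from_slice(input);
}

// Split a framed buffer into its 8-byte little-endian prefix and payload.
// prefix == -1: payload is raw; prefix == 0: empty; prefix > 0: payload is
// compressed and decompresses to `prefix` bytes.
fn split_frame(input: &[u8]) -> (i64, &[u8]) {
    let prefix = i64::from_le_bytes(input[0..8].try_into().unwrap());
    (prefix, &input[8..])
}

fn main() {
    let mut framed = Vec::new();
    frame_uncompressed(b"hello", &mut framed);
    let (prefix, payload) = split_frame(&framed);
    assert_eq!(prefix, LENGTH_NO_COMPRESSED_DATA);
    assert_eq!(payload, b"hello");
}
```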
diff --git a/arrow/src/ipc/mod.rs b/arrow/src/ipc/compression/mod.rs
similarity index 54%
copy from arrow/src/ipc/mod.rs
copy to arrow/src/ipc/compression/mod.rs
index d5455b454..666fa6d86 100644
--- a/arrow/src/ipc/mod.rs
+++ b/arrow/src/ipc/compression/mod.rs
@@ -15,26 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-// TODO: (vcq): Protobuf codegen is not generating Debug impls.
-#![allow(missing_debug_implementations)]
+#[cfg(feature = "ipc_compression")]
+mod codec;
+#[cfg(feature = "ipc_compression")]
+pub(crate) use codec::CompressionCodec;
-pub mod convert;
-pub mod reader;
-pub mod writer;
-
-#[allow(clippy::redundant_closure)]
-#[allow(clippy::needless_lifetimes)]
-#[allow(clippy::extra_unused_lifetimes)]
-#[allow(clippy::redundant_static_lifetimes)]
-#[allow(clippy::redundant_field_names)]
-#[allow(non_camel_case_types)]
-pub mod gen;
-
-pub use self::gen::File::*;
-pub use self::gen::Message::*;
-pub use self::gen::Schema::*;
-pub use self::gen::SparseTensor::*;
-pub use self::gen::Tensor::*;
-
-const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
-const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
+#[cfg(not(feature = "ipc_compression"))]
+mod stub;
+#[cfg(not(feature = "ipc_compression"))]
+pub(crate) use stub::CompressionCodec;
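(Editorial note: the module above swaps in one of two `CompressionCodec` implementations at compile time. A minimal standalone sketch of the same pattern, with an illustrative `fast_path` feature name:)

```rust
// Compile-time implementation swap via a feature flag: both modules
// expose the same name, and exactly one is compiled in.
#[cfg(feature = "fast_path")]
mod real {
    pub fn transform(input: &[u8]) -> Result<Vec<u8>, String> {
        Ok(input.to_vec()) // stand-in for a real codec
    }
}

#[cfg(not(feature = "fast_path"))]
mod stub {
    pub fn transform(_input: &[u8]) -> Result<Vec<u8>, String> {
        Err("not compiled with the fast_path feature".to_string())
    }
}

#[cfg(feature = "fast_path")]
pub use real::transform;
#[cfg(not(feature = "fast_path"))]
pub use stub::transform;

fn main() {
    match transform(b"abc") {
        Ok(v) => println!("transformed {} bytes", v.len()),
        Err(e) => println!("stubbed out: {}", e),
    }
}
```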
diff --git a/arrow/src/ipc/compression/stub.rs b/arrow/src/ipc/compression/stub.rs
new file mode 100644
index 000000000..6240f084b
--- /dev/null
+++ b/arrow/src/ipc/compression/stub.rs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stubs that implement the same interface as the ipc_compression
+//! codec module, but always return errors.
+
+use crate::buffer::Buffer;
+use crate::error::{ArrowError, Result};
+use crate::ipc::CompressionType;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CompressionCodec {}
+
+impl TryFrom<CompressionCodec> for CompressionType {
+ type Error = ArrowError;
+ fn try_from(codec: CompressionCodec) -> Result<Self> {
+ Err(ArrowError::InvalidArgumentError(
+ format!("codec type {:?} not supported because arrow was not compiled with the ipc_compression feature", codec)))
+ }
+}
+
+impl TryFrom<CompressionType> for CompressionCodec {
+ type Error = ArrowError;
+
+ fn try_from(compression_type: CompressionType) -> Result<Self> {
+ Err(ArrowError::InvalidArgumentError(
+ format!("compression type {:?} not supported because arrow was not compiled with the ipc_compression feature", compression_type))
+ )
+ }
+}
+
+impl CompressionCodec {
+ #[allow(clippy::ptr_arg)]
+ pub(crate) fn compress_to_vec(
+ &self,
+ _input: &[u8],
+ _output: &mut Vec<u8>,
+ ) -> Result<usize> {
+ Err(ArrowError::InvalidArgumentError(
+ "compression not supported because arrow was not compiled with the ipc_compression feature".to_string()
+ ))
+ }
+
+ pub(crate) fn decompress_to_buffer(&self, _input: &[u8]) -> Result<Buffer> {
+ Err(ArrowError::InvalidArgumentError(
+ "decompression not supported because arrow was not compiled with the ipc_compression feature".to_string()
+ ))
+ }
+}
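(Editorial note: the stub's `CompressionCodec` is an uninhabited enum, so it type-checks against the real codec's signatures but can never be constructed — both `TryFrom` conversions fail first. A tiny sketch of why that compiles:)

```rust
// An enum with no variants has no values, so code paths that would need
// one are statically unreachable; an empty match proves this to the
// compiler.
enum Never {}

#[allow(dead_code)]
fn cannot_happen(n: Never) -> ! {
    match n {} // no variants, so no arms are required
}

fn main() {
    // No value of type Never can ever be produced, so an
    // Option<Never> is always None.
    let n: Option<Never> = None;
    assert!(n.is_none());
}
```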
diff --git a/arrow/src/ipc/mod.rs b/arrow/src/ipc/mod.rs
index d5455b454..2b30e7220 100644
--- a/arrow/src/ipc/mod.rs
+++ b/arrow/src/ipc/mod.rs
@@ -22,6 +22,8 @@ pub mod convert;
pub mod reader;
pub mod writer;
+mod compression;
+
#[allow(clippy::redundant_closure)]
#[allow(clippy::needless_lifetimes)]
#[allow(clippy::extra_unused_lifetimes)]
diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs
index ce44d74a1..393128371 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow/src/ipc/reader.rs
@@ -21,6 +21,7 @@
//! however the `FileReader` expects a reader that supports `Seek`ing
use std::collections::HashMap;
+use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
@@ -32,15 +33,35 @@ use crate::error::{ArrowError, Result};
use crate::ipc;
use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
+use crate::ipc::compression::CompressionCodec;
use ipc::CONTINUATION_MARKER;
use DataType::*;
/// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+ buf: &ipc::Buffer,
+ a_data: &[u8],
+ compression_codec: &Option<CompressionCodec>,
+) -> Result<Buffer> {
let start_offset = buf.offset() as usize;
let end_offset = start_offset + buf.length() as usize;
let buf_data = &a_data[start_offset..end_offset];
- Buffer::from(&buf_data)
+ // corner case: empty buffer
+ if buf_data.is_empty() {
+ return Ok(Buffer::from(buf_data));
+ }
+ match compression_codec {
+ Some(decompressor) => decompressor.decompress_to_buffer(buf_data),
+ None => Ok(Buffer::from(buf_data)),
+ }
}
/// Coordinates reading arrays based on data types.
@@ -61,6 +82,7 @@ fn create_array(
dictionaries_by_id: &HashMap<i64, ArrayRef>,
mut node_index: usize,
mut buffer_index: usize,
+ compression_codec: &Option<CompressionCodec>,
metadata: &ipc::MetadataVersion,
) -> Result<(ArrayRef, usize, usize)> {
use DataType::*;
@@ -72,8 +94,8 @@ fn create_array(
data_type,
buffers[buffer_index..buffer_index + 3]
.iter()
- .map(|buf| read_buffer(buf, data))
- .collect(),
+ .map(|buf| read_buffer(buf, data, compression_codec))
+ .collect::<Result<_>>()?,
);
node_index += 1;
buffer_index += 3;
@@ -85,8 +107,8 @@ fn create_array(
data_type,
buffers[buffer_index..buffer_index + 2]
.iter()
- .map(|buf| read_buffer(buf, data))
- .collect(),
+ .map(|buf| read_buffer(buf, data, compression_codec))
+ .collect::<Result<_>>()?,
);
node_index += 1;
buffer_index += 2;
@@ -96,8 +118,8 @@ fn create_array(
let list_node = &nodes[node_index];
let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
.iter()
- .map(|buf| read_buffer(buf, data))
- .collect();
+ .map(|buf| read_buffer(buf, data, compression_codec))
+ .collect::<Result<_>>()?;
node_index += 1;
buffer_index += 2;
let triple = create_array(
@@ -108,6 +130,7 @@ fn create_array(
dictionaries_by_id,
node_index,
buffer_index,
+ compression_codec,
metadata,
)?;
node_index = triple.1;
@@ -119,8 +142,8 @@ fn create_array(
let list_node = &nodes[node_index];
let list_buffers: Vec<Buffer> = buffers[buffer_index..=buffer_index]
.iter()
- .map(|buf| read_buffer(buf, data))
- .collect();
+ .map(|buf| read_buffer(buf, data, compression_codec))
+ .collect::<Result<_>>()?;
node_index += 1;
buffer_index += 1;
let triple = create_array(
@@ -131,6 +154,7 @@ fn create_array(
dictionaries_by_id,
node_index,
buffer_index,
+ compression_codec,
metadata,
)?;
node_index = triple.1;
@@ -140,7 +164,8 @@ fn create_array(
}
Struct(struct_fields) => {
let struct_node = &nodes[node_index];
- let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data);
+ let null_buffer: Buffer =
+ read_buffer(&buffers[buffer_index], data, compression_codec)?;
node_index += 1;
buffer_index += 1;
@@ -157,6 +182,7 @@ fn create_array(
dictionaries_by_id,
node_index,
buffer_index,
+ compression_codec,
metadata,
)?;
node_index = triple.1;
@@ -177,8 +203,8 @@ fn create_array(
let index_node = &nodes[node_index];
let index_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
.iter()
- .map(|buf| read_buffer(buf, data))
- .collect();
+ .map(|buf| read_buffer(buf, data, compression_codec))
+ .collect::<Result<_>>()?;
let dict_id = field.dict_id().ok_or_else(|| {
ArrowError::IoError(format!("Field {} does not have dict id", field))
@@ -209,18 +235,20 @@ fn create_array(
// In V4, union types has validity bitmap
// In V5 and later, union types have no validity bitmap
if metadata < &ipc::MetadataVersion::V5 {
- read_buffer(&buffers[buffer_index], data);
+ read_buffer(&buffers[buffer_index], data, compression_codec)?;
buffer_index += 1;
}
let type_ids: Buffer =
- read_buffer(&buffers[buffer_index], data)[..len].into();
+ read_buffer(&buffers[buffer_index], data, compression_codec)?[..len]
+ .into();
buffer_index += 1;
let value_offsets = match mode {
UnionMode::Dense => {
- let buffer = read_buffer(&buffers[buffer_index], data);
+ let buffer =
+ read_buffer(&buffers[buffer_index], data, compression_codec)?;
buffer_index += 1;
Some(buffer[..len * 4].into())
}
@@ -238,6 +266,7 @@ fn create_array(
dictionaries_by_id,
node_index,
buffer_index,
+ compression_codec,
metadata,
)?;
@@ -277,8 +306,8 @@ fn create_array(
data_type,
buffers[buffer_index..buffer_index + 2]
.iter()
- .map(|buf| read_buffer(buf, data))
- .collect(),
+ .map(|buf| read_buffer(buf, data, compression_codec))
+ .collect::<Result<_>>()?,
);
node_index += 1;
buffer_index += 2;
@@ -603,6 +632,11 @@ pub fn read_record_batch(
let field_nodes = batch.nodes().ok_or_else(|| {
ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
})?;
+ let batch_compression = batch.compression();
+ let compression_codec: Option<CompressionCodec> = batch_compression
+ .map(|batch_compression| batch_compression.codec().try_into())
+ .transpose()?;
+
// keep track of buffer and node index, the functions that create arrays mutate these
let mut buffer_index = 0;
let mut node_index = 0;
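(Editorial note: the `.map(...).transpose()?` idiom above converts `Option<Result<T, E>>` into `Result<Option<T>, E>`, so a missing compression stays `None` while a failed conversion propagates. A self-contained sketch with an illustrative fallible conversion:)

```rust
// transpose() turns Option<Result<T, E>> into Result<Option<T>, E>: a
// missing value stays Ok(None), while a failed conversion surfaces via `?`.
fn parse_opt(src: Option<&str>) -> Result<Option<i64>, std::num::ParseIntError> {
    src.map(|s| s.parse::<i64>()).transpose()
}

fn main() {
    assert_eq!(parse_opt(None), Ok(None));
    assert_eq!(parse_opt(Some("42")), Ok(Some(42)));
    assert!(parse_opt(Some("nope")).is_err());
}
```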
@@ -626,6 +660,7 @@ pub fn read_record_batch(
dictionaries_by_id,
node_index,
buffer_index,
+ &compression_codec,
metadata,
)?;
node_index = triple.1;
@@ -664,6 +699,7 @@ pub fn read_record_batch(
dictionaries_by_id,
node_index,
buffer_index,
+ &compression_codec,
metadata,
)?;
node_index = triple.1;
@@ -761,6 +797,21 @@ pub struct FileReader<R: Read + Seek> {
projection: Option<(Vec<usize>, Schema)>,
}
+impl<R: Read + Seek> fmt::Debug for FileReader<R> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
+ f.debug_struct("FileReader<R>")
+ .field("reader", &"BufReader<..>")
+ .field("schema", &self.schema)
+ .field("blocks", &self.blocks)
+ .field("current_block", &self.current_block)
+ .field("total_blocks", &self.total_blocks)
+ .field("dictionaries_by_id", &self.dictionaries_by_id)
+ .field("metadata_version", &self.metadata_version)
+ .field("projection", &self.projection)
+ .finish()
+ }
+}
+
impl<R: Read + Seek> FileReader<R> {
/// Try to create a new file reader
///
@@ -921,7 +972,6 @@ impl<R: Read + Seek> FileReader<R> {
let mut block_data = vec![0; meta_len as usize];
self.reader.read_exact(&mut block_data)?;
-
let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::IoError(format!("Unable to get root as footer: {:?}", err))
})?;
@@ -1013,6 +1063,18 @@ pub struct StreamReader<R: Read> {
projection: Option<(Vec<usize>, Schema)>,
}
+impl<R: Read> fmt::Debug for StreamReader<R> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
+ f.debug_struct("StreamReader<R>")
+ .field("reader", &"BufReader<..>")
+ .field("schema", &self.schema)
+ .field("dictionaries_by_id", &self.dictionaries_by_id)
+ .field("finished", &self.finished)
+ .field("projection", &self.projection)
+ .finish()
+ }
+}
+
impl<R: Read> StreamReader<R> {
/// Try to create a new stream reader
///
@@ -1414,6 +1476,105 @@ mod tests {
});
}
+ #[test]
+ #[cfg(feature = "ipc_compression")]
+ fn read_generated_streams_200() {
+ let testdata = crate::util::test_util::arrow_test_data();
+ let version = "2.0.0-compression";
+
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec!["generated_lz4", "generated_zstd"];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.stream",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = StreamReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ // the next batch must be empty
+ assert!(reader.next().is_none());
+ // the stream must indicate that it's finished
+ assert!(reader.is_finished());
+ });
+ }
+
+ #[test]
+ #[cfg(not(feature = "ipc_compression"))]
+ fn read_generated_streams_200_negative() {
+ let testdata = crate::util::test_util::arrow_test_data();
+ let version = "2.0.0-compression";
+
+ // the test is repetitive, thus we can read all supported files at once
+ let cases = vec![("generated_lz4", "LZ4_FRAME"), ("generated_zstd", "ZSTD")];
+ cases.iter().for_each(|(path, compression_name)| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.stream",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = StreamReader::try_new(file, None).unwrap();
+ let err = reader.next().unwrap().unwrap_err();
+ let expected_error = format!(
+ "Invalid argument error: compression type {} not supported because arrow was not compiled with the ipc_compression feature",
+ compression_name
+ );
+ assert_eq!(err.to_string(), expected_error);
+ });
+ }
+
+ #[test]
+ #[cfg(feature = "ipc_compression")]
+ fn read_generated_files_200() {
+ let testdata = crate::util::test_util::arrow_test_data();
+ let version = "2.0.0-compression";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec!["generated_lz4", "generated_zstd"];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = FileReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ });
+ }
+
+ #[test]
+ #[cfg(not(feature = "ipc_compression"))]
+ fn read_generated_files_200_negative() {
+ let testdata = crate::util::test_util::arrow_test_data();
+ let version = "2.0.0-compression";
+ // the test is repetitive, thus we can read all supported files at once
+ let cases = vec![("generated_lz4", "LZ4_FRAME"), ("generated_zstd", "ZSTD")];
+ cases.iter().for_each(|(path, compression_name)| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = FileReader::try_new(file, None).unwrap();
+
+ let err = reader.next().unwrap().unwrap_err();
+ let expected_error = format!(
+ "Invalid argument error: compression type {} not supported because arrow was not compiled with the ipc_compression feature",
+ compression_name
+ );
+ assert_eq!(err.to_string(), expected_error);
+ });
+ }
+
fn create_test_projection_schema() -> Schema {
// define field types
let list_data_type =
diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs
index f0942b074..c817cb77c 100644
--- a/arrow/src/ipc/writer.rs
+++ b/arrow/src/ipc/writer.rs
@@ -39,6 +39,7 @@ use crate::ipc;
use crate::record_batch::RecordBatch;
use crate::util::bit_util;
+use crate::ipc::compression::CompressionCodec;
use ipc::CONTINUATION_MARKER;
/// IPC write options used to control the behaviour of the writer
@@ -58,9 +59,30 @@ pub struct IpcWriteOptions {
/// version 2.0.0: V4, with legacy format enabled
/// version 4.0.0: V5
metadata_version: ipc::MetadataVersion,
+ /// Compression, if desired. Only supported when the
+ /// `ipc_compression` feature is enabled
+ batch_compression_type: Option<ipc::CompressionType>,
}
impl IpcWriteOptions {
+ /// Configures compression when writing IPC files. Requires the
+ /// `ipc_compression` feature of the crate to be activated.
+ #[cfg(feature = "ipc_compression")]
+ pub fn try_with_compression(
+ mut self,
+ batch_compression_type: Option<ipc::CompressionType>,
+ ) -> Result<Self> {
+ self.batch_compression_type = batch_compression_type;
+
+ if self.batch_compression_type.is_some()
+ && self.metadata_version < ipc::MetadataVersion::V5
+ {
+ return Err(ArrowError::InvalidArgumentError(
+ "Compression only supported in metadata v5 and above".to_string(),
+ ));
+ }
+ Ok(self)
+ }
/// Try create IpcWriteOptions, checking for incompatible settings
pub fn try_new(
alignment: usize,
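(Editorial note: the builder now rejects compression for metadata versions before V5. A hedged usage sketch, assuming the `ipc_compression` feature is enabled:)

```rust
use arrow::ipc::{self, writer::IpcWriteOptions};

fn main() {
    // Requesting compression together with pre-V5 metadata is rejected
    // at option-construction time
    let v4 = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V4)
        .unwrap()
        .try_with_compression(Some(ipc::CompressionType::ZSTD));
    assert!(v4.is_err()); // "Compression only supported in metadata v5 and above"

    // With V5 metadata the same request succeeds
    let v5 = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
        .unwrap()
        .try_with_compression(Some(ipc::CompressionType::ZSTD));
    assert!(v5.is_ok());
}
```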
@@ -82,6 +104,7 @@ impl IpcWriteOptions {
alignment,
write_legacy_ipc_format,
metadata_version,
+ batch_compression_type: None,
}),
ipc::MetadataVersion::V5 => {
if write_legacy_ipc_format {
@@ -94,10 +117,14 @@ impl IpcWriteOptions {
alignment,
write_legacy_ipc_format,
metadata_version,
+ batch_compression_type: None,
})
}
}
- z => panic!("Unsupported ipc::MetadataVersion {:?}", z),
+ z => Err(ArrowError::InvalidArgumentError(format!(
+ "Unsupported ipc::MetadataVersion {:?}",
+ z
+ ))),
}
}
}
@@ -108,6 +135,7 @@ impl Default for IpcWriteOptions {
alignment: 8,
write_legacy_ipc_format: false,
metadata_version: ipc::MetadataVersion::V5,
+ batch_compression_type: None,
}
}
}
@@ -278,7 +306,7 @@ impl IpcDataGenerator {
dict_id,
dict_values,
write_options,
- ));
+ )?);
}
}
_ => self._encode_dictionaries(
@@ -312,7 +340,7 @@ impl IpcDataGenerator {
)?;
}
- let encoded_message = self.record_batch_to_bytes(batch, write_options);
+ let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
Ok((encoded_dictionaries, encoded_message))
}
@@ -322,13 +350,27 @@ impl IpcDataGenerator {
&self,
batch: &RecordBatch,
write_options: &IpcWriteOptions,
- ) -> EncodedData {
+ ) -> Result<EncodedData> {
let mut fbb = FlatBufferBuilder::new();
let mut nodes: Vec<ipc::FieldNode> = vec![];
let mut buffers: Vec<ipc::Buffer> = vec![];
let mut arrow_data: Vec<u8> = vec![];
let mut offset = 0;
+
+ // get the type of compression
+ let batch_compression_type = write_options.batch_compression_type;
+
+ let compression = batch_compression_type.map(|batch_compression_type| {
+ let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
+ c.add_method(ipc::BodyCompressionMethod::BUFFER);
+ c.add_codec(batch_compression_type);
+ c.finish()
+ });
+
+ let compression_codec: Option<CompressionCodec> =
+ batch_compression_type.map(TryInto::try_into).transpose()?;
+
for array in batch.columns() {
let array_data = array.data();
offset = write_array_data(
@@ -339,19 +381,26 @@ impl IpcDataGenerator {
offset,
array.len(),
array.null_count(),
+ &compression_codec,
write_options,
- );
+ )?;
}
+ // pad the tail of body data
+ let len = arrow_data.len();
+ let pad_len = pad_to_8(len as u32);
+ arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
// write data
let buffers = fbb.create_vector(&buffers);
let nodes = fbb.create_vector(&nodes);
-
let root = {
let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
batch_builder.add_length(batch.num_rows() as i64);
batch_builder.add_nodes(nodes);
batch_builder.add_buffers(buffers);
+ if let Some(c) = compression {
+ batch_builder.add_compression(c);
+ }
let b = batch_builder.finish();
b.as_union_value()
};
@@ -365,10 +414,10 @@ impl IpcDataGenerator {
fbb.finish(root, None);
let finished_data = fbb.finished_data();
- EncodedData {
+ Ok(EncodedData {
ipc_message: finished_data.to_vec(),
arrow_data,
- }
+ })
}
/// Write dictionary values into two sets of bytes, one for the header (ipc::Message) and the
@@ -378,13 +427,27 @@ impl IpcDataGenerator {
dict_id: i64,
array_data: &ArrayData,
write_options: &IpcWriteOptions,
- ) -> EncodedData {
+ ) -> Result<EncodedData> {
let mut fbb = FlatBufferBuilder::new();
let mut nodes: Vec<ipc::FieldNode> = vec![];
let mut buffers: Vec<ipc::Buffer> = vec![];
let mut arrow_data: Vec<u8> = vec![];
+ // get the type of compression
+ let batch_compression_type = write_options.batch_compression_type;
+
+ let compression = batch_compression_type.map(|batch_compression_type| {
+ let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
+ c.add_method(ipc::BodyCompressionMethod::BUFFER);
+ c.add_codec(batch_compression_type);
+ c.finish()
+ });
+
+ let compression_codec: Option<CompressionCodec> = batch_compression_type
+ .map(|batch_compression_type| batch_compression_type.try_into())
+ .transpose()?;
+
write_array_data(
array_data,
&mut buffers,
@@ -393,8 +456,14 @@ impl IpcDataGenerator {
0,
array_data.len(),
array_data.null_count(),
+ &compression_codec,
write_options,
- );
+ )?;
+
+ // pad the tail of body data
+ let len = arrow_data.len();
+ let pad_len = pad_to_8(len as u32);
+ arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
// write data
let buffers = fbb.create_vector(&buffers);
@@ -405,6 +474,9 @@ impl IpcDataGenerator {
batch_builder.add_length(array_data.len() as i64);
batch_builder.add_nodes(nodes);
batch_builder.add_buffers(buffers);
+ if let Some(c) = compression {
+ batch_builder.add_compression(c);
+ }
batch_builder.finish()
};
@@ -427,10 +499,10 @@ impl IpcDataGenerator {
fbb.finish(root, None);
let finished_data = fbb.finished_data();
- EncodedData {
+ Ok(EncodedData {
ipc_message: finished_data.to_vec(),
arrow_data,
- }
+ })
}
}
@@ -519,9 +591,10 @@ impl<W: Write> FileWriter<W> {
) -> Result<Self> {
let data_gen = IpcDataGenerator::default();
let mut writer = BufWriter::new(writer);
- // write magic to header
+ // write the magic to the header, aligned to an 8-byte boundary
+ let header_size = super::ARROW_MAGIC.len() + 2;
+ assert_eq!(header_size, 8);
writer.write_all(&super::ARROW_MAGIC[..])?;
- // create an 8-byte boundary after the header
writer.write_all(&[0, 0])?;
// write the schema, set the written bytes to the schema + header
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
@@ -530,7 +603,7 @@ impl<W: Write> FileWriter<W> {
writer,
write_options,
schema: schema.clone(),
- block_offsets: meta + data + 8,
+ block_offsets: meta + data + header_size,
dictionary_blocks: vec![],
record_blocks: vec![],
finished: false,
@@ -984,8 +1057,9 @@ fn write_array_data(
offset: i64,
num_rows: usize,
null_count: usize,
+ compression_codec: &Option<CompressionCodec>,
write_options: &IpcWriteOptions,
-) -> i64 {
+) -> Result<i64> {
let mut offset = offset;
if !matches!(array_data.data_type(), DataType::Null) {
nodes.push(ipc::FieldNode::new(num_rows as i64, null_count as i64));
@@ -1007,7 +1081,13 @@ fn write_array_data(
Some(buffer) => buffer.bit_slice(array_data.offset(), array_data.len()),
};
- offset = write_buffer(null_buffer.as_slice(), buffers, arrow_data, offset);
+ offset = write_buffer(
+ null_buffer.as_slice(),
+ buffers,
+ arrow_data,
+ offset,
+ compression_codec,
+ )?;
}
let data_type = array_data.data_type();
@@ -1040,18 +1120,36 @@ fn write_array_data(
)
};
- offset = write_buffer(new_offsets.as_slice(), buffers, arrow_data, offset);
+ offset = write_buffer(
+ new_offsets.as_slice(),
+ buffers,
+ arrow_data,
+ offset,
+ compression_codec,
+ )?;
let total_bytes = get_binary_buffer_len(array_data);
let value_buffer = &array_data.buffers()[1];
let buffer_length = min(total_bytes, value_buffer.len() - byte_offset);
let buffer_slice =
&value_buffer.as_slice()[byte_offset..(byte_offset + buffer_length)];
- offset = write_buffer(buffer_slice, buffers, arrow_data, offset);
+ offset = write_buffer(
+ buffer_slice,
+ buffers,
+ arrow_data,
+ offset,
+ compression_codec,
+ )?;
} else {
- array_data.buffers().iter().for_each(|buffer| {
- offset = write_buffer(buffer.as_slice(), buffers, arrow_data, offset);
- });
+ for buffer in array_data.buffers() {
+ offset = write_buffer(
+ buffer.as_slice(),
+ buffers,
+ arrow_data,
+ offset,
+ compression_codec,
+ )?;
+ }
}
} else if DataType::is_numeric(data_type)
|| DataType::is_temporal(data_type)
@@ -1074,19 +1172,32 @@ fn write_array_data(
let buffer_length = min(min_length, buffer.len() - byte_offset);
let buffer_slice =
&buffer.as_slice()[byte_offset..(byte_offset + buffer_length)];
- offset = write_buffer(buffer_slice, buffers, arrow_data, offset);
+ offset = write_buffer(
+ buffer_slice,
+ buffers,
+ arrow_data,
+ offset,
+ compression_codec,
+ )?;
} else {
- offset = write_buffer(buffer.as_slice(), buffers, arrow_data, offset);
+ offset = write_buffer(
+ buffer.as_slice(),
+ buffers,
+ arrow_data,
+ offset,
+ compression_codec,
+ )?;
}
} else {
- array_data.buffers().iter().for_each(|buffer| {
- offset = write_buffer(buffer, buffers, arrow_data, offset);
- });
+ for buffer in array_data.buffers() {
+ offset =
+ write_buffer(buffer, buffers, arrow_data, offset, compression_codec)?;
+ }
}
if !matches!(array_data.data_type(), DataType::Dictionary(_, _)) {
// recursively write out nested structures
- array_data.child_data().iter().for_each(|data_ref| {
+ for data_ref in array_data.child_data() {
// write the nested data (e.g list data)
offset = write_array_data(
data_ref,
@@ -1096,29 +1207,56 @@ fn write_array_data(
offset,
data_ref.len(),
data_ref.null_count(),
+ compression_codec,
write_options,
- );
- });
+ )?;
+ }
}
- offset
+ Ok(offset)
}
-/// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// Writes a buffer into `arrow_data`, a vector of bytes, and adds its
+/// [`ipc::Buffer`] to `buffers`. Returns the new offset in `arrow_data`.
+///
+///
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
fn write_buffer(
- buffer: &[u8],
- buffers: &mut Vec<ipc::Buffer>,
- arrow_data: &mut Vec<u8>,
- offset: i64,
-) -> i64 {
- let len = buffer.len();
- let pad_len = pad_to_8(len as u32);
- let total_len: i64 = (len + pad_len) as i64;
- // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
- buffers.push(ipc::Buffer::new(offset, total_len));
- arrow_data.extend_from_slice(buffer);
- arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
- offset + total_len
+ buffer: &[u8], // input
+ buffers: &mut Vec<ipc::Buffer>, // output buffer descriptors
+ arrow_data: &mut Vec<u8>, // output stream
+ offset: i64, // current output stream offset
+ compression_codec: &Option<CompressionCodec>,
+) -> Result<i64> {
+ let len: i64 = match compression_codec {
+ Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
+ None => {
+ arrow_data.extend_from_slice(buffer);
+ buffer.len()
+ }
+ }
+ .try_into()
+ .map_err(|e| {
+ ArrowError::InvalidArgumentError(format!(
+ "Could not convert compressed size to i64: {}",
+ e
+ ))
+ })?;
+
+ // make new index entry
+ buffers.push(ipc::Buffer::new(offset, len));
+ // pad so that the next offset stays 8-byte aligned
+ let pad_len = pad_to_8(len as u32) as i64;
+ arrow_data.extend_from_slice(&vec![0u8; pad_len as usize][..]);
+
+ Ok(offset + len + pad_len)
}
/// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes
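(Editorial note: `pad_to_8` is referenced throughout but its body is outside this diff; the implementation below is an assumption that matches its documented contract:)

```rust
// Number of bytes needed to round `len` up to the next multiple of 8
fn pad_to_8(len: u32) -> usize {
    (((len + 7) & !7) - len) as usize
}

fn main() {
    assert_eq!(pad_to_8(0), 0);
    assert_eq!(pad_to_8(5), 3);
    assert_eq!(pad_to_8(8), 0);
    assert_eq!(pad_to_8(9), 7);
}
```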
@@ -1143,6 +1281,167 @@ mod tests {
use crate::ipc::reader::*;
use crate::util::integration_util::*;
+ #[test]
+ #[cfg(feature = "ipc_compression")]
+ fn test_write_empty_record_batch_lz4_compression() {
+ let file_name = "arrow_lz4_empty";
+ let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
+ let values: Vec<Option<i32>> = vec![];
+ let array = Int32Array::from(values);
+ let record_batch =
+ RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)])
+ .unwrap();
+ {
+ let file =
+ File::create(format!("target/debug/testdata/{}.arrow_file", file_name))
+ .unwrap();
+ let write_option =
+ IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+ .unwrap()
+ .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
+ .unwrap();
+
+ let mut writer =
+ FileWriter::try_new_with_options(file, &schema, write_option).unwrap();
+ writer.write(&record_batch).unwrap();
+ writer.finish().unwrap();
+ }
+ {
+ // read file
+ let file =
+ File::open(format!("target/debug/testdata/{}.arrow_file", file_name))
+ .unwrap();
+ let mut reader = FileReader::try_new(file, None).unwrap();
+ loop {
+ match reader.next() {
+ Some(Ok(read_batch)) => {
+ read_batch
+ .columns()
+ .iter()
+ .zip(record_batch.columns())
+ .for_each(|(a, b)| {
+ assert_eq!(a.data_type(), b.data_type());
+ assert_eq!(a.len(), b.len());
+ assert_eq!(a.null_count(), b.null_count());
+ });
+ }
+ Some(Err(e)) => {
+ panic!("{}", e);
+ }
+ None => {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ #[test]
+ #[cfg(feature = "ipc_compression")]
+ fn test_write_file_with_lz4_compression() {
+ let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
+ let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
+ let array = Int32Array::from(values);
+ let record_batch =
+ RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)])
+ .unwrap();
+ {
+ let file =
+ File::create("target/debug/testdata/arrow_lz4.arrow_file").unwrap();
+ let write_option =
+ IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+ .unwrap()
+ .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
+ .unwrap();
+
+ let mut writer =
+ FileWriter::try_new_with_options(file, &schema, write_option).unwrap();
+ writer.write(&record_batch).unwrap();
+ writer.finish().unwrap();
+ }
+ {
+ // read file
+ let file =
+ File::open(format!("target/debug/testdata/{}.arrow_file", "arrow_lz4"))
+ .unwrap();
+ let mut reader = FileReader::try_new(file, None).unwrap();
+ loop {
+ match reader.next() {
+ Some(Ok(read_batch)) => {
+ read_batch
+ .columns()
+ .iter()
+ .zip(record_batch.columns())
+ .for_each(|(a, b)| {
+ assert_eq!(a.data_type(), b.data_type());
+ assert_eq!(a.len(), b.len());
+ assert_eq!(a.null_count(), b.null_count());
+ });
+ }
+ Some(Err(e)) => {
+ panic!("{}", e);
+ }
+ None => {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ #[test]
+ #[cfg(feature = "ipc_compression")]
+ fn test_write_file_with_zstd_compression() {
+ let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
+ let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
+ let array = Int32Array::from(values);
+ let record_batch =
+ RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)])
+ .unwrap();
+ {
+ let file =
+ File::create("target/debug/testdata/arrow_zstd.arrow_file").unwrap();
+ let write_option =
+ IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+ .unwrap()
+ .try_with_compression(Some(ipc::CompressionType::ZSTD))
+ .unwrap();
+
+ let mut writer =
+ FileWriter::try_new_with_options(file, &schema, write_option).unwrap();
+ writer.write(&record_batch).unwrap();
+ writer.finish().unwrap();
+ }
+ {
+ // read file
+ let file =
+ File::open(format!("target/debug/testdata/{}.arrow_file", "arrow_zstd"))
+ .unwrap();
+ let mut reader = FileReader::try_new(file, None).unwrap();
+ loop {
+ match reader.next() {
+ Some(Ok(read_batch)) => {
+ read_batch
+ .columns()
+ .iter()
+ .zip(record_batch.columns())
+ .for_each(|(a, b)| {
+ assert_eq!(a.data_type(), b.data_type());
+ assert_eq!(a.len(), b.len());
+ assert_eq!(a.null_count(), b.null_count());
+ });
+ }
+ Some(Err(e)) => {
+ panic!("{}", e);
+ }
+ None => {
+ break;
+ }
+ }
+ }
+ }
+ }
+
#[test]
fn test_write_file() {
let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
@@ -1499,6 +1798,107 @@ mod tests {
});
}
+ #[test]
+ #[cfg(feature = "ipc_compression")]
+ fn read_and_rewrite_compression_files_200() {
+ let testdata = crate::util::test_util::arrow_test_data();
+ let version = "2.0.0-compression";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec!["generated_lz4", "generated_zstd"];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = FileReader::try_new(file, None).unwrap();
+
+ // read and rewrite the file to a temp location
+ {
+ let file = File::create(format!(
+ "target/debug/testdata/{}-{}.arrow_file",
+ version, path
+ ))
+ .unwrap();
+ // write IPC version 5
+ let options =
+ IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+ .unwrap()
+ .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
+ .unwrap();
+
+ let mut writer =
+ FileWriter::try_new_with_options(file, &reader.schema(), options)
+ .unwrap();
+ while let Some(Ok(batch)) = reader.next() {
+ writer.write(&batch).unwrap();
+ }
+ writer.finish().unwrap();
+ }
+
+ let file = File::open(format!(
+ "target/debug/testdata/{}-{}.arrow_file",
+ version, path
+ ))
+ .unwrap();
+ let mut reader = FileReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ });
+ }
+
+ #[test]
+ #[cfg(feature = "ipc_compression")]
+ fn read_and_rewrite_compression_stream_200() {
+ let testdata = crate::util::test_util::arrow_test_data();
+ let version = "2.0.0-compression";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec!["generated_lz4", "generated_zstd"];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.stream",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let reader = StreamReader::try_new(file, None).unwrap();
+
+ // read and rewrite the stream to a temp location
+ {
+ let file = File::create(format!(
+ "target/debug/testdata/{}-{}.stream",
+ version, path
+ ))
+ .unwrap();
+ let options =
+ IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+ .unwrap()
+ .try_with_compression(Some(ipc::CompressionType::ZSTD))
+ .unwrap();
+
+ let mut writer =
+ StreamWriter::try_new_with_options(file, &reader.schema(), options)
+ .unwrap();
+ reader.for_each(|batch| {
+ writer.write(&batch.unwrap()).unwrap();
+ });
+ writer.finish().unwrap();
+ }
+
+ let file =
+ File::open(format!("target/debug/testdata/{}-{}.stream", version, path))
+ .unwrap();
+ let mut reader = StreamReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ });
+ }
+
/// Read gzipped JSON file
fn read_gzip_json(version: &str, path: &str) -> ArrowJson {
let testdata = crate::util::test_util::arrow_test_data();