You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/14 07:07:22 UTC

[arrow-rs] branch master updated: support compression for IPC with revamped feature flags (#2369)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e27d9390 support compression for IPC with revamped feature flags (#2369)
5e27d9390 is described below

commit 5e27d93904660e4f91e94636e6840823c6fb7d4f
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sun Aug 14 03:07:17 2022 -0400

    support compression for IPC with revamped feature flags (#2369)
    
    * support compression for IPC
    
    * fix conflict
    
    * edit toml
    
    * fix clippy and format
    
    * format doc
    
    * format code
    
    * add padding for tail of each buffer
    
    * try fix the arrow lz4 and zstd
    
    * add lz4,zstd for compression cfg
    
    * add cfg for ipm_compression
    
    * address comments
    
    * Rework ipc_compression feature flags and plumb through errors
    
    * fixup flags in reader
    
    * Make stub interface
    
    * Compiles without ipc_compression support
    
    * Fix tests
    
    * Clean up writing
    
    * use uniform flag syntax
    
    * fix flags
    
    * Rename for clarity
    
    * fix compilation
    
    * Add ipc_compression tests to IC
    
    * fix: clippy
    
    * merge-confligts
    
    * Add note in doc
    
    * Remove redundant dev dependencies
    
    * improve variable name
    
    * Apply suggestions from code review
    
    Co-authored-by: Kun Liu <li...@apache.org>
    Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
    
    * improve comment in stub.rs
    
    * Fix for new clippy
    
    * Clean up clippy
    
    * Clean up header writing
    
    * fmt
    
    Co-authored-by: liukun4515 <li...@apache.org>
    Co-authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
 .github/workflows/arrow.yml            |   6 +-
 arrow/Cargo.toml                       |   3 +
 arrow/README.md                        |   3 +-
 arrow/src/ipc/compression/codec.rs     | 205 ++++++++++++++
 arrow/src/ipc/{ => compression}/mod.rs |  30 +-
 arrow/src/ipc/compression/stub.rs      |  63 +++++
 arrow/src/ipc/mod.rs                   |   2 +
 arrow/src/ipc/reader.rs                | 199 +++++++++++--
 arrow/src/ipc/writer.rs                | 490 ++++++++++++++++++++++++++++++---
 9 files changed, 911 insertions(+), 90 deletions(-)

diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml
index 16cd9a9a0..fcc4d2c37 100644
--- a/.github/workflows/arrow.yml
+++ b/.github/workflows/arrow.yml
@@ -51,9 +51,9 @@ jobs:
       - name: Test
         run: |
           cargo test -p arrow
-      - name: Test --features=force_validate,prettyprint,ffi
+      - name: Test --features=force_validate,prettyprint,ipc_compression,ffi
         run: |
-          cargo test -p arrow --features=force_validate,prettyprint,ffi
+          cargo test -p arrow --features=force_validate,prettyprint,ipc_compression,ffi
       - name: Run examples
         run: |
           # Test arrow examples
@@ -172,4 +172,4 @@ jobs:
           rustup component add clippy
       - name: Run clippy
         run: |
-          cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi --all-targets -- -D warnings
+          cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi,ipc_compression --all-targets -- -D warnings
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index dbc606ad2..bebaadcbc 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -56,6 +56,7 @@ csv_crate = { version = "1.1", default-features = false, optional = true, packag
 regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] }
 regex-syntax = { version = "0.6.27", default-features = false, features = ["unicode"] }
 lazy_static = { version = "1.4", default-features = false }
+lz4 = { version = "1.23", default-features = false, optional = true }
 packed_simd = { version = "0.3", default-features = false, optional = true, package = "packed_simd_2" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
 chrono-tz = { version = "0.6", default-features = false, optional = true }
@@ -66,9 +67,11 @@ pyo3 = { version = "0.16", default-features = false, optional = true }
 lexical-core = { version = "^0.8", default-features = false, features = ["write-integers", "write-floats", "parse-integers", "parse-floats"] }
 multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
+zstd = { version = "0.11.1", default-features = false, optional = true }
 
 [features]
 default = ["csv", "ipc"]
+ipc_compression = ["ipc", "zstd", "lz4"]
 csv = ["csv_crate"]
 ipc = ["flatbuffers"]
 simd = ["packed_simd"]
diff --git a/arrow/README.md b/arrow/README.md
index f7ccb9696..5e20a4253 100644
--- a/arrow/README.md
+++ b/arrow/README.md
@@ -42,7 +42,8 @@ However, for historical reasons, this crate uses versions with major numbers gre
 The `arrow` crate provides the following features which may be enabled in your `Cargo.toml`:
 
 - `csv` (default) - support for reading and writing Arrow arrays to/from csv files
-- `ipc` (default) - support for the [arrow-flight](https://crates.io/crates/arrow-flight) IPC and wire format
+- `ipc` (default) - support for reading [Arrow IPC Format](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc), also used as the wire protocol in [arrow-flight](https://crates.io/crates/arrow-flight)
+- `ipc_compression`  - Enables reading and writing compressed IPC streams (also enables `ipc`)
 - `prettyprint` - support for formatting record batches as textual columns
 - `js` - support for building arrow for WebAssembly / JavaScript
 - `simd` - (_Requires Nightly Rust_) Use alternate hand optimized
diff --git a/arrow/src/ipc/compression/codec.rs b/arrow/src/ipc/compression/codec.rs
new file mode 100644
index 000000000..9d870fc22
--- /dev/null
+++ b/arrow/src/ipc/compression/codec.rs
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::buffer::Buffer;
+use crate::error::{ArrowError, Result};
+use crate::ipc::CompressionType;
+use std::io::{Read, Write};
+
+const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
+const LENGTH_OF_PREFIX_DATA: i64 = 8;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+/// Represents compressing a ipc stream using a particular compression algorithm
+pub enum CompressionCodec {
+    Lz4Frame,
+    Zstd,
+}
+
+impl TryFrom<CompressionType> for CompressionCodec {
+    type Error = ArrowError;
+
+    fn try_from(compression_type: CompressionType) -> Result<Self> {
+        match compression_type {
+            CompressionType::ZSTD => Ok(CompressionCodec::Zstd),
+            CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame),
+            other_type => Err(ArrowError::NotYetImplemented(format!(
+                "compression type {:?} not supported ",
+                other_type
+            ))),
+        }
+    }
+}
+
+impl CompressionCodec {
+    /// Compresses the data in `input` to `output` and appends the
+    /// data using the specified compression mechanism.
+    ///
+    /// returns the number of bytes written to the stream
+    ///
+    /// Writes this format to output:
+    /// ```text
+    /// [8 bytes]:         uncompressed length
+    /// [remaining bytes]: compressed data stream
+    /// ```
+    pub(crate) fn compress_to_vec(
+        &self,
+        input: &[u8],
+        output: &mut Vec<u8>,
+    ) -> Result<usize> {
+        let uncompressed_data_len = input.len();
+        let original_output_len = output.len();
+
+        if input.is_empty() {
+            // empty input, nothing to do
+        } else {
+            // write compressed data directly into the output buffer
+            output.extend_from_slice(&uncompressed_data_len.to_le_bytes());
+            self.compress(input, output)?;
+
+            let compression_len = output.len();
+            if compression_len > uncompressed_data_len {
+                // length of compressed data was larger than
+                // uncompressed data, use the uncompressed data with
+                // length -1 to indicate that we don't compress the
+                // data
+                output.truncate(original_output_len);
+                output.extend_from_slice(&LENGTH_NO_COMPRESSED_DATA.to_le_bytes());
+                output.extend_from_slice(input);
+            }
+        }
+        Ok(output.len() - original_output_len)
+    }
+
+    /// Decompresses the input into a [`Buffer`]
+    ///
+    /// The input should look like:
+    /// ```text
+    /// [8 bytes]:         uncompressed length
+    /// [remaining bytes]: compressed data stream
+    /// ```
+    pub(crate) fn decompress_to_buffer(&self, input: &[u8]) -> Result<Buffer> {
+        // read the first 8 bytes to determine if the data is
+        // compressed
+        let decompressed_length = read_uncompressed_size(input);
+        let buffer = if decompressed_length == 0 {
+            // emtpy
+            let empty = Vec::<u8>::new();
+            Buffer::from(empty)
+        } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+            // no compression
+            let data = &input[(LENGTH_OF_PREFIX_DATA as usize)..];
+            Buffer::from(data)
+        } else {
+            // decompress data using the codec
+            let mut uncompressed_buffer =
+                Vec::with_capacity(decompressed_length as usize);
+            let input_data = &input[(LENGTH_OF_PREFIX_DATA as usize)..];
+            self.decompress(input_data, &mut uncompressed_buffer)?;
+            Buffer::from(uncompressed_buffer)
+        };
+        Ok(buffer)
+    }
+
+    /// Compress the data in input buffer and write to output buffer
+    /// using the specified compression
+    fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
+        match self {
+            CompressionCodec::Lz4Frame => {
+                let mut encoder = lz4::EncoderBuilder::new().build(output)?;
+                encoder.write_all(input)?;
+                match encoder.finish().1 {
+                    Ok(_) => Ok(()),
+                    Err(e) => Err(e.into()),
+                }
+            }
+            CompressionCodec::Zstd => {
+                let mut encoder = zstd::Encoder::new(output, 0)?;
+                encoder.write_all(input)?;
+                match encoder.finish() {
+                    Ok(_) => Ok(()),
+                    Err(e) => Err(e.into()),
+                }
+            }
+        }
+    }
+
+    /// Decompress the data in input buffer and write to output buffer
+    /// using the specified compression
+    fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<usize> {
+        let result: Result<usize> = match self {
+            CompressionCodec::Lz4Frame => {
+                let mut decoder = lz4::Decoder::new(input)?;
+                match decoder.read_to_end(output) {
+                    Ok(size) => Ok(size),
+                    Err(e) => Err(e.into()),
+                }
+            }
+            CompressionCodec::Zstd => {
+                let mut decoder = zstd::Decoder::new(input)?;
+                match decoder.read_to_end(output) {
+                    Ok(size) => Ok(size),
+                    Err(e) => Err(e.into()),
+                }
+            }
+        };
+        result
+    }
+}
+
+/// Get the uncompressed length
+/// Notes:
+///   LENGTH_NO_COMPRESSED_DATA: indicate that the data that follows is not compressed
+///    0: indicate that there is no data
+///   positive number: indicate the uncompressed length for the following data
+#[inline]
+fn read_uncompressed_size(buffer: &[u8]) -> i64 {
+    let len_buffer = &buffer[0..8];
+    // 64-bit little-endian signed integer
+    i64::from_le_bytes(len_buffer.try_into().unwrap())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_lz4_compression() {
+        let input_bytes = "hello lz4".as_bytes();
+        let codec: CompressionCodec = CompressionCodec::Lz4Frame;
+        let mut output_bytes: Vec<u8> = Vec::new();
+        codec.compress(input_bytes, &mut output_bytes).unwrap();
+        let mut result_output_bytes: Vec<u8> = Vec::new();
+        codec
+            .decompress(output_bytes.as_slice(), &mut result_output_bytes)
+            .unwrap();
+        assert_eq!(input_bytes, result_output_bytes.as_slice());
+    }
+
+    #[test]
+    fn test_zstd_compression() {
+        let input_bytes = "hello zstd".as_bytes();
+        let codec: CompressionCodec = CompressionCodec::Zstd;
+        let mut output_bytes: Vec<u8> = Vec::new();
+        codec.compress(input_bytes, &mut output_bytes).unwrap();
+        let mut result_output_bytes: Vec<u8> = Vec::new();
+        codec
+            .decompress(output_bytes.as_slice(), &mut result_output_bytes)
+            .unwrap();
+        assert_eq!(input_bytes, result_output_bytes.as_slice());
+    }
+}
diff --git a/arrow/src/ipc/mod.rs b/arrow/src/ipc/compression/mod.rs
similarity index 54%
copy from arrow/src/ipc/mod.rs
copy to arrow/src/ipc/compression/mod.rs
index d5455b454..666fa6d86 100644
--- a/arrow/src/ipc/mod.rs
+++ b/arrow/src/ipc/compression/mod.rs
@@ -15,26 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// TODO: (vcq): Protobuf codegen is not generating Debug impls.
-#![allow(missing_debug_implementations)]
+#[cfg(feature = "ipc_compression")]
+mod codec;
+#[cfg(feature = "ipc_compression")]
+pub(crate) use codec::CompressionCodec;
 
-pub mod convert;
-pub mod reader;
-pub mod writer;
-
-#[allow(clippy::redundant_closure)]
-#[allow(clippy::needless_lifetimes)]
-#[allow(clippy::extra_unused_lifetimes)]
-#[allow(clippy::redundant_static_lifetimes)]
-#[allow(clippy::redundant_field_names)]
-#[allow(non_camel_case_types)]
-pub mod gen;
-
-pub use self::gen::File::*;
-pub use self::gen::Message::*;
-pub use self::gen::Schema::*;
-pub use self::gen::SparseTensor::*;
-pub use self::gen::Tensor::*;
-
-const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
-const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
+#[cfg(not(feature = "ipc_compression"))]
+mod stub;
+#[cfg(not(feature = "ipc_compression"))]
+pub(crate) use stub::CompressionCodec;
diff --git a/arrow/src/ipc/compression/stub.rs b/arrow/src/ipc/compression/stub.rs
new file mode 100644
index 000000000..6240f084b
--- /dev/null
+++ b/arrow/src/ipc/compression/stub.rs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Stubs that implement the same interface as the ipc_compression
+//! codec module, but always errors.
+
+use crate::buffer::Buffer;
+use crate::error::{ArrowError, Result};
+use crate::ipc::CompressionType;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CompressionCodec {}
+
+impl TryFrom<CompressionCodec> for CompressionType {
+    type Error = ArrowError;
+    fn try_from(codec: CompressionCodec) -> Result<Self> {
+        Err(ArrowError::InvalidArgumentError(
+            format!("codec type {:?} not supported because arrow was not compiled with the ipc_compression feature", codec)))
+    }
+}
+
+impl TryFrom<CompressionType> for CompressionCodec {
+    type Error = ArrowError;
+
+    fn try_from(compression_type: CompressionType) -> Result<Self> {
+        Err(ArrowError::InvalidArgumentError(
+            format!("compression type {:?} not supported because arrow was not compiled with the ipc_compression feature", compression_type))
+            )
+    }
+}
+
+impl CompressionCodec {
+    #[allow(clippy::ptr_arg)]
+    pub(crate) fn compress_to_vec(
+        &self,
+        _input: &[u8],
+        _output: &mut Vec<u8>,
+    ) -> Result<usize> {
+        Err(ArrowError::InvalidArgumentError(
+            "compression not supported because arrow was not compiled with the ipc_compression feature".to_string()
+        ))
+    }
+
+    pub(crate) fn decompress_to_buffer(&self, _input: &[u8]) -> Result<Buffer> {
+        Err(ArrowError::InvalidArgumentError(
+            "decompression not supported because arrow was not compiled with the ipc_compression feature".to_string()
+        ))
+    }
+}
diff --git a/arrow/src/ipc/mod.rs b/arrow/src/ipc/mod.rs
index d5455b454..2b30e7220 100644
--- a/arrow/src/ipc/mod.rs
+++ b/arrow/src/ipc/mod.rs
@@ -22,6 +22,8 @@ pub mod convert;
 pub mod reader;
 pub mod writer;
 
+mod compression;
+
 #[allow(clippy::redundant_closure)]
 #[allow(clippy::needless_lifetimes)]
 #[allow(clippy::extra_unused_lifetimes)]
diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs
index ce44d74a1..393128371 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow/src/ipc/reader.rs
@@ -21,6 +21,7 @@
 //! however the `FileReader` expects a reader that supports `Seek`ing
 
 use std::collections::HashMap;
+use std::fmt;
 use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
@@ -32,15 +33,35 @@ use crate::error::{ArrowError, Result};
 use crate::ipc;
 use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
+use crate::ipc::compression::CompressionCodec;
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &Option<CompressionCodec>,
+) -> Result<Buffer> {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Ok(Buffer::from(buf_data));
+    }
+    match compression_codec {
+        Some(decompressor) => decompressor.decompress_to_buffer(buf_data),
+        None => Ok(Buffer::from(buf_data)),
+    }
 }
 
 /// Coordinates reading arrays based on data types.
@@ -61,6 +82,7 @@ fn create_array(
     dictionaries_by_id: &HashMap<i64, ArrayRef>,
     mut node_index: usize,
     mut buffer_index: usize,
+    compression_codec: &Option<CompressionCodec>,
     metadata: &ipc::MetadataVersion,
 ) -> Result<(ArrayRef, usize, usize)> {
     use DataType::*;
@@ -72,8 +94,8 @@ fn create_array(
                 data_type,
                 buffers[buffer_index..buffer_index + 3]
                     .iter()
-                    .map(|buf| read_buffer(buf, data))
-                    .collect(),
+                    .map(|buf| read_buffer(buf, data, compression_codec))
+                    .collect::<Result<_>>()?,
             );
             node_index += 1;
             buffer_index += 3;
@@ -85,8 +107,8 @@ fn create_array(
                 data_type,
                 buffers[buffer_index..buffer_index + 2]
                     .iter()
-                    .map(|buf| read_buffer(buf, data))
-                    .collect(),
+                    .map(|buf| read_buffer(buf, data, compression_codec))
+                    .collect::<Result<_>>()?,
             );
             node_index += 1;
             buffer_index += 2;
@@ -96,8 +118,8 @@ fn create_array(
             let list_node = &nodes[node_index];
             let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
                 .iter()
-                .map(|buf| read_buffer(buf, data))
-                .collect();
+                .map(|buf| read_buffer(buf, data, compression_codec))
+                .collect::<Result<_>>()?;
             node_index += 1;
             buffer_index += 2;
             let triple = create_array(
@@ -108,6 +130,7 @@ fn create_array(
                 dictionaries_by_id,
                 node_index,
                 buffer_index,
+                compression_codec,
                 metadata,
             )?;
             node_index = triple.1;
@@ -119,8 +142,8 @@ fn create_array(
             let list_node = &nodes[node_index];
             let list_buffers: Vec<Buffer> = buffers[buffer_index..=buffer_index]
                 .iter()
-                .map(|buf| read_buffer(buf, data))
-                .collect();
+                .map(|buf| read_buffer(buf, data, compression_codec))
+                .collect::<Result<_>>()?;
             node_index += 1;
             buffer_index += 1;
             let triple = create_array(
@@ -131,6 +154,7 @@ fn create_array(
                 dictionaries_by_id,
                 node_index,
                 buffer_index,
+                compression_codec,
                 metadata,
             )?;
             node_index = triple.1;
@@ -140,7 +164,8 @@ fn create_array(
         }
         Struct(struct_fields) => {
             let struct_node = &nodes[node_index];
-            let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data);
+            let null_buffer: Buffer =
+                read_buffer(&buffers[buffer_index], data, compression_codec)?;
             node_index += 1;
             buffer_index += 1;
 
@@ -157,6 +182,7 @@ fn create_array(
                     dictionaries_by_id,
                     node_index,
                     buffer_index,
+                    compression_codec,
                     metadata,
                 )?;
                 node_index = triple.1;
@@ -177,8 +203,8 @@ fn create_array(
             let index_node = &nodes[node_index];
             let index_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
                 .iter()
-                .map(|buf| read_buffer(buf, data))
-                .collect();
+                .map(|buf| read_buffer(buf, data, compression_codec))
+                .collect::<Result<_>>()?;
 
             let dict_id = field.dict_id().ok_or_else(|| {
                 ArrowError::IoError(format!("Field {} does not have dict id", field))
@@ -209,18 +235,20 @@ fn create_array(
             // In V4, union types has validity bitmap
             // In V5 and later, union types have no validity bitmap
             if metadata < &ipc::MetadataVersion::V5 {
-                read_buffer(&buffers[buffer_index], data);
+                read_buffer(&buffers[buffer_index], data, compression_codec)?;
                 buffer_index += 1;
             }
 
             let type_ids: Buffer =
-                read_buffer(&buffers[buffer_index], data)[..len].into();
+                read_buffer(&buffers[buffer_index], data, compression_codec)?[..len]
+                    .into();
 
             buffer_index += 1;
 
             let value_offsets = match mode {
                 UnionMode::Dense => {
-                    let buffer = read_buffer(&buffers[buffer_index], data);
+                    let buffer =
+                        read_buffer(&buffers[buffer_index], data, compression_codec)?;
                     buffer_index += 1;
                     Some(buffer[..len * 4].into())
                 }
@@ -238,6 +266,7 @@ fn create_array(
                     dictionaries_by_id,
                     node_index,
                     buffer_index,
+                    compression_codec,
                     metadata,
                 )?;
 
@@ -277,8 +306,8 @@ fn create_array(
                 data_type,
                 buffers[buffer_index..buffer_index + 2]
                     .iter()
-                    .map(|buf| read_buffer(buf, data))
-                    .collect(),
+                    .map(|buf| read_buffer(buf, data, compression_codec))
+                    .collect::<Result<_>>()?,
             );
             node_index += 1;
             buffer_index += 2;
@@ -603,6 +632,11 @@ pub fn read_record_batch(
     let field_nodes = batch.nodes().ok_or_else(|| {
         ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
     })?;
+    let batch_compression = batch.compression();
+    let compression_codec: Option<CompressionCodec> = batch_compression
+        .map(|batch_compression| batch_compression.codec().try_into())
+        .transpose()?;
+
     // keep track of buffer and node index, the functions that create arrays mutate these
     let mut buffer_index = 0;
     let mut node_index = 0;
@@ -626,6 +660,7 @@ pub fn read_record_batch(
                     dictionaries_by_id,
                     node_index,
                     buffer_index,
+                    &compression_codec,
                     metadata,
                 )?;
                 node_index = triple.1;
@@ -664,6 +699,7 @@ pub fn read_record_batch(
                 dictionaries_by_id,
                 node_index,
                 buffer_index,
+                &compression_codec,
                 metadata,
             )?;
             node_index = triple.1;
@@ -761,6 +797,21 @@ pub struct FileReader<R: Read + Seek> {
     projection: Option<(Vec<usize>, Schema)>,
 }
 
+impl<R: Read + Seek> fmt::Debug for FileReader<R> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
+        f.debug_struct("FileReader<R>")
+            .field("reader", &"BufReader<..>")
+            .field("schema", &self.schema)
+            .field("blocks", &self.blocks)
+            .field("current_block", &self.current_block)
+            .field("total_blocks", &self.total_blocks)
+            .field("dictionaries_by_id", &self.dictionaries_by_id)
+            .field("metadata_version", &self.metadata_version)
+            .field("projection", &self.projection)
+            .finish()
+    }
+}
+
 impl<R: Read + Seek> FileReader<R> {
     /// Try to create a new file reader
     ///
@@ -921,7 +972,6 @@ impl<R: Read + Seek> FileReader<R> {
 
         let mut block_data = vec![0; meta_len as usize];
         self.reader.read_exact(&mut block_data)?;
-
         let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
             ArrowError::IoError(format!("Unable to get root as footer: {:?}", err))
         })?;
@@ -1013,6 +1063,18 @@ pub struct StreamReader<R: Read> {
     projection: Option<(Vec<usize>, Schema)>,
 }
 
+impl<R: Read> fmt::Debug for StreamReader<R> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
+        f.debug_struct("StreamReader<R>")
+            .field("reader", &"BufReader<..>")
+            .field("schema", &self.schema)
+            .field("dictionaries_by_id", &self.dictionaries_by_id)
+            .field("finished", &self.finished)
+            .field("projection", &self.projection)
+            .finish()
+    }
+}
+
 impl<R: Read> StreamReader<R> {
     /// Try to create a new stream reader
     ///
@@ -1414,6 +1476,105 @@ mod tests {
         });
     }
 
+    #[test]
+    #[cfg(feature = "ipc_compression")]
+    fn read_generated_streams_200() {
+        let testdata = crate::util::test_util::arrow_test_data();
+        let version = "2.0.0-compression";
+
+        // the test is repetitive, thus we can read all supported files at once
+        let paths = vec!["generated_lz4", "generated_zstd"];
+        paths.iter().for_each(|path| {
+            let file = File::open(format!(
+                "{}/arrow-ipc-stream/integration/{}/{}.stream",
+                testdata, version, path
+            ))
+            .unwrap();
+
+            let mut reader = StreamReader::try_new(file, None).unwrap();
+
+            // read expected JSON output
+            let arrow_json = read_gzip_json(version, path);
+            assert!(arrow_json.equals_reader(&mut reader).unwrap());
+            // the next batch must be empty
+            assert!(reader.next().is_none());
+            // the stream must indicate that it's finished
+            assert!(reader.is_finished());
+        });
+    }
+
+    #[test]
+    #[cfg(not(feature = "ipc_compression"))]
+    fn read_generated_streams_200_negative() {
+        let testdata = crate::util::test_util::arrow_test_data();
+        let version = "2.0.0-compression";
+
+        // the test is repetitive, thus we can read all supported files at once
+        let cases = vec![("generated_lz4", "LZ4_FRAME"), ("generated_zstd", "ZSTD")];
+        cases.iter().for_each(|(path, compression_name)| {
+            let file = File::open(format!(
+                "{}/arrow-ipc-stream/integration/{}/{}.stream",
+                testdata, version, path
+            ))
+            .unwrap();
+
+            let mut reader = StreamReader::try_new(file, None).unwrap();
+            let err = reader.next().unwrap().unwrap_err();
+            let expected_error = format!(
+                "Invalid argument error: compression type {} not supported because arrow was not compiled with the ipc_compression feature",
+                compression_name
+            );
+            assert_eq!(err.to_string(), expected_error);
+        });
+    }
+
+    #[test]
+    #[cfg(feature = "ipc_compression")]
+    fn read_generated_files_200() {
+        let testdata = crate::util::test_util::arrow_test_data();
+        let version = "2.0.0-compression";
+        // the test is repetitive, thus we can read all supported files at once
+        let paths = vec!["generated_lz4", "generated_zstd"];
+        paths.iter().for_each(|path| {
+            let file = File::open(format!(
+                "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+                testdata, version, path
+            ))
+            .unwrap();
+
+            let mut reader = FileReader::try_new(file, None).unwrap();
+
+            // read expected JSON output
+            let arrow_json = read_gzip_json(version, path);
+            assert!(arrow_json.equals_reader(&mut reader).unwrap());
+        });
+    }
+
+    #[test]
+    #[cfg(not(feature = "ipc_compression"))]
+    fn read_generated_files_200_negative() {
+        let testdata = crate::util::test_util::arrow_test_data();
+        let version = "2.0.0-compression";
+        // the test is repetitive, thus we can read all supported files at once
+        let cases = vec![("generated_lz4", "LZ4_FRAME"), ("generated_zstd", "ZSTD")];
+        cases.iter().for_each(|(path, compression_name)| {
+            let file = File::open(format!(
+                "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+                testdata, version, path
+            ))
+            .unwrap();
+
+            let mut reader = FileReader::try_new(file, None).unwrap();
+
+            let err = reader.next().unwrap().unwrap_err();
+            let expected_error = format!(
+                "Invalid argument error: compression type {} not supported because arrow was not compiled with the ipc_compression feature",
+                compression_name
+            );
+            assert_eq!(err.to_string(), expected_error);
+        });
+    }
+
     fn create_test_projection_schema() -> Schema {
         // define field types
         let list_data_type =
diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs
index f0942b074..c817cb77c 100644
--- a/arrow/src/ipc/writer.rs
+++ b/arrow/src/ipc/writer.rs
@@ -39,6 +39,7 @@ use crate::ipc;
 use crate::record_batch::RecordBatch;
 use crate::util::bit_util;
 
+use crate::ipc::compression::CompressionCodec;
 use ipc::CONTINUATION_MARKER;
 
 /// IPC write options used to control the behaviour of the writer
@@ -58,9 +59,30 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
+    /// Compression, if desired. Only supported when `ipc_compression`
+    /// feature is enabled
+    batch_compression_type: Option<ipc::CompressionType>,
 }
 
 impl IpcWriteOptions {
+    /// Configures compression when writing IPC files. Requires the
+    /// `ipc_compression` feature of the crate to be activated.
+    #[cfg(feature = "ipc_compression")]
+    pub fn try_with_compression(
+        mut self,
+        batch_compression_type: Option<ipc::CompressionType>,
+    ) -> Result<Self> {
+        self.batch_compression_type = batch_compression_type;
+
+        if self.batch_compression_type.is_some()
+            && self.metadata_version < ipc::MetadataVersion::V5
+        {
+            return Err(ArrowError::InvalidArgumentError(
+                "Compression only supported in metadata v5 and above".to_string(),
+            ));
+        }
+        Ok(self)
+    }
     /// Try create IpcWriteOptions, checking for incompatible settings
     pub fn try_new(
         alignment: usize,
@@ -82,6 +104,7 @@ impl IpcWriteOptions {
                 alignment,
                 write_legacy_ipc_format,
                 metadata_version,
+                batch_compression_type: None,
             }),
             ipc::MetadataVersion::V5 => {
                 if write_legacy_ipc_format {
@@ -94,10 +117,14 @@ impl IpcWriteOptions {
                         alignment,
                         write_legacy_ipc_format,
                         metadata_version,
+                        batch_compression_type: None,
                     })
                 }
             }
-            z => panic!("Unsupported ipc::MetadataVersion {:?}", z),
+            z => Err(ArrowError::InvalidArgumentError(format!(
+                "Unsupported ipc::MetadataVersion {:?}",
+                z
+            ))),
         }
     }
 }
@@ -108,6 +135,7 @@ impl Default for IpcWriteOptions {
             alignment: 8,
             write_legacy_ipc_format: false,
             metadata_version: ipc::MetadataVersion::V5,
+            batch_compression_type: None,
         }
     }
 }
@@ -278,7 +306,7 @@ impl IpcDataGenerator {
                         dict_id,
                         dict_values,
                         write_options,
-                    ));
+                    )?);
                 }
             }
             _ => self._encode_dictionaries(
@@ -312,7 +340,7 @@ impl IpcDataGenerator {
             )?;
         }
 
-        let encoded_message = self.record_batch_to_bytes(batch, write_options);
+        let encoded_message = self.record_batch_to_bytes(batch, write_options)?;
         Ok((encoded_dictionaries, encoded_message))
     }
 
@@ -322,13 +350,27 @@ impl IpcDataGenerator {
         &self,
         batch: &RecordBatch,
         write_options: &IpcWriteOptions,
-    ) -> EncodedData {
+    ) -> Result<EncodedData> {
         let mut fbb = FlatBufferBuilder::new();
 
         let mut nodes: Vec<ipc::FieldNode> = vec![];
         let mut buffers: Vec<ipc::Buffer> = vec![];
         let mut arrow_data: Vec<u8> = vec![];
         let mut offset = 0;
+
+        // get the type of compression
+        let batch_compression_type = write_options.batch_compression_type;
+
+        let compression = batch_compression_type.map(|batch_compression_type| {
+            let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
+            c.add_method(ipc::BodyCompressionMethod::BUFFER);
+            c.add_codec(batch_compression_type);
+            c.finish()
+        });
+
+        let compression_codec: Option<CompressionCodec> =
+            batch_compression_type.map(TryInto::try_into).transpose()?;
+
         for array in batch.columns() {
             let array_data = array.data();
             offset = write_array_data(
@@ -339,19 +381,26 @@ impl IpcDataGenerator {
                 offset,
                 array.len(),
                 array.null_count(),
+                &compression_codec,
                 write_options,
-            );
+            )?;
         }
+        // pad the tail of body data
+        let len = arrow_data.len();
+        let pad_len = pad_to_8(len as u32);
+        arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
 
         // write data
         let buffers = fbb.create_vector(&buffers);
         let nodes = fbb.create_vector(&nodes);
-
         let root = {
             let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
             batch_builder.add_length(batch.num_rows() as i64);
             batch_builder.add_nodes(nodes);
             batch_builder.add_buffers(buffers);
+            if let Some(c) = compression {
+                batch_builder.add_compression(c);
+            }
             let b = batch_builder.finish();
             b.as_union_value()
         };
@@ -365,10 +414,10 @@ impl IpcDataGenerator {
         fbb.finish(root, None);
         let finished_data = fbb.finished_data();
 
-        EncodedData {
+        Ok(EncodedData {
             ipc_message: finished_data.to_vec(),
             arrow_data,
-        }
+        })
     }
 
     /// Write dictionary values into two sets of bytes, one for the header (ipc::Message) and the
@@ -378,13 +427,27 @@ impl IpcDataGenerator {
         dict_id: i64,
         array_data: &ArrayData,
         write_options: &IpcWriteOptions,
-    ) -> EncodedData {
+    ) -> Result<EncodedData> {
         let mut fbb = FlatBufferBuilder::new();
 
         let mut nodes: Vec<ipc::FieldNode> = vec![];
         let mut buffers: Vec<ipc::Buffer> = vec![];
         let mut arrow_data: Vec<u8> = vec![];
 
+        // get the type of compression
+        let batch_compression_type = write_options.batch_compression_type;
+
+        let compression = batch_compression_type.map(|batch_compression_type| {
+            let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
+            c.add_method(ipc::BodyCompressionMethod::BUFFER);
+            c.add_codec(batch_compression_type);
+            c.finish()
+        });
+
+        let compression_codec: Option<CompressionCodec> = batch_compression_type
+            .map(|batch_compression_type| batch_compression_type.try_into())
+            .transpose()?;
+
         write_array_data(
             array_data,
             &mut buffers,
@@ -393,8 +456,14 @@ impl IpcDataGenerator {
             0,
             array_data.len(),
             array_data.null_count(),
+            &compression_codec,
             write_options,
-        );
+        )?;
+
+        // pad the tail of body data
+        let len = arrow_data.len();
+        let pad_len = pad_to_8(len as u32);
+        arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
 
         // write data
         let buffers = fbb.create_vector(&buffers);
@@ -405,6 +474,9 @@ impl IpcDataGenerator {
             batch_builder.add_length(array_data.len() as i64);
             batch_builder.add_nodes(nodes);
             batch_builder.add_buffers(buffers);
+            if let Some(c) = compression {
+                batch_builder.add_compression(c);
+            }
             batch_builder.finish()
         };
 
@@ -427,10 +499,10 @@ impl IpcDataGenerator {
         fbb.finish(root, None);
         let finished_data = fbb.finished_data();
 
-        EncodedData {
+        Ok(EncodedData {
             ipc_message: finished_data.to_vec(),
             arrow_data,
-        }
+        })
     }
 }
 
@@ -519,9 +591,10 @@ impl<W: Write> FileWriter<W> {
     ) -> Result<Self> {
         let data_gen = IpcDataGenerator::default();
         let mut writer = BufWriter::new(writer);
-        // write magic to header
+        // write magic to header aligned on 8 byte boundary
+        let header_size = super::ARROW_MAGIC.len() + 2;
+        assert_eq!(header_size, 8);
         writer.write_all(&super::ARROW_MAGIC[..])?;
-        // create an 8-byte boundary after the header
         writer.write_all(&[0, 0])?;
         // write the schema, set the written bytes to the schema + header
         let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
@@ -530,7 +603,7 @@ impl<W: Write> FileWriter<W> {
             writer,
             write_options,
             schema: schema.clone(),
-            block_offsets: meta + data + 8,
+            block_offsets: meta + data + header_size,
             dictionary_blocks: vec![],
             record_blocks: vec![],
             finished: false,
@@ -984,8 +1057,9 @@ fn write_array_data(
     offset: i64,
     num_rows: usize,
     null_count: usize,
+    compression_codec: &Option<CompressionCodec>,
     write_options: &IpcWriteOptions,
-) -> i64 {
+) -> Result<i64> {
     let mut offset = offset;
     if !matches!(array_data.data_type(), DataType::Null) {
         nodes.push(ipc::FieldNode::new(num_rows as i64, null_count as i64));
@@ -1007,7 +1081,13 @@ fn write_array_data(
             Some(buffer) => buffer.bit_slice(array_data.offset(), array_data.len()),
         };
 
-        offset = write_buffer(null_buffer.as_slice(), buffers, arrow_data, offset);
+        offset = write_buffer(
+            null_buffer.as_slice(),
+            buffers,
+            arrow_data,
+            offset,
+            compression_codec,
+        )?;
     }
 
     let data_type = array_data.data_type();
@@ -1040,18 +1120,36 @@ fn write_array_data(
                     )
                 };
 
-            offset = write_buffer(new_offsets.as_slice(), buffers, arrow_data, offset);
+            offset = write_buffer(
+                new_offsets.as_slice(),
+                buffers,
+                arrow_data,
+                offset,
+                compression_codec,
+            )?;
 
             let total_bytes = get_binary_buffer_len(array_data);
             let value_buffer = &array_data.buffers()[1];
             let buffer_length = min(total_bytes, value_buffer.len() - byte_offset);
             let buffer_slice =
                 &value_buffer.as_slice()[byte_offset..(byte_offset + buffer_length)];
-            offset = write_buffer(buffer_slice, buffers, arrow_data, offset);
+            offset = write_buffer(
+                buffer_slice,
+                buffers,
+                arrow_data,
+                offset,
+                compression_codec,
+            )?;
         } else {
-            array_data.buffers().iter().for_each(|buffer| {
-                offset = write_buffer(buffer.as_slice(), buffers, arrow_data, offset);
-            });
+            for buffer in array_data.buffers() {
+                offset = write_buffer(
+                    buffer.as_slice(),
+                    buffers,
+                    arrow_data,
+                    offset,
+                    compression_codec,
+                )?;
+            }
         }
     } else if DataType::is_numeric(data_type)
         || DataType::is_temporal(data_type)
@@ -1074,19 +1172,32 @@ fn write_array_data(
             let buffer_length = min(min_length, buffer.len() - byte_offset);
             let buffer_slice =
                 &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)];
-            offset = write_buffer(buffer_slice, buffers, arrow_data, offset);
+            offset = write_buffer(
+                buffer_slice,
+                buffers,
+                arrow_data,
+                offset,
+                compression_codec,
+            )?;
         } else {
-            offset = write_buffer(buffer.as_slice(), buffers, arrow_data, offset);
+            offset = write_buffer(
+                buffer.as_slice(),
+                buffers,
+                arrow_data,
+                offset,
+                compression_codec,
+            )?;
         }
     } else {
-        array_data.buffers().iter().for_each(|buffer| {
-            offset = write_buffer(buffer, buffers, arrow_data, offset);
-        });
+        for buffer in array_data.buffers() {
+            offset =
+                write_buffer(buffer, buffers, arrow_data, offset, compression_codec)?;
+        }
     }
 
     if !matches!(array_data.data_type(), DataType::Dictionary(_, _)) {
         // recursively write out nested structures
-        array_data.child_data().iter().for_each(|data_ref| {
+        for data_ref in array_data.child_data() {
             // write the nested data (e.g list data)
             offset = write_array_data(
                 data_ref,
@@ -1096,29 +1207,56 @@ fn write_array_data(
                 offset,
                 data_ref.len(),
                 data_ref.null_count(),
+                compression_codec,
                 write_options,
-            );
-        });
+            )?;
+        }
     }
 
-    offset
+    Ok(offset)
 }
 
-/// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// Write a buffer into `arrow_data`, a vector of bytes, and adds its
+/// [`ipc::Buffer`] to `buffers`. Returns the new offset in `arrow_data`
+///
+///
+/// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
-    buffer: &[u8],
-    buffers: &mut Vec<ipc::Buffer>,
-    arrow_data: &mut Vec<u8>,
-    offset: i64,
-) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
-    // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(ipc::Buffer::new(offset, total_len));
-    arrow_data.extend_from_slice(buffer);
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
-    offset + total_len
+    buffer: &[u8],                  // input
+    buffers: &mut Vec<ipc::Buffer>, // output buffer descriptors
+    arrow_data: &mut Vec<u8>,       // output stream
+    offset: i64,                    // current output stream offset
+    compression_codec: &Option<CompressionCodec>,
+) -> Result<i64> {
+    let len: i64 = match compression_codec {
+        Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
+        None => {
+            arrow_data.extend_from_slice(buffer);
+            buffer.len()
+        }
+    }
+    .try_into()
+    .map_err(|e| {
+        ArrowError::InvalidArgumentError(format!(
+            "Could not convert compressed size to i64: {}",
+            e
+        ))
+    })?;
+
+    // make new index entry
+    buffers.push(ipc::Buffer::new(offset, len));
+    // padding and make offset 8 bytes aligned
+    let pad_len = pad_to_8(len as u32) as i64;
+    arrow_data.extend_from_slice(&vec![0u8; pad_len as usize][..]);
+
+    Ok(offset + len + pad_len)
 }
 
 /// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes
@@ -1143,6 +1281,167 @@ mod tests {
     use crate::ipc::reader::*;
     use crate::util::integration_util::*;
 
+    #[test]
+    #[cfg(feature = "ipc_compression")]
+    fn test_write_empty_record_batch_lz4_compression() {
+        let file_name = "arrow_lz4_empty";
+        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
+        let values: Vec<Option<i32>> = vec![];
+        let array = Int32Array::from(values);
+        let record_batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)])
+                .unwrap();
+        {
+            let file =
+                File::create(format!("target/debug/testdata/{}.arrow_file", file_name))
+                    .unwrap();
+            let write_option =
+                IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+                    .unwrap()
+                    .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
+                    .unwrap();
+
+            let mut writer =
+                FileWriter::try_new_with_options(file, &schema, write_option).unwrap();
+            writer.write(&record_batch).unwrap();
+            writer.finish().unwrap();
+        }
+        {
+            // read file
+            let file =
+                File::open(format!("target/debug/testdata/{}.arrow_file", file_name))
+                    .unwrap();
+            let mut reader = FileReader::try_new(file, None).unwrap();
+            loop {
+                match reader.next() {
+                    Some(Ok(read_batch)) => {
+                        read_batch
+                            .columns()
+                            .iter()
+                            .zip(record_batch.columns())
+                            .for_each(|(a, b)| {
+                                assert_eq!(a.data_type(), b.data_type());
+                                assert_eq!(a.len(), b.len());
+                                assert_eq!(a.null_count(), b.null_count());
+                            });
+                    }
+                    Some(Err(e)) => {
+                        panic!("{}", e);
+                    }
+                    None => {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    #[test]
+    #[cfg(feature = "ipc_compression")]
+    fn test_write_file_with_lz4_compression() {
+        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
+        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
+        let array = Int32Array::from(values);
+        let record_batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)])
+                .unwrap();
+        {
+            let file =
+                File::create("target/debug/testdata/arrow_lz4.arrow_file").unwrap();
+            let write_option =
+                IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+                    .unwrap()
+                    .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
+                    .unwrap();
+
+            let mut writer =
+                FileWriter::try_new_with_options(file, &schema, write_option).unwrap();
+            writer.write(&record_batch).unwrap();
+            writer.finish().unwrap();
+        }
+        {
+            // read file
+            let file =
+                File::open(format!("target/debug/testdata/{}.arrow_file", "arrow_lz4"))
+                    .unwrap();
+            let mut reader = FileReader::try_new(file, None).unwrap();
+            loop {
+                match reader.next() {
+                    Some(Ok(read_batch)) => {
+                        read_batch
+                            .columns()
+                            .iter()
+                            .zip(record_batch.columns())
+                            .for_each(|(a, b)| {
+                                assert_eq!(a.data_type(), b.data_type());
+                                assert_eq!(a.len(), b.len());
+                                assert_eq!(a.null_count(), b.null_count());
+                            });
+                    }
+                    Some(Err(e)) => {
+                        panic!("{}", e);
+                    }
+                    None => {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    #[test]
+    #[cfg(feature = "ipc_compression")]
+    fn test_write_file_with_zstd_compression() {
+        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
+        let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
+        let array = Int32Array::from(values);
+        let record_batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)])
+                .unwrap();
+        {
+            let file =
+                File::create("target/debug/testdata/arrow_zstd.arrow_file").unwrap();
+            let write_option =
+                IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+                    .unwrap()
+                    .try_with_compression(Some(ipc::CompressionType::ZSTD))
+                    .unwrap();
+
+            let mut writer =
+                FileWriter::try_new_with_options(file, &schema, write_option).unwrap();
+            writer.write(&record_batch).unwrap();
+            writer.finish().unwrap();
+        }
+        {
+            // read file
+            let file =
+                File::open(format!("target/debug/testdata/{}.arrow_file", "arrow_zstd"))
+                    .unwrap();
+            let mut reader = FileReader::try_new(file, None).unwrap();
+            loop {
+                match reader.next() {
+                    Some(Ok(read_batch)) => {
+                        read_batch
+                            .columns()
+                            .iter()
+                            .zip(record_batch.columns())
+                            .for_each(|(a, b)| {
+                                assert_eq!(a.data_type(), b.data_type());
+                                assert_eq!(a.len(), b.len());
+                                assert_eq!(a.null_count(), b.null_count());
+                            });
+                    }
+                    Some(Err(e)) => {
+                        panic!("{}", e);
+                    }
+                    None => {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
     #[test]
     fn test_write_file() {
         let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
@@ -1499,6 +1798,107 @@ mod tests {
         });
     }
 
+    #[test]
+    #[cfg(feature = "ipc_compression")]
+    fn read_and_rewrite_compression_files_200() {
+        let testdata = crate::util::test_util::arrow_test_data();
+        let version = "2.0.0-compression";
+        // the test is repetitive, thus we can read all supported files at once
+        let paths = vec!["generated_lz4", "generated_zstd"];
+        paths.iter().for_each(|path| {
+            let file = File::open(format!(
+                "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+                testdata, version, path
+            ))
+            .unwrap();
+
+            let mut reader = FileReader::try_new(file, None).unwrap();
+
+            // read and rewrite the file to a temp location
+            {
+                let file = File::create(format!(
+                    "target/debug/testdata/{}-{}.arrow_file",
+                    version, path
+                ))
+                .unwrap();
+                // write IPC version 5
+                let options =
+                    IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+                        .unwrap()
+                        .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
+                        .unwrap();
+
+                let mut writer =
+                    FileWriter::try_new_with_options(file, &reader.schema(), options)
+                        .unwrap();
+                while let Some(Ok(batch)) = reader.next() {
+                    writer.write(&batch).unwrap();
+                }
+                writer.finish().unwrap();
+            }
+
+            let file = File::open(format!(
+                "target/debug/testdata/{}-{}.arrow_file",
+                version, path
+            ))
+            .unwrap();
+            let mut reader = FileReader::try_new(file, None).unwrap();
+
+            // read expected JSON output
+            let arrow_json = read_gzip_json(version, path);
+            assert!(arrow_json.equals_reader(&mut reader).unwrap());
+        });
+    }
+
+    #[test]
+    #[cfg(feature = "ipc_compression")]
+    fn read_and_rewrite_compression_stream_200() {
+        let testdata = crate::util::test_util::arrow_test_data();
+        let version = "2.0.0-compression";
+        // the test is repetitive, thus we can read all supported files at once
+        let paths = vec!["generated_lz4", "generated_zstd"];
+        paths.iter().for_each(|path| {
+            let file = File::open(format!(
+                "{}/arrow-ipc-stream/integration/{}/{}.stream",
+                testdata, version, path
+            ))
+            .unwrap();
+
+            let reader = StreamReader::try_new(file, None).unwrap();
+
+            // read and rewrite the stream to a temp location
+            {
+                let file = File::create(format!(
+                    "target/debug/testdata/{}-{}.stream",
+                    version, path
+                ))
+                .unwrap();
+                let options =
+                    IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+                        .unwrap()
+                        .try_with_compression(Some(ipc::CompressionType::ZSTD))
+                        .unwrap();
+
+                let mut writer =
+                    StreamWriter::try_new_with_options(file, &reader.schema(), options)
+                        .unwrap();
+                reader.for_each(|batch| {
+                    writer.write(&batch.unwrap()).unwrap();
+                });
+                writer.finish().unwrap();
+            }
+
+            let file =
+                File::open(format!("target/debug/testdata/{}-{}.stream", version, path))
+                    .unwrap();
+            let mut reader = StreamReader::try_new(file, None).unwrap();
+
+            // read expected JSON output
+            let arrow_json = read_gzip_json(version, path);
+            assert!(arrow_json.equals_reader(&mut reader).unwrap());
+        });
+    }
+
     /// Read gzipped JSON file
     fn read_gzip_json(version: &str, path: &str) -> ArrowJson {
         let testdata = crate::util::test_util::arrow_test_data();