Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/06 08:31:27 UTC

[arrow-rs] branch master updated: Split out arrow-ipc (#3022)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new deb64554f Split out arrow-ipc (#3022)
deb64554f is described below

commit deb64554f9d25afa044248293b31ab7c26f0e42f
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Sun Nov 6 21:31:22 2022 +1300

    Split out arrow-ipc (#3022)
    
    * Split out arrow-ipc
    
    * RAT
    
    * Fix doc
    
    * Tweak required-features
    
    * Clippy
    
    * Fix feature flags
---
 .github/workflows/arrow.yml                        |   5 +
 .github/workflows/arrow_flight.yml                 |   1 +
 .github/workflows/dev_pr/labeler.yml               |   1 +
 .github/workflows/integration.yml                  |   1 +
 .github/workflows/miri.yaml                        |   1 +
 .github/workflows/parquet.yml                      |   1 +
 Cargo.toml                                         |   1 +
 arrow-array/src/lib.rs                             |   2 +-
 arrow-array/src/record_batch.rs                    |  18 ++
 arrow-ipc/CONTRIBUTING.md                          |  37 +++
 arrow-ipc/Cargo.toml                               |  51 ++++
 {arrow => arrow-ipc}/regen.sh                      |   0
 .../codec.rs => arrow-ipc/src/compression.rs       | 133 ++++++---
 {arrow/src/ipc => arrow-ipc/src}/convert.rs        | 294 +++++++++----------
 {arrow/src/ipc => arrow-ipc/src}/gen/File.rs       |   2 +-
 {arrow/src/ipc => arrow-ipc/src}/gen/Message.rs    |   8 +-
 {arrow/src/ipc => arrow-ipc/src}/gen/Schema.rs     |   0
 .../src/ipc => arrow-ipc/src}/gen/SparseTensor.rs  |   4 +-
 {arrow/src/ipc => arrow-ipc/src}/gen/Tensor.rs     |   2 +-
 {arrow/src/ipc => arrow-ipc/src}/gen/mod.rs        |   0
 arrow/src/ipc/mod.rs => arrow-ipc/src/lib.rs       |   2 +
 {arrow/src/ipc => arrow-ipc/src}/reader.rs         | 160 ++++++-----
 {arrow/src/ipc => arrow-ipc/src}/writer.rs         | 310 +++++++++------------
 arrow/CONTRIBUTING.md                              |  17 --
 arrow/Cargo.toml                                   |  12 +-
 arrow/src/ipc/compression/mod.rs                   |  26 --
 arrow/src/ipc/compression/stub.rs                  |  63 -----
 arrow/src/lib.rs                                   |  25 +-
 arrow/tests/ipc_integration.rs                     |  61 ++++
 dev/release/README.md                              |   2 +
 30 files changed, 648 insertions(+), 592 deletions(-)

diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml
index 9ae72dd00..d930086ef 100644
--- a/.github/workflows/arrow.yml
+++ b/.github/workflows/arrow.yml
@@ -33,6 +33,7 @@ on:
       - arrow-schema/**
       - arrow-select/**
       - arrow-integration-test/**
+      - arrow-ipc/**
       - .github/**
 
 jobs:
@@ -61,6 +62,8 @@ jobs:
         run: cargo test -p arrow-select --all-features
       - name: Test arrow-cast with all features
         run: cargo test -p arrow-cast --all-features
+      - name: Test arrow-ipc with all features
+        run: cargo test -p arrow-ipc --all-features
       - name: Test arrow-integration-test with all features
         run: cargo test -p arrow-integration-test --all-features
       - name: Test arrow with default features
@@ -169,5 +172,7 @@ jobs:
         run: cargo clippy -p arrow-select --all-targets --all-features -- -D warnings
       - name: Clippy arrow-cast with all features
         run: cargo clippy -p arrow-cast --all-targets --all-features -- -D warnings
+      - name: Clippy arrow-ipc with all features
+        run: cargo clippy -p arrow-ipc --all-targets --all-features -- -D warnings
       - name: Clippy arrow
         run: cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi,ipc_compression,dyn_cmp_dict,dyn_arith_dict,chrono-tz --all-targets -- -D warnings
diff --git a/.github/workflows/arrow_flight.yml b/.github/workflows/arrow_flight.yml
index 9621c9e69..ded4f5a67 100644
--- a/.github/workflows/arrow_flight.yml
+++ b/.github/workflows/arrow_flight.yml
@@ -35,6 +35,7 @@ on:
       - arrow-schema/**
       - arrow-select/**
       - arrow-flight/**
+      - arrow-ipc/**
       - .github/**
 
 jobs:
diff --git a/.github/workflows/dev_pr/labeler.yml b/.github/workflows/dev_pr/labeler.yml
index 3a0073004..17ebf54de 100644
--- a/.github/workflows/dev_pr/labeler.yml
+++ b/.github/workflows/dev_pr/labeler.yml
@@ -23,6 +23,7 @@ arrow:
   - arrow-data/**/*
   - arrow-schema/**/*
   - arrow-select/**/*
+  - arrow-ipc/**/*
 
 arrow-flight:
   - arrow-flight/**/*
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index c2c0a79e6..8566230ea 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -31,6 +31,7 @@ on:
       - arrow-data/**
       - arrow-schema/**
       - arrow-select/**
+      - arrow-ipc/**
       - arrow-pyarrow-integration-testing/**
       - arrow-integration-test/**
       - arrow-integration-testing/**
diff --git a/.github/workflows/miri.yaml b/.github/workflows/miri.yaml
index 241b4f0b4..2e85c9dd9 100644
--- a/.github/workflows/miri.yaml
+++ b/.github/workflows/miri.yaml
@@ -31,6 +31,7 @@ on:
       - arrow-data/**
       - arrow-schema/**
       - arrow-select/**
+      - arrow-ipc/**
       - .github/**
 
 jobs:
diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index 5a7beadfd..b369ef69b 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -34,6 +34,7 @@ on:
       - arrow-data/**
       - arrow-schema/**
       - arrow-select/**
+      - arrow-ipc/**
       - parquet/**
       - .github/**
 
diff --git a/Cargo.toml b/Cargo.toml
index d8fa5b923..0ab4853c6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,6 +25,7 @@ members = [
         "arrow-flight",
         "arrow-integration-test",
         "arrow-integration-testing",
+        "arrow-ipc",
         "arrow-schema",
         "arrow-select",
         "parquet",
diff --git a/arrow-array/src/lib.rs b/arrow-array/src/lib.rs
index 5c86978dc..15267d308 100644
--- a/arrow-array/src/lib.rs
+++ b/arrow-array/src/lib.rs
@@ -162,7 +162,7 @@ pub mod array;
 pub use array::*;
 
 mod record_batch;
-pub use record_batch::{RecordBatch, RecordBatchOptions};
+pub use record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 
 mod arithmetic;
 pub use arithmetic::ArrowNativeTypeOp;
diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index e613a38bb..6f2385fa9 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -22,6 +22,24 @@ use crate::{new_empty_array, Array, ArrayRef, StructArray};
 use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
 use std::sync::Arc;
 
+/// Trait for types that can read `RecordBatch`es.
+pub trait RecordBatchReader: Iterator<Item = Result<RecordBatch, ArrowError>> {
+    /// Returns the schema of this `RecordBatchReader`.
+    ///
+    /// Implementations of this trait should guarantee that all `RecordBatch`es returned by
+    /// this reader have the same schema as returned from this method.
+    fn schema(&self) -> SchemaRef;
+
+    /// Reads the next `RecordBatch`.
+    #[deprecated(
+        since = "2.0.0",
+        note = "This method is deprecated in favour of `next` from the trait Iterator."
+    )]
+    fn next_batch(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        self.next().transpose()
+    }
+}
+
 /// A two-dimensional batch of column-oriented data with a defined
 /// [schema](arrow_schema::Schema).
 ///
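The new `RecordBatchReader` trait couples an `Iterator` of fallible batches with a fixed schema. A minimal sketch of an implementor over an in-memory source (the `MemoryReader` name and field layout are illustrative, not part of the crate):

```rust
use std::sync::Arc;

use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, Schema, SchemaRef};

/// Hypothetical reader yielding pre-built batches that all share the
/// schema reported by `schema()`, as the trait contract requires.
struct MemoryReader {
    schema: SchemaRef,
    batches: std::vec::IntoIter<RecordBatch>,
}

impl Iterator for MemoryReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        // Reading from memory cannot fail, so wrap each batch in Ok.
        self.batches.next().map(Ok)
    }
}

impl RecordBatchReader for MemoryReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

fn main() {
    let schema: SchemaRef = Arc::new(Schema::empty());
    let reader = MemoryReader {
        schema: schema.clone(),
        batches: vec![RecordBatch::new_empty(schema)].into_iter(),
    };
    assert_eq!(reader.count(), 1);
}
```

Callers migrating off the deprecated `next_batch` can use the `Iterator` API directly; as the default method above shows, `next_batch` is now just `self.next().transpose()`.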
diff --git a/arrow-ipc/CONTRIBUTING.md b/arrow-ipc/CONTRIBUTING.md
new file mode 100644
index 000000000..5e14760f1
--- /dev/null
+++ b/arrow-ipc/CONTRIBUTING.md
@@ -0,0 +1,37 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+## Developer's guide
+
+### IPC
+
+The expected flatc version is 1.12.0+, built by regen.sh from
+[flatbuffers](https://github.com/google/flatbuffers) master at a fixed commit ID.
+
+The IPC flatbuffer code was generated by running this command from the root of the project:
+
+```bash
+./regen.sh
+```
+
+The above script runs the `flatc` compiler and then applies the following adjustments to the generated code:
+
+- Replace `type__` with `type_`
+- Remove `org::apache::arrow::flatbuffers` namespace
+- Add includes to each generated file
diff --git a/arrow-ipc/Cargo.toml b/arrow-ipc/Cargo.toml
new file mode 100644
index 000000000..52ad5fe2e
--- /dev/null
+++ b/arrow-ipc/Cargo.toml
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "arrow-ipc"
+version = "26.0.0"
+description = "Support for the Arrow IPC format"
+homepage = "https://github.com/apache/arrow-rs"
+repository = "https://github.com/apache/arrow-rs"
+authors = ["Apache Arrow <de...@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = ["arrow"]
+include = [
+    "benches/*.rs",
+    "src/**/*.rs",
+    "Cargo.toml",
+]
+edition = "2021"
+rust-version = "1.62"
+
+[lib]
+name = "arrow_ipc"
+path = "src/lib.rs"
+bench = false
+
+[dependencies]
+arrow-array = { version = "26.0.0", path = "../arrow-array" }
+arrow-buffer = { version = "26.0.0", path = "../arrow-buffer" }
+arrow-cast = { version = "26.0.0", path = "../arrow-cast" }
+arrow-data = { version = "26.0.0", path = "../arrow-data" }
+arrow-schema = { version = "26.0.0", path = "../arrow-schema" }
+flatbuffers = { version = "22.9.2", default-features = false, features = ["thiserror"] }
+lz4 = { version = "1.23", default-features = false, optional = true }
+zstd = { version = "0.11.1", default-features = false, optional = true }
+
+[dev-dependencies]
+tempfile = "3.3"
diff --git a/arrow/regen.sh b/arrow-ipc/regen.sh
similarity index 100%
rename from arrow/regen.sh
rename to arrow-ipc/regen.sh
diff --git a/arrow/src/ipc/compression/codec.rs b/arrow-ipc/src/compression.rs
similarity index 67%
rename from arrow/src/ipc/compression/codec.rs
rename to arrow-ipc/src/compression.rs
index 58ba8cb86..6349ac232 100644
--- a/arrow/src/ipc/compression/codec.rs
+++ b/arrow-ipc/src/compression.rs
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::buffer::Buffer;
-use crate::error::{ArrowError, Result};
-use crate::ipc::CompressionType;
-use std::io::{Read, Write};
+use crate::CompressionType;
+use arrow_buffer::Buffer;
+use arrow_schema::ArrowError;
 
 const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
 const LENGTH_OF_PREFIX_DATA: i64 = 8;
@@ -33,7 +32,7 @@ pub enum CompressionCodec {
 impl TryFrom<CompressionType> for CompressionCodec {
     type Error = ArrowError;
 
-    fn try_from(compression_type: CompressionType) -> Result<Self> {
+    fn try_from(compression_type: CompressionType) -> Result<Self, ArrowError> {
         match compression_type {
             CompressionType::ZSTD => Ok(CompressionCodec::Zstd),
             CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame),
@@ -60,7 +59,7 @@ impl CompressionCodec {
         &self,
         input: &[u8],
         output: &mut Vec<u8>,
-    ) -> Result<usize> {
+    ) -> Result<usize, ArrowError> {
         let uncompressed_data_len = input.len();
         let original_output_len = output.len();
 
@@ -92,7 +91,10 @@ impl CompressionCodec {
     /// [8 bytes]:         uncompressed length
     /// [remaining bytes]: compressed data stream
     /// ```
-    pub(crate) fn decompress_to_buffer(&self, input: &Buffer) -> Result<Buffer> {
+    pub(crate) fn decompress_to_buffer(
+        &self,
+        input: &Buffer,
+    ) -> Result<Buffer, ArrowError> {
         // read the first 8 bytes to determine if the data is
         // compressed
         let decompressed_length = read_uncompressed_size(input);
@@ -115,50 +117,89 @@ impl CompressionCodec {
 
     /// Compress the data in input buffer and write to output buffer
     /// using the specified compression
-    fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
+    fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<(), ArrowError> {
         match self {
-            CompressionCodec::Lz4Frame => {
-                let mut encoder = lz4::EncoderBuilder::new().build(output)?;
-                encoder.write_all(input)?;
-                match encoder.finish().1 {
-                    Ok(_) => Ok(()),
-                    Err(e) => Err(e.into()),
-                }
-            }
-            CompressionCodec::Zstd => {
-                let mut encoder = zstd::Encoder::new(output, 0)?;
-                encoder.write_all(input)?;
-                match encoder.finish() {
-                    Ok(_) => Ok(()),
-                    Err(e) => Err(e.into()),
-                }
-            }
+            CompressionCodec::Lz4Frame => compress_lz4(input, output),
+            CompressionCodec::Zstd => compress_zstd(input, output),
         }
     }
 
     /// Decompress the data in input buffer and write to output buffer
     /// using the specified compression
-    fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<usize> {
-        let result: Result<usize> = match self {
-            CompressionCodec::Lz4Frame => {
-                let mut decoder = lz4::Decoder::new(input)?;
-                match decoder.read_to_end(output) {
-                    Ok(size) => Ok(size),
-                    Err(e) => Err(e.into()),
-                }
-            }
-            CompressionCodec::Zstd => {
-                let mut decoder = zstd::Decoder::new(input)?;
-                match decoder.read_to_end(output) {
-                    Ok(size) => Ok(size),
-                    Err(e) => Err(e.into()),
-                }
-            }
-        };
-        result
+    fn decompress(
+        &self,
+        input: &[u8],
+        output: &mut Vec<u8>,
+    ) -> Result<usize, ArrowError> {
+        match self {
+            CompressionCodec::Lz4Frame => decompress_lz4(input, output),
+            CompressionCodec::Zstd => decompress_zstd(input, output),
+        }
     }
 }
 
+#[cfg(feature = "lz4")]
+fn compress_lz4(input: &[u8], output: &mut Vec<u8>) -> Result<(), ArrowError> {
+    use std::io::Write;
+    let mut encoder = lz4::EncoderBuilder::new().build(output)?;
+    encoder.write_all(input)?;
+    encoder.finish().1?;
+    Ok(())
+}
+
+#[cfg(not(feature = "lz4"))]
+#[allow(clippy::ptr_arg)]
+fn compress_lz4(_input: &[u8], _output: &mut Vec<u8>) -> Result<(), ArrowError> {
+    Err(ArrowError::InvalidArgumentError(
+        "lz4 IPC compression requires the lz4 feature".to_string(),
+    ))
+}
+
+#[cfg(feature = "lz4")]
+fn decompress_lz4(input: &[u8], output: &mut Vec<u8>) -> Result<usize, ArrowError> {
+    use std::io::Read;
+    Ok(lz4::Decoder::new(input)?.read_to_end(output)?)
+}
+
+#[cfg(not(feature = "lz4"))]
+#[allow(clippy::ptr_arg)]
+fn decompress_lz4(_input: &[u8], _output: &mut Vec<u8>) -> Result<usize, ArrowError> {
+    Err(ArrowError::InvalidArgumentError(
+        "lz4 IPC decompression requires the lz4 feature".to_string(),
+    ))
+}
+
+#[cfg(feature = "zstd")]
+fn compress_zstd(input: &[u8], output: &mut Vec<u8>) -> Result<(), ArrowError> {
+    use std::io::Write;
+    let mut encoder = zstd::Encoder::new(output, 0)?;
+    encoder.write_all(input)?;
+    encoder.finish()?;
+    Ok(())
+}
+
+#[cfg(not(feature = "zstd"))]
+#[allow(clippy::ptr_arg)]
+fn compress_zstd(_input: &[u8], _output: &mut Vec<u8>) -> Result<(), ArrowError> {
+    Err(ArrowError::InvalidArgumentError(
+        "zstd IPC compression requires the zstd feature".to_string(),
+    ))
+}
+
+#[cfg(feature = "zstd")]
+fn decompress_zstd(input: &[u8], output: &mut Vec<u8>) -> Result<usize, ArrowError> {
+    use std::io::Read;
+    Ok(zstd::Decoder::new(input)?.read_to_end(output)?)
+}
+
+#[cfg(not(feature = "zstd"))]
+#[allow(clippy::ptr_arg)]
+fn decompress_zstd(_input: &[u8], _output: &mut Vec<u8>) -> Result<usize, ArrowError> {
+    Err(ArrowError::InvalidArgumentError(
+        "zstd IPC decompression requires the zstd feature".to_string(),
+    ))
+}
+
 /// Get the uncompressed length
 /// Notes:
 ///   LENGTH_NO_COMPRESSED_DATA: indicates that the data that follows is not compressed
@@ -173,12 +214,11 @@ fn read_uncompressed_size(buffer: &[u8]) -> i64 {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-
     #[test]
+    #[cfg(feature = "lz4")]
     fn test_lz4_compression() {
         let input_bytes = "hello lz4".as_bytes();
-        let codec: CompressionCodec = CompressionCodec::Lz4Frame;
+        let codec = super::CompressionCodec::Lz4Frame;
         let mut output_bytes: Vec<u8> = Vec::new();
         codec.compress(input_bytes, &mut output_bytes).unwrap();
         let mut result_output_bytes: Vec<u8> = Vec::new();
@@ -189,9 +229,10 @@ mod tests {
     }
 
     #[test]
+    #[cfg(feature = "zstd")]
     fn test_zstd_compression() {
         let input_bytes = "hello zstd".as_bytes();
-        let codec: CompressionCodec = CompressionCodec::Zstd;
+        let codec = super::CompressionCodec::Zstd;
         let mut output_bytes: Vec<u8> = Vec::new();
         codec.compress(input_bytes, &mut output_bytes).unwrap();
         let mut result_output_bytes: Vec<u8> = Vec::new();
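As the stubs above show, the lz4 and zstd codecs are now compile-time optional: with the corresponding feature disabled, the compress/decompress paths return an `ArrowError::InvalidArgumentError` at runtime instead of failing the build. The buffer framing itself is a fixed 8-byte prefix: a little-endian `i64` giving the uncompressed length, with `-1` (`LENGTH_NO_COMPRESSED_DATA`) meaning the bytes that follow are stored uncompressed. A self-contained sketch of that prefix handling, mirroring `read_uncompressed_size` (function names here are illustrative):

```rust
const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
const LENGTH_OF_PREFIX_DATA: usize = 8;

/// Read the little-endian i64 length prefix from a framed buffer.
fn uncompressed_size(buffer: &[u8]) -> i64 {
    let mut len_bytes = [0u8; LENGTH_OF_PREFIX_DATA];
    len_bytes.copy_from_slice(&buffer[..LENGTH_OF_PREFIX_DATA]);
    i64::from_le_bytes(len_bytes)
}

fn main() {
    // A prefix of -1 signals an uncompressed payload, useful when
    // compression would not actually shrink the data.
    let mut framed = LENGTH_NO_COMPRESSED_DATA.to_le_bytes().to_vec();
    framed.extend_from_slice(b"raw payload");
    assert_eq!(uncompressed_size(&framed), LENGTH_NO_COMPRESSED_DATA);
    assert_eq!(&framed[LENGTH_OF_PREFIX_DATA..], b"raw payload");
}
```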
diff --git a/arrow/src/ipc/convert.rs b/arrow-ipc/src/convert.rs
similarity index 81%
rename from arrow/src/ipc/convert.rs
rename to arrow-ipc/src/convert.rs
index 0f5d246bc..8d01c58b6 100644
--- a/arrow/src/ipc/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -17,16 +17,13 @@
 
 //! Utilities for converting between IPC types and native Arrow types
 
-use crate::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode};
-use crate::error::{ArrowError, Result};
-use crate::ipc;
-
+use arrow_schema::*;
 use flatbuffers::{
     FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset,
 };
 use std::collections::{BTreeMap, HashMap};
 
-use crate::ipc::{size_prefixed_root_as_message, CONTINUATION_MARKER};
+use crate::{size_prefixed_root_as_message, CONTINUATION_MARKER};
 use DataType::*;
 
 /// Serialize a schema in IPC format
@@ -43,7 +40,7 @@ pub fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
 pub fn schema_to_fb_offset<'a>(
     fbb: &mut FlatBufferBuilder<'a>,
     schema: &Schema,
-) -> WIPOffset<ipc::Schema<'a>> {
+) -> WIPOffset<crate::Schema<'a>> {
     let mut fields = vec![];
     for field in schema.fields() {
         let fb_field = build_field(fbb, field);
@@ -55,7 +52,7 @@ pub fn schema_to_fb_offset<'a>(
         let fb_key_name = fbb.create_string(k.as_str());
         let fb_val_name = fbb.create_string(v.as_str());
 
-        let mut kv_builder = ipc::KeyValueBuilder::new(fbb);
+        let mut kv_builder = crate::KeyValueBuilder::new(fbb);
         kv_builder.add_key(fb_key_name);
         kv_builder.add_value(fb_val_name);
         custom_metadata.push(kv_builder.finish());
@@ -64,15 +61,15 @@ pub fn schema_to_fb_offset<'a>(
     let fb_field_list = fbb.create_vector(&fields);
     let fb_metadata_list = fbb.create_vector(&custom_metadata);
 
-    let mut builder = ipc::SchemaBuilder::new(fbb);
+    let mut builder = crate::SchemaBuilder::new(fbb);
     builder.add_fields(fb_field_list);
     builder.add_custom_metadata(fb_metadata_list);
     builder.finish()
 }
 
 /// Convert an IPC Field to Arrow Field
-impl<'a> From<ipc::Field<'a>> for Field {
-    fn from(field: ipc::Field) -> Field {
+impl<'a> From<crate::Field<'a>> for Field {
+    fn from(field: crate::Field) -> Field {
         let arrow_field = if let Some(dictionary) = field.dictionary() {
             Field::new_dict(
                 field.name().unwrap(),
@@ -105,14 +102,14 @@ impl<'a> From<ipc::Field<'a>> for Field {
 }
 
 /// Deserialize a Schema table from flat buffer format to Schema data type
-pub fn fb_to_schema(fb: ipc::Schema) -> Schema {
+pub fn fb_to_schema(fb: crate::Schema) -> Schema {
     let mut fields: Vec<Field> = vec![];
     let c_fields = fb.fields().unwrap();
     let len = c_fields.len();
     for i in 0..len {
-        let c_field: ipc::Field = c_fields.get(i);
+        let c_field: crate::Field = c_fields.get(i);
         match c_field.type_type() {
-            ipc::Type::Decimal if fb.endianness() == ipc::Endianness::Big => {
+            crate::Type::Decimal if fb.endianness() == crate::Endianness::Big => {
                 unimplemented!("Big Endian is not supported for Decimal!")
             }
             _ => (),
@@ -138,8 +135,8 @@ pub fn fb_to_schema(fb: ipc::Schema) -> Schema {
 }
 
 /// Try to deserialize flat buffer format bytes into a schema
-pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema> {
-    if let Ok(ipc) = ipc::root_as_message(bytes) {
+pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema, ArrowError> {
+    if let Ok(ipc) = crate::root_as_message(bytes) {
         if let Some(schema) = ipc.header_as_schema().map(fb_to_schema) {
             Ok(schema)
         } else {
@@ -155,7 +152,7 @@ pub fn try_schema_from_flatbuffer_bytes(bytes: &[u8]) -> Result<Schema> {
 }
 
 /// Try to deserialize the IPC format bytes into a schema
-pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema> {
+pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema, ArrowError> {
     // There are two protocol types: https://issues.apache.org/jira/browse/ARROW-6313
     // The original protocol is:
     //   4 bytes - the byte length of the payload
@@ -200,7 +197,7 @@ pub fn try_schema_from_ipc_buffer(buffer: &[u8]) -> Result<Schema> {
 }
 
 /// Get the Arrow data type from the flatbuffer Field table
-pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataType {
+pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> DataType {
     if let Some(dictionary) = field.dictionary() {
         if may_be_dictionary {
             let int = dictionary.indexType().unwrap();
@@ -223,9 +220,9 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
     }
 
     match field.type_type() {
-        ipc::Type::Null => DataType::Null,
-        ipc::Type::Bool => DataType::Boolean,
-        ipc::Type::Int => {
+        crate::Type::Null => DataType::Null,
+        crate::Type::Bool => DataType::Boolean,
+        crate::Type::Int => {
             let int = field.type_as_int().unwrap();
             match (int.bitWidth(), int.is_signed()) {
                 (8, true) => DataType::Int8,
@@ -242,103 +239,109 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
                 ),
             }
         }
-        ipc::Type::Binary => DataType::Binary,
-        ipc::Type::LargeBinary => DataType::LargeBinary,
-        ipc::Type::Utf8 => DataType::Utf8,
-        ipc::Type::LargeUtf8 => DataType::LargeUtf8,
-        ipc::Type::FixedSizeBinary => {
+        crate::Type::Binary => DataType::Binary,
+        crate::Type::LargeBinary => DataType::LargeBinary,
+        crate::Type::Utf8 => DataType::Utf8,
+        crate::Type::LargeUtf8 => DataType::LargeUtf8,
+        crate::Type::FixedSizeBinary => {
             let fsb = field.type_as_fixed_size_binary().unwrap();
             DataType::FixedSizeBinary(fsb.byteWidth())
         }
-        ipc::Type::FloatingPoint => {
+        crate::Type::FloatingPoint => {
             let float = field.type_as_floating_point().unwrap();
             match float.precision() {
-                ipc::Precision::HALF => DataType::Float16,
-                ipc::Precision::SINGLE => DataType::Float32,
-                ipc::Precision::DOUBLE => DataType::Float64,
+                crate::Precision::HALF => DataType::Float16,
+                crate::Precision::SINGLE => DataType::Float32,
+                crate::Precision::DOUBLE => DataType::Float64,
                 z => panic!("FloatingPoint type with precision of {:?} not supported", z),
             }
         }
-        ipc::Type::Date => {
+        crate::Type::Date => {
             let date = field.type_as_date().unwrap();
             match date.unit() {
-                ipc::DateUnit::DAY => DataType::Date32,
-                ipc::DateUnit::MILLISECOND => DataType::Date64,
+                crate::DateUnit::DAY => DataType::Date32,
+                crate::DateUnit::MILLISECOND => DataType::Date64,
                 z => panic!("Date type with unit of {:?} not supported", z),
             }
         }
-        ipc::Type::Time => {
+        crate::Type::Time => {
             let time = field.type_as_time().unwrap();
             match (time.bitWidth(), time.unit()) {
-                (32, ipc::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
-                (32, ipc::TimeUnit::MILLISECOND) => {
+                (32, crate::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
+                (32, crate::TimeUnit::MILLISECOND) => {
                     DataType::Time32(TimeUnit::Millisecond)
                 }
-                (64, ipc::TimeUnit::MICROSECOND) => {
+                (64, crate::TimeUnit::MICROSECOND) => {
                     DataType::Time64(TimeUnit::Microsecond)
                 }
-                (64, ipc::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
+                (64, crate::TimeUnit::NANOSECOND) => {
+                    DataType::Time64(TimeUnit::Nanosecond)
+                }
                 z => panic!(
                     "Time type with bit width of {} and unit of {:?} not supported",
                     z.0, z.1
                 ),
             }
         }
-        ipc::Type::Timestamp => {
+        crate::Type::Timestamp => {
             let timestamp = field.type_as_timestamp().unwrap();
             let timezone: Option<String> = timestamp.timezone().map(|tz| tz.to_string());
             match timestamp.unit() {
-                ipc::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone),
-                ipc::TimeUnit::MILLISECOND => {
+                crate::TimeUnit::SECOND => {
+                    DataType::Timestamp(TimeUnit::Second, timezone)
+                }
+                crate::TimeUnit::MILLISECOND => {
                     DataType::Timestamp(TimeUnit::Millisecond, timezone)
                 }
-                ipc::TimeUnit::MICROSECOND => {
+                crate::TimeUnit::MICROSECOND => {
                     DataType::Timestamp(TimeUnit::Microsecond, timezone)
                 }
-                ipc::TimeUnit::NANOSECOND => {
+                crate::TimeUnit::NANOSECOND => {
                     DataType::Timestamp(TimeUnit::Nanosecond, timezone)
                 }
                 z => panic!("Timestamp type with unit of {:?} not supported", z),
             }
         }
-        ipc::Type::Interval => {
+        crate::Type::Interval => {
             let interval = field.type_as_interval().unwrap();
             match interval.unit() {
-                ipc::IntervalUnit::YEAR_MONTH => {
+                crate::IntervalUnit::YEAR_MONTH => {
                     DataType::Interval(IntervalUnit::YearMonth)
                 }
-                ipc::IntervalUnit::DAY_TIME => DataType::Interval(IntervalUnit::DayTime),
-                ipc::IntervalUnit::MONTH_DAY_NANO => {
+                crate::IntervalUnit::DAY_TIME => {
+                    DataType::Interval(IntervalUnit::DayTime)
+                }
+                crate::IntervalUnit::MONTH_DAY_NANO => {
                     DataType::Interval(IntervalUnit::MonthDayNano)
                 }
                 z => panic!("Interval type with unit of {:?} unsupported", z),
             }
         }
-        ipc::Type::Duration => {
+        crate::Type::Duration => {
             let duration = field.type_as_duration().unwrap();
             match duration.unit() {
-                ipc::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
-                ipc::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
-                ipc::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
-                ipc::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
+                crate::TimeUnit::SECOND => DataType::Duration(TimeUnit::Second),
+                crate::TimeUnit::MILLISECOND => DataType::Duration(TimeUnit::Millisecond),
+                crate::TimeUnit::MICROSECOND => DataType::Duration(TimeUnit::Microsecond),
+                crate::TimeUnit::NANOSECOND => DataType::Duration(TimeUnit::Nanosecond),
                 z => panic!("Duration type with unit of {:?} unsupported", z),
             }
         }
-        ipc::Type::List => {
+        crate::Type::List => {
             let children = field.children().unwrap();
             if children.len() != 1 {
                 panic!("expect a list to have one child")
             }
             DataType::List(Box::new(children.get(0).into()))
         }
-        ipc::Type::LargeList => {
+        crate::Type::LargeList => {
             let children = field.children().unwrap();
             if children.len() != 1 {
                 panic!("expect a large list to have one child")
             }
             DataType::LargeList(Box::new(children.get(0).into()))
         }
-        ipc::Type::FixedSizeList => {
+        crate::Type::FixedSizeList => {
             let children = field.children().unwrap();
             if children.len() != 1 {
                 panic!("expect a list to have one child")
@@ -346,7 +349,7 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
             let fsl = field.type_as_fixed_size_list().unwrap();
             DataType::FixedSizeList(Box::new(children.get(0).into()), fsl.listSize())
         }
-        ipc::Type::Struct_ => {
+        crate::Type::Struct_ => {
             let mut fields = vec![];
             if let Some(children) = field.children() {
                 for i in 0..children.len() {
@@ -356,7 +359,7 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
 
             DataType::Struct(fields)
         }
-        ipc::Type::Map => {
+        crate::Type::Map => {
             let map = field.type_as_map().unwrap();
             let children = field.children().unwrap();
             if children.len() != 1 {
@@ -364,7 +367,7 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
             }
             DataType::Map(Box::new(children.get(0).into()), map.keysSorted())
         }
-        ipc::Type::Decimal => {
+        crate::Type::Decimal => {
             let fsb = field.type_as_decimal().unwrap();
             let bit_width = fsb.bitWidth();
             if bit_width == 128 {
@@ -381,12 +384,12 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
                 panic!("Unexpected decimal bit width {}", bit_width)
             }
         }
-        ipc::Type::Union => {
+        crate::Type::Union => {
             let union = field.type_as_union().unwrap();
 
             let union_mode = match union.mode() {
-                ipc::UnionMode::Dense => UnionMode::Dense,
-                ipc::UnionMode::Sparse => UnionMode::Sparse,
+                crate::UnionMode::Dense => UnionMode::Dense,
+                crate::UnionMode::Sparse => UnionMode::Sparse,
                 mode => panic!("Unexpected union mode: {:?}", mode),
             };
 
@@ -409,27 +412,27 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT
 }
 
 pub(crate) struct FBFieldType<'b> {
-    pub(crate) type_type: ipc::Type,
+    pub(crate) type_type: crate::Type,
     pub(crate) type_: WIPOffset<UnionWIPOffset>,
-    pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<ipc::Field<'b>>>>>,
+    pub(crate) children: Option<WIPOffset<Vector<'b, ForwardsUOffset<crate::Field<'b>>>>>,
 }
 
 /// Create an IPC Field from an Arrow Field
 pub(crate) fn build_field<'a>(
     fbb: &mut FlatBufferBuilder<'a>,
     field: &Field,
-) -> WIPOffset<ipc::Field<'a>> {
+) -> WIPOffset<crate::Field<'a>> {
     // Optional custom metadata.
     let mut fb_metadata = None;
     if let Some(metadata) = field.metadata() {
         if !metadata.is_empty() {
             let mut kv_vec = vec![];
             for (k, v) in metadata {
-                let kv_args = ipc::KeyValueArgs {
+                let kv_args = crate::KeyValueArgs {
                     key: Some(fbb.create_string(k.as_str())),
                     value: Some(fbb.create_string(v.as_str())),
                 };
-                let kv_offset = ipc::KeyValue::create(fbb, &kv_args);
+                let kv_offset = crate::KeyValue::create(fbb, &kv_args);
                 kv_vec.push(kv_offset);
             }
             fb_metadata = Some(fbb.create_vector(&kv_vec));
@@ -454,7 +457,7 @@ pub(crate) fn build_field<'a>(
         None
     };
 
-    let mut field_builder = ipc::FieldBuilder::new(fbb);
+    let mut field_builder = crate::FieldBuilder::new(fbb);
     field_builder.add_name(fb_field_name);
     if let Some(dictionary) = fb_dictionary {
         field_builder.add_dictionary(dictionary)
@@ -481,21 +484,21 @@ pub(crate) fn get_fb_field_type<'a>(
 ) -> FBFieldType<'a> {
     // some IPC implementations expect an empty list for child data, instead of a null value.
     // An empty field list is thus returned for primitive types
-    let empty_fields: Vec<WIPOffset<ipc::Field>> = vec![];
+    let empty_fields: Vec<WIPOffset<crate::Field>> = vec![];
     match data_type {
         Null => FBFieldType {
-            type_type: ipc::Type::Null,
-            type_: ipc::NullBuilder::new(fbb).finish().as_union_value(),
+            type_type: crate::Type::Null,
+            type_: crate::NullBuilder::new(fbb).finish().as_union_value(),
             children: Some(fbb.create_vector(&empty_fields[..])),
         },
         Boolean => FBFieldType {
-            type_type: ipc::Type::Bool,
-            type_: ipc::BoolBuilder::new(fbb).finish().as_union_value(),
+            type_type: crate::Type::Bool,
+            type_: crate::BoolBuilder::new(fbb).finish().as_union_value(),
             children: Some(fbb.create_vector(&empty_fields[..])),
         },
         UInt8 | UInt16 | UInt32 | UInt64 => {
             let children = fbb.create_vector(&empty_fields[..]);
-            let mut builder = ipc::IntBuilder::new(fbb);
+            let mut builder = crate::IntBuilder::new(fbb);
             builder.add_is_signed(false);
             match data_type {
                 UInt8 => builder.add_bitWidth(8),
@@ -505,14 +508,14 @@ pub(crate) fn get_fb_field_type<'a>(
                 _ => {}
             };
             FBFieldType {
-                type_type: ipc::Type::Int,
+                type_type: crate::Type::Int,
                 type_: builder.finish().as_union_value(),
                 children: Some(children),
             }
         }
         Int8 | Int16 | Int32 | Int64 => {
             let children = fbb.create_vector(&empty_fields[..]);
-            let mut builder = ipc::IntBuilder::new(fbb);
+            let mut builder = crate::IntBuilder::new(fbb);
             builder.add_is_signed(true);
             match data_type {
                 Int8 => builder.add_bitWidth(8),
@@ -522,95 +525,97 @@ pub(crate) fn get_fb_field_type<'a>(
                 _ => {}
             };
             FBFieldType {
-                type_type: ipc::Type::Int,
+                type_type: crate::Type::Int,
                 type_: builder.finish().as_union_value(),
                 children: Some(children),
             }
         }
         Float16 | Float32 | Float64 => {
             let children = fbb.create_vector(&empty_fields[..]);
-            let mut builder = ipc::FloatingPointBuilder::new(fbb);
+            let mut builder = crate::FloatingPointBuilder::new(fbb);
             match data_type {
-                Float16 => builder.add_precision(ipc::Precision::HALF),
-                Float32 => builder.add_precision(ipc::Precision::SINGLE),
-                Float64 => builder.add_precision(ipc::Precision::DOUBLE),
+                Float16 => builder.add_precision(crate::Precision::HALF),
+                Float32 => builder.add_precision(crate::Precision::SINGLE),
+                Float64 => builder.add_precision(crate::Precision::DOUBLE),
                 _ => {}
             };
             FBFieldType {
-                type_type: ipc::Type::FloatingPoint,
+                type_type: crate::Type::FloatingPoint,
                 type_: builder.finish().as_union_value(),
                 children: Some(children),
             }
         }
         Binary => FBFieldType {
-            type_type: ipc::Type::Binary,
-            type_: ipc::BinaryBuilder::new(fbb).finish().as_union_value(),
+            type_type: crate::Type::Binary,
+            type_: crate::BinaryBuilder::new(fbb).finish().as_union_value(),
             children: Some(fbb.create_vector(&empty_fields[..])),
         },
         LargeBinary => FBFieldType {
-            type_type: ipc::Type::LargeBinary,
-            type_: ipc::LargeBinaryBuilder::new(fbb).finish().as_union_value(),
+            type_type: crate::Type::LargeBinary,
+            type_: crate::LargeBinaryBuilder::new(fbb)
+                .finish()
+                .as_union_value(),
             children: Some(fbb.create_vector(&empty_fields[..])),
         },
         Utf8 => FBFieldType {
-            type_type: ipc::Type::Utf8,
-            type_: ipc::Utf8Builder::new(fbb).finish().as_union_value(),
+            type_type: crate::Type::Utf8,
+            type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
             children: Some(fbb.create_vector(&empty_fields[..])),
         },
         LargeUtf8 => FBFieldType {
-            type_type: ipc::Type::LargeUtf8,
-            type_: ipc::LargeUtf8Builder::new(fbb).finish().as_union_value(),
+            type_type: crate::Type::LargeUtf8,
+            type_: crate::LargeUtf8Builder::new(fbb).finish().as_union_value(),
             children: Some(fbb.create_vector(&empty_fields[..])),
         },
         FixedSizeBinary(len) => {
-            let mut builder = ipc::FixedSizeBinaryBuilder::new(fbb);
+            let mut builder = crate::FixedSizeBinaryBuilder::new(fbb);
             builder.add_byteWidth(*len as i32);
             FBFieldType {
-                type_type: ipc::Type::FixedSizeBinary,
+                type_type: crate::Type::FixedSizeBinary,
                 type_: builder.finish().as_union_value(),
                 children: Some(fbb.create_vector(&empty_fields[..])),
             }
         }
         Date32 => {
-            let mut builder = ipc::DateBuilder::new(fbb);
-            builder.add_unit(ipc::DateUnit::DAY);
+            let mut builder = crate::DateBuilder::new(fbb);
+            builder.add_unit(crate::DateUnit::DAY);
             FBFieldType {
-                type_type: ipc::Type::Date,
+                type_type: crate::Type::Date,
                 type_: builder.finish().as_union_value(),
                 children: Some(fbb.create_vector(&empty_fields[..])),
             }
         }
         Date64 => {
-            let mut builder = ipc::DateBuilder::new(fbb);
-            builder.add_unit(ipc::DateUnit::MILLISECOND);
+            let mut builder = crate::DateBuilder::new(fbb);
+            builder.add_unit(crate::DateUnit::MILLISECOND);
             FBFieldType {
-                type_type: ipc::Type::Date,
+                type_type: crate::Type::Date,
                 type_: builder.finish().as_union_value(),
                 children: Some(fbb.create_vector(&empty_fields[..])),
             }
         }
         Time32(unit) | Time64(unit) => {
-            let mut builder = ipc::TimeBuilder::new(fbb);
+            let mut builder = crate::TimeBuilder::new(fbb);
             match unit {
                 TimeUnit::Second => {
                     builder.add_bitWidth(32);
-                    builder.add_unit(ipc::TimeUnit::SECOND);
+                    builder.add_unit(crate::TimeUnit::SECOND);
                 }
                 TimeUnit::Millisecond => {
                     builder.add_bitWidth(32);
-                    builder.add_unit(ipc::TimeUnit::MILLISECOND);
+                    builder.add_unit(crate::TimeUnit::MILLISECOND);
                 }
                 TimeUnit::Microsecond => {
                     builder.add_bitWidth(64);
-                    builder.add_unit(ipc::TimeUnit::MICROSECOND);
+                    builder.add_unit(crate::TimeUnit::MICROSECOND);
                 }
                 TimeUnit::Nanosecond => {
                     builder.add_bitWidth(64);
-                    builder.add_unit(ipc::TimeUnit::NANOSECOND);
+                    builder.add_unit(crate::TimeUnit::NANOSECOND);
                 }
             }
             FBFieldType {
-                type_type: ipc::Type::Time,
+                type_type: crate::Type::Time,
                 type_: builder.finish().as_union_value(),
                 children: Some(fbb.create_vector(&empty_fields[..])),
             }
@@ -618,48 +623,48 @@ pub(crate) fn get_fb_field_type<'a>(
         Timestamp(unit, tz) => {
             let tz = tz.clone().unwrap_or_default();
             let tz_str = fbb.create_string(tz.as_str());
-            let mut builder = ipc::TimestampBuilder::new(fbb);
+            let mut builder = crate::TimestampBuilder::new(fbb);
             let time_unit = match unit {
-                TimeUnit::Second => ipc::TimeUnit::SECOND,
-                TimeUnit::Millisecond => ipc::TimeUnit::MILLISECOND,
-                TimeUnit::Microsecond => ipc::TimeUnit::MICROSECOND,
-                TimeUnit::Nanosecond => ipc::TimeUnit::NANOSECOND,
+                TimeUnit::Second => crate::TimeUnit::SECOND,
+                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
+                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
+                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
             };
             builder.add_unit(time_unit);
             if !tz.is_empty() {
                 builder.add_timezone(tz_str);
             }
             FBFieldType {
-                type_type: ipc::Type::Timestamp,
+                type_type: crate::Type::Timestamp,
                 type_: builder.finish().as_union_value(),
                 children: Some(fbb.create_vector(&empty_fields[..])),
             }
         }
         Interval(unit) => {
-            let mut builder = ipc::IntervalBuilder::new(fbb);
+            let mut builder = crate::IntervalBuilder::new(fbb);
             let interval_unit = match unit {
-                IntervalUnit::YearMonth => ipc::IntervalUnit::YEAR_MONTH,
-                IntervalUnit::DayTime => ipc::IntervalUnit::DAY_TIME,
-                IntervalUnit::MonthDayNano => ipc::IntervalUnit::MONTH_DAY_NANO,
+                IntervalUnit::YearMonth => crate::IntervalUnit::YEAR_MONTH,
+                IntervalUnit::DayTime => crate::IntervalUnit::DAY_TIME,
+                IntervalUnit::MonthDayNano => crate::IntervalUnit::MONTH_DAY_NANO,
             };
             builder.add_unit(interval_unit);
             FBFieldType {
-                type_type: ipc::Type::Interval,
+                type_type: crate::Type::Interval,
                 type_: builder.finish().as_union_value(),
                 children: Some(fbb.create_vector(&empty_fields[..])),
             }
         }
         Duration(unit) => {
-            let mut builder = ipc::DurationBuilder::new(fbb);
+            let mut builder = crate::DurationBuilder::new(fbb);
             let time_unit = match unit {
-                TimeUnit::Second => ipc::TimeUnit::SECOND,
-                TimeUnit::Millisecond => ipc::TimeUnit::MILLISECOND,
-                TimeUnit::Microsecond => ipc::TimeUnit::MICROSECOND,
-                TimeUnit::Nanosecond => ipc::TimeUnit::NANOSECOND,
+                TimeUnit::Second => crate::TimeUnit::SECOND,
+                TimeUnit::Millisecond => crate::TimeUnit::MILLISECOND,
+                TimeUnit::Microsecond => crate::TimeUnit::MICROSECOND,
+                TimeUnit::Nanosecond => crate::TimeUnit::NANOSECOND,
             };
             builder.add_unit(time_unit);
             FBFieldType {
-                type_type: ipc::Type::Duration,
+                type_type: crate::Type::Duration,
                 type_: builder.finish().as_union_value(),
                 children: Some(fbb.create_vector(&empty_fields[..])),
             }
@@ -667,25 +672,25 @@ pub(crate) fn get_fb_field_type<'a>(
         List(ref list_type) => {
             let child = build_field(fbb, list_type);
             FBFieldType {
-                type_type: ipc::Type::List,
-                type_: ipc::ListBuilder::new(fbb).finish().as_union_value(),
+                type_type: crate::Type::List,
+                type_: crate::ListBuilder::new(fbb).finish().as_union_value(),
                 children: Some(fbb.create_vector(&[child])),
             }
         }
         LargeList(ref list_type) => {
             let child = build_field(fbb, list_type);
             FBFieldType {
-                type_type: ipc::Type::LargeList,
-                type_: ipc::LargeListBuilder::new(fbb).finish().as_union_value(),
+                type_type: crate::Type::LargeList,
+                type_: crate::LargeListBuilder::new(fbb).finish().as_union_value(),
                 children: Some(fbb.create_vector(&[child])),
             }
         }
         FixedSizeList(ref list_type, len) => {
             let child = build_field(fbb, list_type);
-            let mut builder = ipc::FixedSizeListBuilder::new(fbb);
+            let mut builder = crate::FixedSizeListBuilder::new(fbb);
             builder.add_listSize(*len as i32);
             FBFieldType {
-                type_type: ipc::Type::FixedSizeList,
+                type_type: crate::Type::FixedSizeList,
                 type_: builder.finish().as_union_value(),
                 children: Some(fbb.create_vector(&[child])),
             }
@@ -697,17 +702,17 @@ pub(crate) fn get_fb_field_type<'a>(
                 children.push(build_field(fbb, field));
             }
             FBFieldType {
-                type_type: ipc::Type::Struct_,
-                type_: ipc::Struct_Builder::new(fbb).finish().as_union_value(),
+                type_type: crate::Type::Struct_,
+                type_: crate::Struct_Builder::new(fbb).finish().as_union_value(),
                 children: Some(fbb.create_vector(&children[..])),
             }
         }
         Map(map_field, keys_sorted) => {
             let child = build_field(fbb, map_field);
-            let mut field_type = ipc::MapBuilder::new(fbb);
+            let mut field_type = crate::MapBuilder::new(fbb);
             field_type.add_keysSorted(*keys_sorted);
             FBFieldType {
-                type_type: ipc::Type::Map,
+                type_type: crate::Type::Map,
                 type_: field_type.finish().as_union_value(),
                 children: Some(fbb.create_vector(&[child])),
             }
@@ -719,23 +724,23 @@ pub(crate) fn get_fb_field_type<'a>(
             get_fb_field_type(value_type, fbb)
         }
         Decimal128(precision, scale) => {
-            let mut builder = ipc::DecimalBuilder::new(fbb);
+            let mut builder = crate::DecimalBuilder::new(fbb);
             builder.add_precision(*precision as i32);
             builder.add_scale(*scale as i32);
             builder.add_bitWidth(128);
             FBFieldType {
-                type_type: ipc::Type::Decimal,
+                type_type: crate::Type::Decimal,
                 type_: builder.finish().as_union_value(),
                 children: Some(fbb.create_vector(&empty_fields[..])),
             }
         }
         Decimal256(precision, scale) => {
-            let mut builder = ipc::DecimalBuilder::new(fbb);
+            let mut builder = crate::DecimalBuilder::new(fbb);
             builder.add_precision(*precision as i32);
             builder.add_scale(*scale as i32);
             builder.add_bitWidth(256);
             FBFieldType {
-                type_type: ipc::Type::Decimal,
+                type_type: crate::Type::Decimal,
                 type_: builder.finish().as_union_value(),
                 children: Some(fbb.create_vector(&empty_fields[..])),
             }
@@ -747,18 +752,18 @@ pub(crate) fn get_fb_field_type<'a>(
             }
 
             let union_mode = match mode {
-                UnionMode::Sparse => ipc::UnionMode::Sparse,
-                UnionMode::Dense => ipc::UnionMode::Dense,
+                UnionMode::Sparse => crate::UnionMode::Sparse,
+                UnionMode::Dense => crate::UnionMode::Dense,
             };
 
             let fbb_type_ids = fbb
                 .create_vector(&type_ids.iter().map(|t| *t as i32).collect::<Vec<_>>());
-            let mut builder = ipc::UnionBuilder::new(fbb);
+            let mut builder = crate::UnionBuilder::new(fbb);
             builder.add_mode(union_mode);
             builder.add_typeIds(fbb_type_ids);
 
             FBFieldType {
-                type_type: ipc::Type::Union,
+                type_type: crate::Type::Union,
                 type_: builder.finish().as_union_value(),
                 children: Some(fbb.create_vector(&children[..])),
             }
@@ -772,10 +777,10 @@ pub(crate) fn get_fb_dictionary<'a>(
     dict_id: i64,
     dict_is_ordered: bool,
     fbb: &mut FlatBufferBuilder<'a>,
-) -> WIPOffset<ipc::DictionaryEncoding<'a>> {
+) -> WIPOffset<crate::DictionaryEncoding<'a>> {
     // We assume that the dictionary index type (as an integer) has already been
     // validated elsewhere, so we can safely assume we are dealing with integers
-    let mut index_builder = ipc::IntBuilder::new(fbb);
+    let mut index_builder = crate::IntBuilder::new(fbb);
 
     match *index_type {
         Int8 | Int16 | Int32 | Int64 => index_builder.add_is_signed(true),
@@ -793,7 +798,7 @@ pub(crate) fn get_fb_dictionary<'a>(
 
     let index_builder = index_builder.finish();
 
-    let mut builder = ipc::DictionaryEncodingBuilder::new(fbb);
+    let mut builder = crate::DictionaryEncodingBuilder::new(fbb);
     builder.add_id(dict_id);
     builder.add_indexType(index_builder);
     builder.add_isOrdered(dict_is_ordered);
@@ -804,7 +809,6 @@ pub(crate) fn get_fb_dictionary<'a>(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::datatypes::{DataType, Field, Schema, UnionMode};
 
     #[test]
     fn convert_schema_round_trip() {
@@ -1024,14 +1028,14 @@ mod tests {
         let fb = schema_to_fb(&schema);
 
         // read back fields
-        let ipc = ipc::root_as_schema(fb.finished_data()).unwrap();
+        let ipc = crate::root_as_schema(fb.finished_data()).unwrap();
         let schema2 = fb_to_schema(ipc);
         assert_eq!(schema, schema2);
     }
 
     #[test]
     fn schema_from_bytes() {
-        // bytes of a schema generated from python (0.14.0), saved as an `ipc::Message`.
+        // bytes of a schema generated from python (0.14.0), saved as a `crate::Message`.
         // the schema is: Field("field1", DataType::UInt32, false)
         let bytes: Vec<u8> = vec![
             16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 3, 0,
@@ -1041,7 +1045,7 @@ mod tests {
             4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0,
             0, 0, 0, 0,
         ];
-        let ipc = ipc::root_as_message(&bytes[..]).unwrap();
+        let ipc = crate::root_as_message(&bytes[..]).unwrap();
         let schema = ipc.header_as_schema().unwrap();
 
         // a message generated from Rust, same as the Python one
@@ -1053,7 +1057,7 @@ mod tests {
             8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49,
             0, 0,
         ];
-        let ipc2 = ipc::root_as_message(&bytes[..]).unwrap();
+        let ipc2 = crate::root_as_message(&bytes[..]).unwrap();
         let schema2 = ipc2.header_as_schema().unwrap();
 
         assert_eq!(schema, schema2);
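The round trip exercised by these tests is the module's main entry point: `schema_to_fb` serializes an `arrow_schema::Schema` to flatbuffer bytes and `fb_to_schema` converts them back. A minimal usage sketch against the new crate layout, assuming `convert` and the generated `root_as_schema` remain publicly exported from `arrow_ipc` as they are used here:

```rust
use arrow_ipc::convert::{fb_to_schema, schema_to_fb};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);

    // Serialize the schema into IPC flatbuffer bytes...
    let fb = schema_to_fb(&schema);

    // ...then parse those bytes and convert back to an Arrow schema.
    let ipc = arrow_ipc::root_as_schema(fb.finished_data()).unwrap();
    assert_eq!(fb_to_schema(ipc), schema);
}
```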
diff --git a/arrow/src/ipc/gen/File.rs b/arrow-ipc/src/gen/File.rs
similarity index 99%
rename from arrow/src/ipc/gen/File.rs
rename to arrow-ipc/src/gen/File.rs
index 9aafe910b..0e9427813 100644
--- a/arrow/src/ipc/gen/File.rs
+++ b/arrow-ipc/src/gen/File.rs
@@ -18,7 +18,7 @@
 #![allow(dead_code)]
 #![allow(unused_imports)]
 
-use crate::ipc::gen::Schema::*;
+use crate::gen::Schema::*;
 use flatbuffers::EndianScalar;
 use std::{cmp::Ordering, mem};
 // automatically generated by the FlatBuffers compiler, do not modify
diff --git a/arrow/src/ipc/gen/Message.rs b/arrow-ipc/src/gen/Message.rs
similarity index 99%
rename from arrow/src/ipc/gen/Message.rs
rename to arrow-ipc/src/gen/Message.rs
index d4b3a57f1..2b9f79766 100644
--- a/arrow/src/ipc/gen/Message.rs
+++ b/arrow-ipc/src/gen/Message.rs
@@ -18,9 +18,9 @@
 #![allow(dead_code)]
 #![allow(unused_imports)]
 
-use crate::ipc::gen::Schema::*;
-use crate::ipc::gen::SparseTensor::*;
-use crate::ipc::gen::Tensor::*;
+use crate::gen::Schema::*;
+use crate::gen::SparseTensor::*;
+use crate::gen::Tensor::*;
 use flatbuffers::EndianScalar;
 use std::{cmp::Ordering, mem};
 // automatically generated by the FlatBuffers compiler, do not modify
@@ -340,7 +340,7 @@ pub struct MessageHeaderUnionTableOffset {}
 /// Metadata about a field at some level of a nested type tree (but not
 /// its children).
 ///
-/// For example, a List<Int16> with values `[[1, 2, 3], null, [4], [5, 6], null]`
+/// For example, a `List<Int16>` with values `[[1, 2, 3], null, [4], [5, 6], null]`
 /// would have {length: 5, null_count: 2} for its List node, and {length: 6,
 /// null_count: 0} for its Int16 node, as separate FieldNode structs
 // struct FieldNode, aligned to 8
diff --git a/arrow/src/ipc/gen/Schema.rs b/arrow-ipc/src/gen/Schema.rs
similarity index 100%
rename from arrow/src/ipc/gen/Schema.rs
rename to arrow-ipc/src/gen/Schema.rs
diff --git a/arrow/src/ipc/gen/SparseTensor.rs b/arrow-ipc/src/gen/SparseTensor.rs
similarity index 99%
rename from arrow/src/ipc/gen/SparseTensor.rs
rename to arrow-ipc/src/gen/SparseTensor.rs
index 317831c59..c5e06c30e 100644
--- a/arrow/src/ipc/gen/SparseTensor.rs
+++ b/arrow-ipc/src/gen/SparseTensor.rs
@@ -18,8 +18,8 @@
 #![allow(dead_code)]
 #![allow(unused_imports)]
 
-use crate::ipc::gen::Schema::*;
-use crate::ipc::gen::Tensor::*;
+use crate::gen::Schema::*;
+use crate::gen::Tensor::*;
 use flatbuffers::EndianScalar;
 use std::{cmp::Ordering, mem};
 // automatically generated by the FlatBuffers compiler, do not modify
diff --git a/arrow/src/ipc/gen/Tensor.rs b/arrow-ipc/src/gen/Tensor.rs
similarity index 99%
rename from arrow/src/ipc/gen/Tensor.rs
rename to arrow-ipc/src/gen/Tensor.rs
index f22ff23c9..954ebd290 100644
--- a/arrow/src/ipc/gen/Tensor.rs
+++ b/arrow-ipc/src/gen/Tensor.rs
@@ -18,7 +18,7 @@
 #![allow(dead_code)]
 #![allow(unused_imports)]
 
-use crate::ipc::gen::Schema::*;
+use crate::gen::Schema::*;
 use flatbuffers::EndianScalar;
 use std::{cmp::Ordering, mem};
 // automatically generated by the FlatBuffers compiler, do not modify
diff --git a/arrow/src/ipc/gen/mod.rs b/arrow-ipc/src/gen/mod.rs
similarity index 100%
rename from arrow/src/ipc/gen/mod.rs
rename to arrow-ipc/src/gen/mod.rs
diff --git a/arrow/src/ipc/mod.rs b/arrow-ipc/src/lib.rs
similarity index 97%
rename from arrow/src/ipc/mod.rs
rename to arrow-ipc/src/lib.rs
index 2b30e7220..38217957d 100644
--- a/arrow/src/ipc/mod.rs
+++ b/arrow-ipc/src/lib.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Support for the Arrow IPC format
+
 // TODO: (vcq): Protobuf codegen is not generating Debug impls.
 #![allow(missing_debug_implementations)]
 
diff --git a/arrow/src/ipc/reader.rs b/arrow-ipc/src/reader.rs
similarity index 93%
rename from arrow/src/ipc/reader.rs
rename to arrow-ipc/src/reader.rs
index 1f2824b34..0165c775d 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -26,16 +26,14 @@ use std::fmt;
 use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
-use crate::array::*;
-use crate::buffer::{Buffer, MutableBuffer};
-use crate::compute::cast;
-use crate::datatypes::{DataType, Field, IntervalUnit, Schema, SchemaRef, UnionMode};
-use crate::error::{ArrowError, Result};
-use crate::ipc;
-use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};
-
-use crate::ipc::compression::CompressionCodec;
-use ipc::CONTINUATION_MARKER;
+use arrow_array::*;
+use arrow_buffer::{Buffer, MutableBuffer};
+use arrow_cast::cast;
+use arrow_data::ArrayData;
+use arrow_schema::*;
+
+use crate::compression::CompressionCodec;
+use crate::CONTINUATION_MARKER;
 use DataType::*;
 
 /// Read a buffer based on offset and length
@@ -48,10 +46,10 @@ use DataType::*;
 /// follows is not compressed, which can be useful for cases where
 /// compression does not yield appreciable savings.
 fn read_buffer(
-    buf: &ipc::Buffer,
+    buf: &crate::Buffer,
     a_data: &Buffer,
     compression_codec: &Option<CompressionCodec>,
-) -> Result<Buffer> {
+) -> Result<Buffer, ArrowError> {
     let start_offset = buf.offset() as usize;
     let buf_data = a_data.slice_with_length(start_offset, buf.length() as usize);
     // corner case: empty buffer
@@ -72,16 +70,16 @@ fn read_buffer(
 ///     - cast the 64-bit array to the appropriate data type
 #[allow(clippy::too_many_arguments)]
 fn create_array(
-    nodes: flatbuffers::Vector<'_, ipc::FieldNode>,
+    nodes: flatbuffers::Vector<'_, crate::FieldNode>,
     field: &Field,
     data: &Buffer,
-    buffers: flatbuffers::Vector<'_, ipc::Buffer>,
+    buffers: flatbuffers::Vector<'_, crate::Buffer>,
     dictionaries_by_id: &HashMap<i64, ArrayRef>,
     mut node_index: usize,
     mut buffer_index: usize,
     compression_codec: &Option<CompressionCodec>,
-    metadata: &ipc::MetadataVersion,
-) -> Result<(ArrayRef, usize, usize)> {
+    metadata: &crate::MetadataVersion,
+) -> Result<(ArrayRef, usize, usize), ArrowError> {
     let data_type = field.data_type();
     let array = match data_type {
         Utf8 | Binary | LargeBinary | LargeUtf8 => {
@@ -232,7 +230,7 @@ fn create_array(
 
             // In V4, union types has validity bitmap
             // In V5 and later, union types have no validity bitmap
-            if metadata < &ipc::MetadataVersion::V5 {
+            if metadata < &crate::MetadataVersion::V5 {
                 read_buffer(buffers.get(buffer_index), data, compression_codec)?;
                 buffer_index += 1;
             }
@@ -323,7 +321,7 @@ fn skip_field(
     data_type: &DataType,
     mut node_index: usize,
     mut buffer_index: usize,
-) -> Result<(usize, usize)> {
+) -> Result<(usize, usize), ArrowError> {
     match data_type {
         Utf8 | Binary | LargeBinary | LargeUtf8 => {
             node_index += 1;
@@ -396,7 +394,7 @@ fn skip_field(
 /// Reads the correct number of buffers based on data type and null_count, and creates a
 /// primitive array ref
 fn create_primitive_array(
-    field_node: &ipc::FieldNode,
+    field_node: &crate::FieldNode,
     data_type: &DataType,
     buffers: &[Buffer],
 ) -> ArrayRef {
@@ -536,7 +534,7 @@ fn get_aligned_buffer<T>(buffer: &Buffer, length: usize) -> Buffer {
 /// Reads the correct number of buffers based on list type and null_count, and creates a
 /// list array ref
 fn create_list_array(
-    field_node: &ipc::FieldNode,
+    field_node: &crate::FieldNode,
     data_type: &DataType,
     buffers: &[Buffer],
     child_array: ArrayRef,
@@ -564,7 +562,7 @@ fn create_list_array(
 /// Reads the correct number of buffers based on list type and null_count, and creates a
 /// list array ref
 fn create_dictionary_array(
-    field_node: &ipc::FieldNode,
+    field_node: &crate::FieldNode,
     data_type: &DataType,
     buffers: &[Buffer],
     value_array: ArrayRef,
@@ -583,15 +581,15 @@ fn create_dictionary_array(
     }
 }
 
-/// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema`
+/// Creates a record batch from binary data using the `crate::RecordBatch` indexes and the `Schema`
 pub fn read_record_batch(
     buf: &Buffer,
-    batch: ipc::RecordBatch,
+    batch: crate::RecordBatch,
     schema: SchemaRef,
     dictionaries_by_id: &HashMap<i64, ArrayRef>,
     projection: Option<&[usize]>,
-    metadata: &ipc::MetadataVersion,
-) -> Result<RecordBatch> {
+    metadata: &crate::MetadataVersion,
+) -> Result<RecordBatch, ArrowError> {
     let buffers = batch.buffers().ok_or_else(|| {
         ArrowError::IoError("Unable to get buffers from IPC RecordBatch".to_string())
     })?;
@@ -669,11 +667,11 @@ pub fn read_record_batch(
 /// updating the `dictionaries_by_id` with the resulting dictionary
 pub fn read_dictionary(
     buf: &Buffer,
-    batch: ipc::DictionaryBatch,
+    batch: crate::DictionaryBatch,
     schema: &Schema,
     dictionaries_by_id: &mut HashMap<i64, ArrayRef>,
-    metadata: &ipc::MetadataVersion,
-) -> Result<()> {
+    metadata: &crate::MetadataVersion,
+) -> Result<(), ArrowError> {
     if batch.isDelta() {
         return Err(ArrowError::IoError(
             "delta dictionary batches not supported".to_string(),
@@ -732,7 +730,7 @@ pub struct FileReader<R: Read + Seek> {
     /// The blocks in the file
     ///
     /// A block indicates the regions in the file to read to get data
-    blocks: Vec<ipc::Block>,
+    blocks: Vec<crate::Block>,
 
     /// A counter to keep track of the current block that should be read
     current_block: usize,
@@ -746,7 +744,7 @@ pub struct FileReader<R: Read + Seek> {
     dictionaries_by_id: HashMap<i64, ArrayRef>,
 
     /// Metadata version
-    metadata_version: ipc::MetadataVersion,
+    metadata_version: crate::MetadataVersion,
 
     /// Optional projection and projected_schema
     projection: Option<(Vec<usize>, Schema)>,
@@ -772,7 +770,10 @@ impl<R: Read + Seek> FileReader<R> {
     ///
     /// Returns errors if the file does not meet the Arrow Format header and footer
     /// requirements
-    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self> {
+    pub fn try_new(
+        reader: R,
+        projection: Option<Vec<usize>>,
+    ) -> Result<Self, ArrowError> {
         let mut reader = BufReader::new(reader);
         // check if header and footer contain correct magic bytes
         let mut magic_buffer: [u8; 6] = [0; 6];
@@ -800,7 +801,7 @@ impl<R: Read + Seek> FileReader<R> {
         reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
         reader.read_exact(&mut footer_data)?;
 
-        let footer = ipc::root_as_footer(&footer_data[..]).map_err(|err| {
+        let footer = crate::root_as_footer(&footer_data[..]).map_err(|err| {
             ArrowError::IoError(format!("Unable to get root as footer: {:?}", err))
         })?;
 
@@ -813,7 +814,7 @@ impl<R: Read + Seek> FileReader<R> {
         let total_blocks = blocks.len();
 
         let ipc_schema = footer.schema().unwrap();
-        let schema = ipc::convert::fb_to_schema(ipc_schema);
+        let schema = crate::convert::fb_to_schema(ipc_schema);
 
         // Create an array of optional dictionary value arrays, one per field.
         let mut dictionaries_by_id = HashMap::new();
@@ -831,7 +832,7 @@ impl<R: Read + Seek> FileReader<R> {
 
                 reader.read_exact(&mut block_data)?;
 
-                let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
+                let message = crate::root_as_message(&block_data[..]).map_err(|err| {
                     ArrowError::IoError(format!(
                         "Unable to get root as message: {:?}",
                         err
@@ -839,7 +840,7 @@ impl<R: Read + Seek> FileReader<R> {
                 })?;
 
                 match message.header_type() {
-                    ipc::MessageHeader::DictionaryBatch => {
+                    crate::MessageHeader::DictionaryBatch => {
                         let batch = message.header_as_dictionary_batch().unwrap();
 
                         // read the block that makes up the dictionary batch into a buffer
@@ -900,7 +901,7 @@ impl<R: Read + Seek> FileReader<R> {
     /// Read a specific record batch
     ///
     /// Sets the current block to the index, allowing random reads
-    pub fn set_index(&mut self, index: usize) -> Result<()> {
+    pub fn set_index(&mut self, index: usize) -> Result<(), ArrowError> {
         if index >= self.total_blocks {
             Err(ArrowError::IoError(format!(
                 "Cannot set batch to index {} from {} total batches",
@@ -912,7 +913,7 @@ impl<R: Read + Seek> FileReader<R> {
         }
     }
 
-    fn maybe_next(&mut self) -> Result<Option<RecordBatch>> {
+    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
         let block = self.blocks[self.current_block];
         self.current_block += 1;
 
@@ -928,12 +929,12 @@ impl<R: Read + Seek> FileReader<R> {
 
         let mut block_data = vec![0; meta_len as usize];
         self.reader.read_exact(&mut block_data)?;
-        let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
+        let message = crate::root_as_message(&block_data[..]).map_err(|err| {
             ArrowError::IoError(format!("Unable to get root as footer: {:?}", err))
         })?;
 
         // some old test data's footer metadata is not set, so we account for that
-        if self.metadata_version != ipc::MetadataVersion::V1
+        if self.metadata_version != crate::MetadataVersion::V1
             && message.version() != self.metadata_version
         {
             return Err(ArrowError::IoError(
@@ -942,10 +943,10 @@ impl<R: Read + Seek> FileReader<R> {
         }
 
         match message.header_type() {
-            ipc::MessageHeader::Schema => Err(ArrowError::IoError(
+            crate::MessageHeader::Schema => Err(ArrowError::IoError(
                 "Not expecting a schema when messages are read".to_string(),
             )),
-            ipc::MessageHeader::RecordBatch => {
+            crate::MessageHeader::RecordBatch => {
                 let batch = message.header_as_record_batch().ok_or_else(|| {
                     ArrowError::IoError(
                         "Unable to read IPC message as record batch".to_string(),
@@ -968,7 +969,7 @@ impl<R: Read + Seek> FileReader<R> {
 
                 ).map(Some)
             }
-            ipc::MessageHeader::NONE => {
+            crate::MessageHeader::NONE => {
                 Ok(None)
             }
             t => Err(ArrowError::IoError(format!(
@@ -979,7 +980,7 @@ impl<R: Read + Seek> FileReader<R> {
 }
 
 impl<R: Read + Seek> Iterator for FileReader<R> {
-    type Item = Result<RecordBatch>;
+    type Item = Result<RecordBatch, ArrowError>;
 
     fn next(&mut self) -> Option<Self::Item> {
         // get current block
@@ -1037,7 +1038,10 @@ impl<R: Read> StreamReader<R> {
     /// The first message in the stream is the schema, the reader will fail if it does not
     /// encounter a schema.
     /// To check if the reader is done, use `is_finished(self)`
-    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self> {
+    pub fn try_new(
+        reader: R,
+        projection: Option<Vec<usize>>,
+    ) -> Result<Self, ArrowError> {
         let mut reader = BufReader::new(reader);
         // determine metadata length
         let mut meta_size: [u8; 4] = [0; 4];
@@ -1054,14 +1058,14 @@ impl<R: Read> StreamReader<R> {
         let mut meta_buffer = vec![0; meta_len as usize];
         reader.read_exact(&mut meta_buffer)?;
 
-        let message = ipc::root_as_message(meta_buffer.as_slice()).map_err(|err| {
+        let message = crate::root_as_message(meta_buffer.as_slice()).map_err(|err| {
             ArrowError::IoError(format!("Unable to get root as message: {:?}", err))
         })?;
         // message header is a Schema, so read it
-        let ipc_schema: ipc::Schema = message.header_as_schema().ok_or_else(|| {
+        let ipc_schema: crate::Schema = message.header_as_schema().ok_or_else(|| {
             ArrowError::IoError("Unable to read IPC message as schema".to_string())
         })?;
-        let schema = ipc::convert::fb_to_schema(ipc_schema);
+        let schema = crate::convert::fb_to_schema(ipc_schema);
 
         // Create an array of optional dictionary value arrays, one per field.
         let dictionaries_by_id = HashMap::new();
@@ -1092,7 +1096,7 @@ impl<R: Read> StreamReader<R> {
         self.finished
     }
 
-    fn maybe_next(&mut self) -> Result<Option<RecordBatch>> {
+    fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
         if self.finished {
             return Ok(None);
         }
@@ -1133,15 +1137,15 @@ impl<R: Read> StreamReader<R> {
         self.reader.read_exact(&mut meta_buffer)?;
 
         let vecs = &meta_buffer.to_vec();
-        let message = ipc::root_as_message(vecs).map_err(|err| {
+        let message = crate::root_as_message(vecs).map_err(|err| {
             ArrowError::IoError(format!("Unable to get root as message: {:?}", err))
         })?;
 
         match message.header_type() {
-            ipc::MessageHeader::Schema => Err(ArrowError::IoError(
+            crate::MessageHeader::Schema => Err(ArrowError::IoError(
                 "Not expecting a schema when messages are read".to_string(),
             )),
-            ipc::MessageHeader::RecordBatch => {
+            crate::MessageHeader::RecordBatch => {
                 let batch = message.header_as_record_batch().ok_or_else(|| {
                     ArrowError::IoError(
                         "Unable to read IPC message as record batch".to_string(),
@@ -1153,7 +1157,7 @@ impl<R: Read> StreamReader<R> {
 
                 read_record_batch(&buf.into(), batch, self.schema(), &self.dictionaries_by_id, self.projection.as_ref().map(|x| x.0.as_ref()), &message.version()).map(Some)
             }
-            ipc::MessageHeader::DictionaryBatch => {
+            crate::MessageHeader::DictionaryBatch => {
                 let batch = message.header_as_dictionary_batch().ok_or_else(|| {
                     ArrowError::IoError(
                         "Unable to read IPC message as dictionary batch".to_string(),
@@ -1170,7 +1174,7 @@ impl<R: Read> StreamReader<R> {
                 // read the next message until we encounter a RecordBatch
                 self.maybe_next()
             }
-            ipc::MessageHeader::NONE => {
+            crate::MessageHeader::NONE => {
                 Ok(None)
             }
             t => Err(ArrowError::IoError(
@@ -1181,7 +1185,7 @@ impl<R: Read> StreamReader<R> {
 }
 
 impl<R: Read> Iterator for StreamReader<R> {
-    type Item = Result<RecordBatch>;
+    type Item = Result<RecordBatch, ArrowError>;
 
     fn next(&mut self) -> Option<Self::Item> {
         self.maybe_next().transpose()
@@ -1198,10 +1202,10 @@ impl<R: Read> RecordBatchReader for StreamReader<R> {
 mod tests {
     use super::*;
 
-    use std::fs::File;
-
-    use crate::datatypes;
-    use crate::datatypes::{ArrowNativeType, Float64Type, Int32Type, Int8Type};
+    use arrow_array::builder::UnionBuilder;
+    use arrow_array::types::*;
+    use arrow_buffer::ArrowNativeType;
+    use arrow_data::ArrayDataBuilder;
 
     fn create_test_projection_schema() -> Schema {
         // define field types
@@ -1347,7 +1351,8 @@ mod tests {
         // write record batch in IPC format
         let mut buf = Vec::new();
         {
-            let mut writer = ipc::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
+            let mut writer =
+                crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
             writer.write(&batch).unwrap();
             writer.finish().unwrap();
         }
@@ -1382,15 +1387,18 @@ mod tests {
         ];
         let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
         // create stream writer
-        let file = File::create("target/debug/testdata/float.stream").unwrap();
+        let mut file = tempfile::tempfile().unwrap();
         let mut stream_writer =
-            crate::ipc::writer::StreamWriter::try_new(file, &schema).unwrap();
+            crate::writer::StreamWriter::try_new(&mut file, &schema).unwrap();
         stream_writer.write(&batch).unwrap();
         stream_writer.finish().unwrap();
 
+        drop(stream_writer);
+
+        file.rewind().unwrap();
+
         // read stream back
-        let file = File::open("target/debug/testdata/float.stream").unwrap();
-        let reader = StreamReader::try_new(file, None).unwrap();
+        let reader = StreamReader::try_new(&mut file, None).unwrap();
 
         reader.for_each(|batch| {
             let batch = batch.unwrap();
@@ -1414,7 +1422,7 @@ mod tests {
             );
         });
 
-        let file = File::open("target/debug/testdata/float.stream").unwrap();
+        file.rewind().unwrap();
 
         // Read with projection
         let reader = StreamReader::try_new(file, Some(vec![0, 3])).unwrap();
@@ -1430,33 +1438,33 @@ mod tests {
     fn roundtrip_ipc(rb: &RecordBatch) -> RecordBatch {
         let mut buf = Vec::new();
         let mut writer =
-            ipc::writer::FileWriter::try_new(&mut buf, &rb.schema()).unwrap();
+            crate::writer::FileWriter::try_new(&mut buf, &rb.schema()).unwrap();
         writer.write(rb).unwrap();
         writer.finish().unwrap();
         drop(writer);
 
         let mut reader =
-            ipc::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
+            crate::reader::FileReader::try_new(std::io::Cursor::new(buf), None).unwrap();
         reader.next().unwrap().unwrap()
     }
 
     fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
         let mut buf = Vec::new();
         let mut writer =
-            ipc::writer::StreamWriter::try_new(&mut buf, &rb.schema()).unwrap();
+            crate::writer::StreamWriter::try_new(&mut buf, &rb.schema()).unwrap();
         writer.write(rb).unwrap();
         writer.finish().unwrap();
         drop(writer);
 
         let mut reader =
-            ipc::reader::StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
+            crate::reader::StreamReader::try_new(std::io::Cursor::new(buf), None)
+                .unwrap();
         reader.next().unwrap().unwrap()
     }
 
     #[test]
     fn test_roundtrip_nested_dict() {
-        let inner: DictionaryArray<datatypes::Int32Type> =
-            vec!["a", "b", "a"].into_iter().collect();
+        let inner: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
 
         let array = Arc::new(inner) as ArrayRef;
 
@@ -1477,11 +1485,11 @@ mod tests {
     }
 
     fn check_union_with_builder(mut builder: UnionBuilder) {
-        builder.append::<datatypes::Int32Type>("a", 1).unwrap();
-        builder.append_null::<datatypes::Int32Type>("a").unwrap();
-        builder.append::<datatypes::Float64Type>("c", 3.0).unwrap();
-        builder.append::<datatypes::Int32Type>("a", 4).unwrap();
-        builder.append::<datatypes::Int64Type>("d", 11).unwrap();
+        builder.append::<Int32Type>("a", 1).unwrap();
+        builder.append_null::<Int32Type>("a").unwrap();
+        builder.append::<Float64Type>("c", 3.0).unwrap();
+        builder.append::<Int32Type>("a", 4).unwrap();
+        builder.append::<Int64Type>("d", 11).unwrap();
         let union = builder.build().unwrap();
 
         let schema = Arc::new(Schema::new(vec![Field::new(
@@ -1521,7 +1529,7 @@ mod tests {
         let dict = Arc::new(
             xs.clone()
                 .into_iter()
-                .collect::<DictionaryArray<datatypes::Int8Type>>(),
+                .collect::<DictionaryArray<Int8Type>>(),
         );
         let string_array: ArrayRef = Arc::new(StringArray::from(xs.clone()));
         let struct_array = StructArray::from(vec![
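The relocated reader keeps the same `Read + Seek` interface; only the import paths change. A minimal read sketch under that assumption, where `buf` is taken to hold a complete IPC file such as the round-trip tests above produce:

```rust
// Sketch of reading an IPC file through the split-out crate, mirroring
// the relocated tests; `buf` is assumed to contain a complete IPC file
// written by `arrow_ipc::writer::FileWriter`.
use arrow_ipc::reader::FileReader;
use arrow_schema::ArrowError;

fn read_all_batches(buf: Vec<u8>) -> Result<usize, ArrowError> {
    // Cursor provides the Read + Seek bounds FileReader requires.
    let reader = FileReader::try_new(std::io::Cursor::new(buf), None)?;
    let mut rows = 0;
    for batch in reader {
        rows += batch?.num_rows();
    }
    Ok(rows)
}
```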
diff --git a/arrow/src/ipc/writer.rs b/arrow-ipc/src/writer.rs
similarity index 87%
rename from arrow/src/ipc/writer.rs
rename to arrow-ipc/src/writer.rs
index 4f40574ab..44f32f0cb 100644
--- a/arrow/src/ipc/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -26,21 +26,16 @@ use std::io::{BufWriter, Write};
 
 use flatbuffers::FlatBufferBuilder;
 
-use crate::array::{
-    as_large_list_array, as_list_array, as_map_array, as_struct_array, as_union_array,
-    layout, make_array, Array, ArrayData, ArrayRef, BinaryArray, BufferBuilder,
-    BufferSpec, FixedSizeListArray, GenericBinaryArray, GenericStringArray,
-    LargeBinaryArray, LargeStringArray, OffsetSizeTrait, StringArray,
-};
-use crate::buffer::{Buffer, MutableBuffer};
-use crate::datatypes::*;
-use crate::error::{ArrowError, Result};
-use crate::ipc;
-use crate::record_batch::RecordBatch;
-use crate::util::bit_util;
-
-use crate::ipc::compression::CompressionCodec;
-use ipc::CONTINUATION_MARKER;
+use arrow_array::builder::BufferBuilder;
+use arrow_array::cast::*;
+use arrow_array::*;
+use arrow_buffer::bit_util;
+use arrow_buffer::{Buffer, MutableBuffer};
+use arrow_data::{layout, ArrayData, BufferSpec};
+use arrow_schema::*;
+
+use crate::compression::CompressionCodec;
+use crate::CONTINUATION_MARKER;
 
 /// IPC write options used to control the behaviour of the writer
 #[derive(Debug, Clone)]
@@ -58,24 +53,25 @@ pub struct IpcWriteOptions {
     ///
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
-    metadata_version: ipc::MetadataVersion,
-    /// Compression, if desired. Only supported when `ipc_compression`
-    /// feature is enabled
-    batch_compression_type: Option<ipc::CompressionType>,
+    metadata_version: crate::MetadataVersion,
+    /// Compression, if desired. Will result in a runtime error
+    /// if the corresponding feature is not enabled
+    batch_compression_type: Option<crate::CompressionType>,
 }
 
 impl IpcWriteOptions {
-    /// Configures compression when writing IPC files. Requires the
-    /// `ipc_compression` feature of the crate to be activated.
-    #[cfg(feature = "ipc_compression")]
+    /// Configures compression when writing IPC files.
+    ///
+    /// Will result in a runtime error if the corresponding feature
+    /// is not enabled
     pub fn try_with_compression(
         mut self,
-        batch_compression_type: Option<ipc::CompressionType>,
-    ) -> Result<Self> {
+        batch_compression_type: Option<crate::CompressionType>,
+    ) -> Result<Self, ArrowError> {
         self.batch_compression_type = batch_compression_type;
 
         if self.batch_compression_type.is_some()
-            && self.metadata_version < ipc::MetadataVersion::V5
+            && self.metadata_version < crate::MetadataVersion::V5
         {
             return Err(ArrowError::InvalidArgumentError(
                 "Compression only supported in metadata v5 and above".to_string(),
@@ -87,26 +83,26 @@ impl IpcWriteOptions {
     pub fn try_new(
         alignment: usize,
         write_legacy_ipc_format: bool,
-        metadata_version: ipc::MetadataVersion,
-    ) -> Result<Self> {
+        metadata_version: crate::MetadataVersion,
+    ) -> Result<Self, ArrowError> {
         if alignment == 0 || alignment % 8 != 0 {
             return Err(ArrowError::InvalidArgumentError(
                 "Alignment should be greater than 0 and be a multiple of 8".to_string(),
             ));
         }
         match metadata_version {
-            ipc::MetadataVersion::V1
-            | ipc::MetadataVersion::V2
-            | ipc::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
+            crate::MetadataVersion::V1
+            | crate::MetadataVersion::V2
+            | crate::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError(
                 "Writing IPC metadata version 3 and lower not supported".to_string(),
             )),
-            ipc::MetadataVersion::V4 => Ok(Self {
+            crate::MetadataVersion::V4 => Ok(Self {
                 alignment,
                 write_legacy_ipc_format,
                 metadata_version,
                 batch_compression_type: None,
             }),
-            ipc::MetadataVersion::V5 => {
+            crate::MetadataVersion::V5 => {
                 if write_legacy_ipc_format {
                     Err(ArrowError::InvalidArgumentError(
                         "Legacy IPC format only supported on metadata version 4"
@@ -122,7 +118,7 @@ impl IpcWriteOptions {
                 }
             }
             z => Err(ArrowError::InvalidArgumentError(format!(
-                "Unsupported ipc::MetadataVersion {:?}",
+                "Unsupported crate::MetadataVersion {:?}",
                 z
             ))),
         }
@@ -134,7 +130,7 @@ impl Default for IpcWriteOptions {
         Self {
             alignment: 64,
             write_legacy_ipc_format: false,
-            metadata_version: ipc::MetadataVersion::V5,
+            metadata_version: crate::MetadataVersion::V5,
             batch_compression_type: None,
         }
     }
@@ -151,13 +147,13 @@ impl IpcDataGenerator {
     ) -> EncodedData {
         let mut fbb = FlatBufferBuilder::new();
         let schema = {
-            let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema);
+            let fb = crate::convert::schema_to_fb_offset(&mut fbb, schema);
             fb.as_union_value()
         };
 
-        let mut message = ipc::MessageBuilder::new(&mut fbb);
+        let mut message = crate::MessageBuilder::new(&mut fbb);
         message.add_version(write_options.metadata_version);
-        message.add_header_type(ipc::MessageHeader::Schema);
+        message.add_header_type(crate::MessageHeader::Schema);
         message.add_bodyLength(0);
         message.add_header(schema);
         // TODO: custom metadata
@@ -177,7 +173,7 @@ impl IpcDataGenerator {
         encoded_dictionaries: &mut Vec<EncodedData>,
         dictionary_tracker: &mut DictionaryTracker,
         write_options: &IpcWriteOptions,
-    ) -> Result<()> {
+    ) -> Result<(), ArrowError> {
         match column.data_type() {
             DataType::Struct(fields) => {
                 let s = as_struct_array(column);
@@ -281,7 +277,7 @@ impl IpcDataGenerator {
         encoded_dictionaries: &mut Vec<EncodedData>,
         dictionary_tracker: &mut DictionaryTracker,
         write_options: &IpcWriteOptions,
-    ) -> Result<()> {
+    ) -> Result<(), ArrowError> {
         match column.data_type() {
             DataType::Dictionary(_key_type, _value_type) => {
                 let dict_id = field
@@ -325,7 +321,7 @@ impl IpcDataGenerator {
         batch: &RecordBatch,
         dictionary_tracker: &mut DictionaryTracker,
         write_options: &IpcWriteOptions,
-    ) -> Result<(Vec<EncodedData>, EncodedData)> {
+    ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
         let schema = batch.schema();
         let mut encoded_dictionaries = Vec::with_capacity(schema.all_fields().len());
 
@@ -344,17 +340,17 @@ impl IpcDataGenerator {
         Ok((encoded_dictionaries, encoded_message))
     }
 
-    /// Write a `RecordBatch` into two sets of bytes, one for the header (ipc::Message) and the
+    /// Write a `RecordBatch` into two sets of bytes, one for the header (crate::Message) and the
     /// other for the batch's data
     fn record_batch_to_bytes(
         &self,
         batch: &RecordBatch,
         write_options: &IpcWriteOptions,
-    ) -> Result<EncodedData> {
+    ) -> Result<EncodedData, ArrowError> {
         let mut fbb = FlatBufferBuilder::new();
 
-        let mut nodes: Vec<ipc::FieldNode> = vec![];
-        let mut buffers: Vec<ipc::Buffer> = vec![];
+        let mut nodes: Vec<crate::FieldNode> = vec![];
+        let mut buffers: Vec<crate::Buffer> = vec![];
         let mut arrow_data: Vec<u8> = vec![];
         let mut offset = 0;
 
@@ -362,8 +358,8 @@ impl IpcDataGenerator {
         let batch_compression_type = write_options.batch_compression_type;
 
         let compression = batch_compression_type.map(|batch_compression_type| {
-            let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
-            c.add_method(ipc::BodyCompressionMethod::BUFFER);
+            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
+            c.add_method(crate::BodyCompressionMethod::BUFFER);
             c.add_codec(batch_compression_type);
             c.finish()
         });
@@ -394,7 +390,7 @@ impl IpcDataGenerator {
         let buffers = fbb.create_vector(&buffers);
         let nodes = fbb.create_vector(&nodes);
         let root = {
-            let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
+            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
             batch_builder.add_length(batch.num_rows() as i64);
             batch_builder.add_nodes(nodes);
             batch_builder.add_buffers(buffers);
@@ -404,10 +400,10 @@ impl IpcDataGenerator {
             let b = batch_builder.finish();
             b.as_union_value()
         };
-        // create an ipc::Message
-        let mut message = ipc::MessageBuilder::new(&mut fbb);
+        // create a crate::Message
+        let mut message = crate::MessageBuilder::new(&mut fbb);
         message.add_version(write_options.metadata_version);
-        message.add_header_type(ipc::MessageHeader::RecordBatch);
+        message.add_header_type(crate::MessageHeader::RecordBatch);
         message.add_bodyLength(arrow_data.len() as i64);
         message.add_header(root);
         let root = message.finish();
@@ -420,26 +416,26 @@ impl IpcDataGenerator {
         })
     }
 
-    /// Write dictionary values into two sets of bytes, one for the header (ipc::Message) and the
+    /// Write dictionary values into two sets of bytes, one for the header (crate::Message) and the
     /// other for the data
     fn dictionary_batch_to_bytes(
         &self,
         dict_id: i64,
         array_data: &ArrayData,
         write_options: &IpcWriteOptions,
-    ) -> Result<EncodedData> {
+    ) -> Result<EncodedData, ArrowError> {
         let mut fbb = FlatBufferBuilder::new();
 
-        let mut nodes: Vec<ipc::FieldNode> = vec![];
-        let mut buffers: Vec<ipc::Buffer> = vec![];
+        let mut nodes: Vec<crate::FieldNode> = vec![];
+        let mut buffers: Vec<crate::Buffer> = vec![];
         let mut arrow_data: Vec<u8> = vec![];
 
         // get the type of compression
         let batch_compression_type = write_options.batch_compression_type;
 
         let compression = batch_compression_type.map(|batch_compression_type| {
-            let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
-            c.add_method(ipc::BodyCompressionMethod::BUFFER);
+            let mut c = crate::BodyCompressionBuilder::new(&mut fbb);
+            c.add_method(crate::BodyCompressionMethod::BUFFER);
             c.add_codec(batch_compression_type);
             c.finish()
         });
@@ -470,7 +466,7 @@ impl IpcDataGenerator {
         let nodes = fbb.create_vector(&nodes);
 
         let root = {
-            let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
+            let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
             batch_builder.add_length(array_data.len() as i64);
             batch_builder.add_nodes(nodes);
             batch_builder.add_buffers(buffers);
@@ -481,16 +477,16 @@ impl IpcDataGenerator {
         };
 
         let root = {
-            let mut batch_builder = ipc::DictionaryBatchBuilder::new(&mut fbb);
+            let mut batch_builder = crate::DictionaryBatchBuilder::new(&mut fbb);
             batch_builder.add_id(dict_id);
             batch_builder.add_data(root);
             batch_builder.finish().as_union_value()
         };
 
         let root = {
-            let mut message_builder = ipc::MessageBuilder::new(&mut fbb);
+            let mut message_builder = crate::MessageBuilder::new(&mut fbb);
             message_builder.add_version(write_options.metadata_version);
-            message_builder.add_header_type(ipc::MessageHeader::DictionaryBatch);
+            message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
             message_builder.add_bodyLength(arrow_data.len() as i64);
             message_builder.add_header(root);
             message_builder.finish()
@@ -531,7 +527,11 @@ impl DictionaryTracker {
     /// * If the tracker has not been configured to error on replacement or this dictionary
     ///   has never been seen before, return `Ok(true)` to indicate that the dictionary was just
     ///   inserted.
-    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool> {
+    pub fn insert(
+        &mut self,
+        dict_id: i64,
+        column: &ArrayRef,
+    ) -> Result<bool, ArrowError> {
         let dict_data = column.data();
         let dict_values = &dict_data.child_data()[0];
 
@@ -565,9 +565,9 @@ pub struct FileWriter<W: Write> {
     /// The number of bytes between each block of bytes, as an offset for random access
     block_offsets: usize,
     /// Dictionary blocks that will be written as part of the IPC footer
-    dictionary_blocks: Vec<ipc::Block>,
+    dictionary_blocks: Vec<crate::Block>,
     /// Record blocks that will be written as part of the IPC footer
-    record_blocks: Vec<ipc::Block>,
+    record_blocks: Vec<crate::Block>,
     /// Whether the writer footer has been written, and the writer is finished
     finished: bool,
     /// Keeps track of dictionaries that have been written
@@ -578,7 +578,7 @@ pub struct FileWriter<W: Write> {
 
 impl<W: Write> FileWriter<W> {
     /// Try create a new writer, with the schema written as part of the header
-    pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
+    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
         let write_options = IpcWriteOptions::default();
         Self::try_new_with_options(writer, schema, write_options)
     }
@@ -588,7 +588,7 @@ impl<W: Write> FileWriter<W> {
         writer: W,
         schema: &Schema,
         write_options: IpcWriteOptions,
-    ) -> Result<Self> {
+    ) -> Result<Self, ArrowError> {
         let data_gen = IpcDataGenerator::default();
         let mut writer = BufWriter::new(writer);
         // write magic to header aligned on 8 byte boundary
@@ -613,7 +613,7 @@ impl<W: Write> FileWriter<W> {
     }
 
     /// Write a record batch to the file
-    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
         if self.finished {
             return Err(ArrowError::IoError(
                 "Cannot write record batch to file writer as it is closed".to_string(),
@@ -631,7 +631,7 @@ impl<W: Write> FileWriter<W> {
                 write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
 
             let block =
-                ipc::Block::new(self.block_offsets as i64, meta as i32, data as i64);
+                crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
             self.dictionary_blocks.push(block);
             self.block_offsets += meta + data;
         }
@@ -639,7 +639,7 @@ impl<W: Write> FileWriter<W> {
         let (meta, data) =
             write_message(&mut self.writer, encoded_message, &self.write_options)?;
         // add a record block for the footer
-        let block = ipc::Block::new(
+        let block = crate::Block::new(
             self.block_offsets as i64,
             meta as i32, // TODO: is this still applicable?
             data as i64,
@@ -650,7 +650,7 @@ impl<W: Write> FileWriter<W> {
     }
 
     /// Write footer and closing tag, then mark the writer as done
-    pub fn finish(&mut self) -> Result<()> {
+    pub fn finish(&mut self) -> Result<(), ArrowError> {
         if self.finished {
             return Err(ArrowError::IoError(
                 "Cannot write footer to file writer as it is closed".to_string(),
@@ -663,10 +663,10 @@ impl<W: Write> FileWriter<W> {
         let mut fbb = FlatBufferBuilder::new();
         let dictionaries = fbb.create_vector(&self.dictionary_blocks);
         let record_batches = fbb.create_vector(&self.record_blocks);
-        let schema = ipc::convert::schema_to_fb_offset(&mut fbb, &self.schema);
+        let schema = crate::convert::schema_to_fb_offset(&mut fbb, &self.schema);
 
         let root = {
-            let mut footer_builder = ipc::FooterBuilder::new(&mut fbb);
+            let mut footer_builder = crate::FooterBuilder::new(&mut fbb);
             footer_builder.add_version(self.write_options.metadata_version);
             footer_builder.add_schema(schema);
             footer_builder.add_dictionaries(dictionaries);
@@ -690,7 +690,7 @@ impl<W: Write> FileWriter<W> {
     ///
     /// The buffer is flushed and the FileWriter is finished before returning the
     /// writer.
-    pub fn into_inner(mut self) -> Result<W> {
+    pub fn into_inner(mut self) -> Result<W, ArrowError> {
         if !self.finished {
             self.finish()?;
         }
@@ -713,7 +713,7 @@ pub struct StreamWriter<W: Write> {
 
 impl<W: Write> StreamWriter<W> {
     /// Try create a new writer, with the schema written as part of the header
-    pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
+    pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
         let write_options = IpcWriteOptions::default();
         Self::try_new_with_options(writer, schema, write_options)
     }
@@ -722,7 +722,7 @@ impl<W: Write> StreamWriter<W> {
         writer: W,
         schema: &Schema,
         write_options: IpcWriteOptions,
-    ) -> Result<Self> {
+    ) -> Result<Self, ArrowError> {
         let data_gen = IpcDataGenerator::default();
         let mut writer = BufWriter::new(writer);
         // write the schema, set the written bytes to the schema
@@ -738,7 +738,7 @@ impl<W: Write> StreamWriter<W> {
     }
 
     /// Write a record batch to the stream
-    pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
         if self.finished {
             return Err(ArrowError::IoError(
                 "Cannot write record batch to stream writer as it is closed".to_string(),
@@ -759,7 +759,7 @@ impl<W: Write> StreamWriter<W> {
     }
 
     /// Write continuation bytes, and mark the stream as done
-    pub fn finish(&mut self) -> Result<()> {
+    pub fn finish(&mut self) -> Result<(), ArrowError> {
         if self.finished {
             return Err(ArrowError::IoError(
                 "Cannot write footer to stream writer as it is closed".to_string(),
@@ -787,10 +787,9 @@ impl<W: Write> StreamWriter<W> {
     /// # Example
     ///
     /// ```
-    /// # use arrow::datatypes::Schema;
-    /// # use arrow::ipc::writer::{StreamWriter, IpcWriteOptions};
-    /// # use arrow::ipc::MetadataVersion;
-    /// # use arrow::error::ArrowError;
+    /// # use arrow_ipc::writer::{StreamWriter, IpcWriteOptions};
+    /// # use arrow_ipc::MetadataVersion;
+    /// # use arrow_schema::{ArrowError, Schema};
     /// # fn main() -> Result<(), ArrowError> {
     /// // The result we expect from an empty schema
     /// let expected = vec![
@@ -815,7 +814,7 @@ impl<W: Write> StreamWriter<W> {
     /// # Ok(())
     /// # }
     /// ```
-    pub fn into_inner(mut self) -> Result<W> {
+    pub fn into_inner(mut self) -> Result<W, ArrowError> {
         if !self.finished {
             self.finish()?;
         }
@@ -823,9 +822,9 @@ impl<W: Write> StreamWriter<W> {
     }
 }
 
-/// Stores the encoded data, which is an ipc::Message, and optional Arrow data
+/// Stores the encoded data, which is a crate::Message, and optional Arrow data
 pub struct EncodedData {
-    /// An encoded ipc::Message
+    /// An encoded crate::Message
     pub ipc_message: Vec<u8>,
     /// Arrow buffers to be written, should be an empty vec for schema messages
     pub arrow_data: Vec<u8>,
@@ -835,7 +834,7 @@ pub fn write_message<W: Write>(
     mut writer: W,
     encoded: EncodedData,
     write_options: &IpcWriteOptions,
-) -> Result<(usize, usize)> {
+) -> Result<(usize, usize), ArrowError> {
     let arrow_data_len = encoded.arrow_data.len();
     if arrow_data_len % 8 != 0 {
         return Err(ArrowError::MemoryError(
@@ -877,7 +876,7 @@ pub fn write_message<W: Write>(
     Ok((aligned_size, body_len))
 }
 
-fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize> {
+fn write_body_buffers<W: Write>(mut writer: W, data: &[u8]) -> Result<usize, ArrowError> {
     let len = data.len() as u32;
     let pad_len = pad_to_8(len) as u32;
     let total_len = len + pad_len;
@@ -898,17 +897,17 @@ fn write_continuation<W: Write>(
     mut writer: W,
     write_options: &IpcWriteOptions,
     total_len: i32,
-) -> Result<usize> {
+) -> Result<usize, ArrowError> {
     let mut written = 8;
 
     // the version of the writer determines whether continuation markers should be added
     match write_options.metadata_version {
-        ipc::MetadataVersion::V1
-        | ipc::MetadataVersion::V2
-        | ipc::MetadataVersion::V3 => {
+        crate::MetadataVersion::V1
+        | crate::MetadataVersion::V2
+        | crate::MetadataVersion::V3 => {
             unreachable!("Options with the metadata version cannot be created")
         }
-        ipc::MetadataVersion::V4 => {
+        crate::MetadataVersion::V4 => {
             if !write_options.write_legacy_ipc_format {
                 // v0.15.0 format
                 writer.write_all(&CONTINUATION_MARKER)?;
@@ -916,12 +915,12 @@ fn write_continuation<W: Write>(
             }
             writer.write_all(&total_len.to_le_bytes()[..])?;
         }
-        ipc::MetadataVersion::V5 => {
+        crate::MetadataVersion::V5 => {
             // write continuation marker and message length
             writer.write_all(&CONTINUATION_MARKER)?;
             writer.write_all(&total_len.to_le_bytes()[..])?;
         }
-        z => panic!("Unsupported ipc::MetadataVersion {:?}", z),
+        z => panic!("Unsupported crate::MetadataVersion {:?}", z),
     };
 
     writer.flush()?;
@@ -932,7 +931,7 @@ fn write_continuation<W: Write>(
 /// In V4, null types have no validity bitmap
 /// In V5 and later, null and union types have no validity bitmap
 fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
-    if write_options.metadata_version < ipc::MetadataVersion::V5 {
+    if write_options.metadata_version < crate::MetadataVersion::V5 {
         !matches!(data_type, DataType::Null)
     } else {
         !matches!(data_type, DataType::Null | DataType::Union(_, _, _))
@@ -1053,22 +1052,22 @@ fn get_buffer_offset<OffsetSize: OffsetSizeTrait>(array_data: &ArrayData) -> Off
 #[allow(clippy::too_many_arguments)]
 fn write_array_data(
     array_data: &ArrayData,
-    buffers: &mut Vec<ipc::Buffer>,
+    buffers: &mut Vec<crate::Buffer>,
     arrow_data: &mut Vec<u8>,
-    nodes: &mut Vec<ipc::FieldNode>,
+    nodes: &mut Vec<crate::FieldNode>,
     offset: i64,
     num_rows: usize,
     null_count: usize,
     compression_codec: &Option<CompressionCodec>,
     write_options: &IpcWriteOptions,
-) -> Result<i64> {
+) -> Result<i64, ArrowError> {
     let mut offset = offset;
     if !matches!(array_data.data_type(), DataType::Null) {
-        nodes.push(ipc::FieldNode::new(num_rows as i64, null_count as i64));
+        nodes.push(crate::FieldNode::new(num_rows as i64, null_count as i64));
     } else {
         // NullArray's null_count equals to len, but the `null_count` passed in is from ArrayData
         // where null_count is always 0.
-        nodes.push(ipc::FieldNode::new(num_rows as i64, num_rows as i64));
+        nodes.push(crate::FieldNode::new(num_rows as i64, num_rows as i64));
     }
     if has_validity_bitmap(array_data.data_type(), write_options) {
         // write null buffer if exists
@@ -1219,7 +1218,7 @@ fn write_array_data(
 }
 
 /// Write a buffer into `arrow_data`, a vector of bytes, and adds its
-/// [`ipc::Buffer`] to `buffers`. Returns the new offset in `arrow_data`
+/// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data`
 ///
 ///
 /// From <https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
@@ -1231,12 +1230,12 @@ fn write_array_data(
 /// follows is not compressed, which can be useful for cases where
 /// compression does not yield appreciable savings.
 fn write_buffer(
-    buffer: &[u8],                  // input
-    buffers: &mut Vec<ipc::Buffer>, // output buffer descriptors
-    arrow_data: &mut Vec<u8>,       // output stream
-    offset: i64,                    // current output stream offset
+    buffer: &[u8],                    // input
+    buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
+    arrow_data: &mut Vec<u8>,         // output stream
+    offset: i64,                      // current output stream offset
     compression_codec: &Option<CompressionCodec>,
-) -> Result<i64> {
+) -> Result<i64, ArrowError> {
     let len: i64 = match compression_codec {
         Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?,
         None => {
@@ -1253,7 +1252,7 @@ fn write_buffer(
     })?;
 
     // make new index entry
-    buffers.push(ipc::Buffer::new(offset, len));
+    buffers.push(crate::Buffer::new(offset, len));
     // padding and make offset 8 bytes aligned
     let pad_len = pad_to_8(len as u32) as i64;
     arrow_data.extend_from_slice(&vec![0u8; pad_len as usize][..]);
@@ -1271,18 +1270,18 @@ fn pad_to_8(len: u32) -> usize {
 mod tests {
     use super::*;
 
-    use std::fs::File;
     use std::io::Seek;
     use std::sync::Arc;
 
-    use ipc::MetadataVersion;
+    use crate::MetadataVersion;
 
-    use crate::array::*;
-    use crate::datatypes::Field;
-    use crate::ipc::reader::*;
+    use crate::reader::*;
+    use arrow_array::builder::UnionBuilder;
+    use arrow_array::types::*;
+    use arrow_schema::DataType;
 
     #[test]
-    #[cfg(feature = "ipc_compression")]
+    #[cfg(feature = "lz4")]
     fn test_write_empty_record_batch_lz4_compression() {
         let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
         let values: Vec<Option<i32>> = vec![];
@@ -1295,9 +1294,9 @@ mod tests {
 
         {
             let write_option =
-                IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+                IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                     .unwrap()
-                    .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
+                    .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                     .unwrap();
 
             let mut writer =
@@ -1335,7 +1334,7 @@ mod tests {
     }
 
     #[test]
-    #[cfg(feature = "ipc_compression")]
+    #[cfg(feature = "lz4")]
     fn test_write_file_with_lz4_compression() {
         let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
         let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
@@ -1347,9 +1346,9 @@ mod tests {
         let mut file = tempfile::tempfile().unwrap();
         {
             let write_option =
-                IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+                IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                     .unwrap()
-                    .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
+                    .try_with_compression(Some(crate::CompressionType::LZ4_FRAME))
                     .unwrap();
 
             let mut writer =
@@ -1387,7 +1386,7 @@ mod tests {
     }
 
     #[test]
-    #[cfg(feature = "ipc_compression")]
+    #[cfg(feature = "zstd")]
     fn test_write_file_with_zstd_compression() {
         let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
         let values: Vec<Option<i32>> = vec![Some(12), Some(1)];
@@ -1398,9 +1397,9 @@ mod tests {
         let mut file = tempfile::tempfile().unwrap();
         {
             let write_option =
-                IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+                IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5)
                     .unwrap()
-                    .try_with_compression(Some(ipc::CompressionType::ZSTD))
+                    .try_with_compression(Some(crate::CompressionType::ZSTD))
                     .unwrap();
 
             let mut writer =
@@ -1482,7 +1481,7 @@ mod tests {
         }
     }
 
-    fn write_null_file(options: IpcWriteOptions, suffix: &str) {
+    fn write_null_file(options: IpcWriteOptions) {
         let schema = Schema::new(vec![
             Field::new("nulls", DataType::Null, true),
             Field::new("int32s", DataType::Int32, false),
@@ -1503,18 +1502,18 @@ mod tests {
             ],
         )
         .unwrap();
-        let file_name = format!("target/debug/testdata/nulls_{}.arrow_file", suffix);
+        let mut file = tempfile::tempfile().unwrap();
         {
-            let file = File::create(&file_name).unwrap();
             let mut writer =
-                FileWriter::try_new_with_options(file, &schema, options).unwrap();
+                FileWriter::try_new_with_options(&mut file, &schema, options).unwrap();
 
             writer.write(&batch).unwrap();
             writer.finish().unwrap();
         }
 
+        file.rewind().unwrap();
+
         {
-            let file = File::open(&file_name).unwrap();
             let reader = FileReader::try_new(file, None).unwrap();
             reader.for_each(|maybe_batch| {
                 maybe_batch
@@ -1532,33 +1531,19 @@ mod tests {
     }
     #[test]
     fn test_write_null_file_v4() {
-        write_null_file(
-            IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap(),
-            "v4_a8",
-        );
-        write_null_file(
-            IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap(),
-            "v4_a8l",
-        );
+        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap());
+        write_null_file(IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap());
         write_null_file(
             IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap(),
-            "v4_a64",
-        );
-        write_null_file(
-            IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap(),
-            "v4_a64l",
         );
+        write_null_file(IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap());
     }
 
     #[test]
     fn test_write_null_file_v5() {
-        write_null_file(
-            IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap(),
-            "v5_a8",
-        );
+        write_null_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
         write_null_file(
             IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap(),
-            "v5_a64",
         );
     }
 
@@ -1626,45 +1611,6 @@ mod tests {
         assert!(dict_tracker.written.contains_key(&2));
     }
 
-    #[test]
-    fn read_union_017() {
-        let testdata = crate::util::test_util::arrow_test_data();
-        let data_file = File::open(format!(
-            "{}/arrow-ipc-stream/integration/0.17.1/generated_union.stream",
-            testdata,
-        ))
-        .unwrap();
-
-        let reader = StreamReader::try_new(data_file, None).unwrap();
-
-        let mut file = tempfile::tempfile().unwrap();
-        // read and rewrite the stream to a temp location
-        {
-            let mut writer = StreamWriter::try_new(&mut file, &reader.schema()).unwrap();
-            reader.for_each(|batch| {
-                writer.write(&batch.unwrap()).unwrap();
-            });
-            writer.finish().unwrap();
-        }
-        file.rewind().unwrap();
-
-        // Compare original file and rewrote file
-        let rewrite_reader = StreamReader::try_new(file, None).unwrap();
-
-        let data_file = File::open(format!(
-            "{}/arrow-ipc-stream/integration/0.17.1/generated_union.stream",
-            testdata,
-        ))
-        .unwrap();
-        let reader = StreamReader::try_new(data_file, None).unwrap();
-
-        reader.into_iter().zip(rewrite_reader.into_iter()).for_each(
-            |(batch1, batch2)| {
-                assert_eq!(batch1.unwrap(), batch2.unwrap());
-            },
-        );
-    }
-
     fn write_union_file(options: IpcWriteOptions) {
         let schema = Schema::new(vec![Field::new(
             "union",
@@ -1739,7 +1685,7 @@ mod tests {
 
     fn deserialize(bytes: Vec<u8>) -> RecordBatch {
         let mut stream_reader =
-            ipc::reader::StreamReader::try_new(std::io::Cursor::new(bytes), None)
+            crate::reader::StreamReader::try_new(std::io::Cursor::new(bytes), None)
                 .unwrap();
         stream_reader.next().unwrap().unwrap()
     }
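With the stub module gone, compression is configured through `try_with_compression`, which now compiles unconditionally and fails at runtime when the backing `lz4`/`zstd` feature is disabled. A short sketch of that configuration path, using only the constructors visible in this diff:

```rust
// Sketch of configuring IPC compression under the new feature layout;
// this returns a runtime error (not a compile error) if the `lz4`
// feature of arrow-ipc is not enabled.
use arrow_ipc::writer::{FileWriter, IpcWriteOptions};
use arrow_ipc::{CompressionType, MetadataVersion};
use arrow_schema::{ArrowError, Schema};

fn lz4_writer(schema: &Schema) -> Result<FileWriter<Vec<u8>>, ArrowError> {
    // Compression requires metadata V5, per the check in try_with_compression.
    let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?
        .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
    FileWriter::try_new_with_options(Vec::new(), schema, options)
}
```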
diff --git a/arrow/CONTRIBUTING.md b/arrow/CONTRIBUTING.md
index bbf309d4d..5b84bc2d3 100644
--- a/arrow/CONTRIBUTING.md
+++ b/arrow/CONTRIBUTING.md
@@ -26,23 +26,6 @@ Rust [README.md](../README.md).
 Please refer to [lib.rs](src/lib.rs) for an introduction to this
 specific crate and its current functionality.
 
-## IPC
-
-The expected flatc version is 1.12.0+, built from [flatbuffers](https://github.com/google/flatbuffers)
-master at fixed commit ID, by regen.sh.
-
-The IPC flatbuffer code was generated by running this command from the root of the project:
-
-```bash
-./regen.sh
-```
-
-The above script will run the `flatc` compiler and perform some adjustments to the source code:
-
-- Replace `type__` with `type_`
-- Remove `org::apache::arrow::flatbuffers` namespace
-- Add includes to each generated file
-
 ## Guidelines in usage of `unsafe`
 
 [`unsafe`](https://doc.rust-lang.org/book/ch19-01-unsafe-rust.html) has a high maintenance cost because debugging and testing it is difficult, time consuming, often requires external tools (e.g. `valgrind`), and requires a higher-than-usual attention to details. Undefined behavior is particularly difficult to identify and test, and usage of `unsafe` is the [primary cause of undefined behavior](https://doc.rust-lang.org/reference/behavior-considered-undefined.html) in a program written in Rust.
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index 5749f6799..6c30df6bd 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -50,6 +50,7 @@ arrow-data = { version = "26.0.0", path = "../arrow-data" }
 arrow-schema = { version = "26.0.0", path = "../arrow-schema" }
 arrow-array = { version = "26.0.0", path = "../arrow-array" }
 arrow-select = { version = "26.0.0", path = "../arrow-select" }
+arrow-ipc = { version = "26.0.0", path = "../arrow-ipc", optional = true }
 serde_json = { version = "1.0", default-features = false, features = ["std"], optional = true }
 indexmap = { version = "1.9", default-features = false, features = ["std"] }
 rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
@@ -60,25 +61,22 @@ csv_crate = { version = "1.1", default-features = false, optional = true, packag
 regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] }
 regex-syntax = { version = "0.6.27", default-features = false, features = ["unicode"] }
 lazy_static = { version = "1.4", default-features = false }
-lz4 = { version = "1.23", default-features = false, optional = true }
 packed_simd = { version = "0.3", default-features = false, optional = true, package = "packed_simd_2" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
-flatbuffers = { version = "22.9.2", default-features = false, features = ["thiserror"], optional = true }
 comfy-table = { version = "6.0", optional = true, default-features = false }
 pyo3 = { version = "0.17", default-features = false, optional = true }
 lexical-core = { version = "^0.8", default-features = false, features = ["write-integers", "write-floats", "parse-integers", "parse-floats"] }
 multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
-zstd = { version = "0.11.1", default-features = false, optional = true }
 
 [package.metadata.docs.rs]
 features = ["prettyprint", "ipc_compression", "dyn_cmp_dict", "ffi", "pyarrow"]
 
 [features]
 default = ["csv", "ipc", "json"]
-ipc_compression = ["ipc", "zstd", "lz4"]
+ipc_compression = ["ipc", "arrow-ipc/lz4", "arrow-ipc/zstd"]
 csv = ["csv_crate"]
-ipc = ["flatbuffers"]
+ipc = ["arrow-ipc"]
 json = ["serde_json"]
 simd = ["packed_simd"]
 prettyprint = ["comfy-table"]
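
Note that `ipc_compression` is now a pure forwarding feature: `arrow` keeps its single switch while `arrow-ipc` exposes the `lz4` and `zstd` codecs individually. A sketch of opting in from the facade crate, assuming the `try_with_compression` builder on `IpcWriteOptions` and a consumer manifest with `features = ["ipc_compression"]`; verify the builder's name against the release you target:

```rust
use arrow::datatypes::Schema;
use arrow::error::Result;
use arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
use arrow::ipc::CompressionType;

fn compressed_writer<W: std::io::Write>(sink: W, schema: &Schema) -> Result<StreamWriter<W>> {
    // LZ4_FRAME requires the lz4 codec pulled in by ipc_compression
    let options =
        IpcWriteOptions::default().try_with_compression(Some(CompressionType::LZ4_FRAME))?;
    StreamWriter::try_new_with_options(sink, schema, options)
}
```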
@@ -265,3 +263,7 @@ required-features = ["test_utils"]
 name = "lexsort"
 harness = false
 required-features = ["test_utils"]
+
+[[test]]
+name = "ipc_integration"
+required-features = ["test_utils", "ipc"]
diff --git a/arrow/src/ipc/compression/mod.rs b/arrow/src/ipc/compression/mod.rs
deleted file mode 100644
index 666fa6d86..000000000
--- a/arrow/src/ipc/compression/mod.rs
+++ /dev/null
@@ -1,26 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#[cfg(feature = "ipc_compression")]
-mod codec;
-#[cfg(feature = "ipc_compression")]
-pub(crate) use codec::CompressionCodec;
-
-#[cfg(not(feature = "ipc_compression"))]
-mod stub;
-#[cfg(not(feature = "ipc_compression"))]
-pub(crate) use stub::CompressionCodec;
diff --git a/arrow/src/ipc/compression/stub.rs b/arrow/src/ipc/compression/stub.rs
deleted file mode 100644
index 6240f084b..000000000
--- a/arrow/src/ipc/compression/stub.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Stubs that implement the same interface as the ipc_compression
-//! codec module, but always error.
-
-use crate::buffer::Buffer;
-use crate::error::{ArrowError, Result};
-use crate::ipc::CompressionType;
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum CompressionCodec {}
-
-impl TryFrom<CompressionCodec> for CompressionType {
-    type Error = ArrowError;
-    fn try_from(codec: CompressionCodec) -> Result<Self> {
-        Err(ArrowError::InvalidArgumentError(
-            format!("codec type {:?} not supported because arrow was not compiled with the ipc_compression feature", codec)))
-    }
-}
-
-impl TryFrom<CompressionType> for CompressionCodec {
-    type Error = ArrowError;
-
-    fn try_from(compression_type: CompressionType) -> Result<Self> {
-        Err(ArrowError::InvalidArgumentError(
-            format!("compression type {:?} not supported because arrow was not compiled with the ipc_compression feature", compression_type))
-            )
-    }
-}
-
-impl CompressionCodec {
-    #[allow(clippy::ptr_arg)]
-    pub(crate) fn compress_to_vec(
-        &self,
-        _input: &[u8],
-        _output: &mut Vec<u8>,
-    ) -> Result<usize> {
-        Err(ArrowError::InvalidArgumentError(
-            "compression not supported because arrow was not compiled with the ipc_compression feature".to_string()
-        ))
-    }
-
-    pub(crate) fn decompress_to_buffer(&self, _input: &[u8]) -> Result<Buffer> {
-        Err(ArrowError::InvalidArgumentError(
-            "decompression not supported because arrow was not compiled with the ipc_compression feature".to_string()
-        ))
-    }
-}
diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs
index 0081856f3..b2fa30d26 100644
--- a/arrow/src/lib.rs
+++ b/arrow/src/lib.rs
@@ -314,35 +314,14 @@ pub mod ffi;
 #[cfg(feature = "ffi")]
 pub mod ffi_stream;
 #[cfg(feature = "ipc")]
-pub mod ipc;
+pub use arrow_ipc as ipc;
 #[cfg(feature = "serde_json")]
 pub mod json;
 #[cfg(feature = "pyarrow")]
 pub mod pyarrow;
 
 pub mod record_batch {
-    pub use arrow_array::{RecordBatch, RecordBatchOptions};
-    use arrow_schema::{ArrowError, SchemaRef};
-
-    /// Trait for types that can read `RecordBatch`es.
-    pub trait RecordBatchReader:
-        Iterator<Item = Result<RecordBatch, ArrowError>>
-    {
-        /// Returns the schema of this `RecordBatchReader`.
-        ///
-        /// Implementations of this trait should guarantee that all `RecordBatch`es returned by this
-        /// reader should have the same schema as returned from this method.
-        fn schema(&self) -> SchemaRef;
-
-        /// Reads the next `RecordBatch`.
-        #[deprecated(
-            since = "2.0.0",
-            note = "This method is deprecated in favour of `next` from the trait Iterator."
-        )]
-        fn next_batch(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-            self.next().transpose()
-        }
-    }
+    pub use arrow_array::{RecordBatch, RecordBatchOptions, RecordBatchReader};
 }
 pub mod row;
 pub use arrow_array::temporal_conversions;
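
With this change the hand-rolled trait is gone: `RecordBatchReader` (including the deprecated `next_batch` default method) now comes from `arrow-array`, and its shape is unchanged, so existing implementations keep compiling. A sketch of a custom implementation against the re-export; the `InMemoryReader` type is illustrative, not part of the patch:

```rust
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::{RecordBatch, RecordBatchReader};

/// An illustrative reader over batches already in memory.
struct InMemoryReader {
    schema: SchemaRef,
    batches: std::vec::IntoIter<RecordBatch>,
}

impl Iterator for InMemoryReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.batches.next().map(Ok)
    }
}

impl RecordBatchReader for InMemoryReader {
    // Every batch yielded above shares this schema, per the trait contract
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}
```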
diff --git a/arrow/tests/ipc_integration.rs b/arrow/tests/ipc_integration.rs
new file mode 100644
index 000000000..abaa238ba
--- /dev/null
+++ b/arrow/tests/ipc_integration.rs
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_ipc::reader::StreamReader;
+use arrow_ipc::writer::StreamWriter;
+use std::fs::File;
+use std::io::Seek;
+
+#[test]
+fn read_union_017() {
+    let testdata = arrow::util::test_util::arrow_test_data();
+    let data_file = File::open(format!(
+        "{}/arrow-ipc-stream/integration/0.17.1/generated_union.stream",
+        testdata,
+    ))
+    .unwrap();
+
+    let reader = StreamReader::try_new(data_file, None).unwrap();
+
+    let mut file = tempfile::tempfile().unwrap();
+    // Read and rewrite the stream to a temporary file
+    {
+        let mut writer = StreamWriter::try_new(&mut file, &reader.schema()).unwrap();
+        reader.for_each(|batch| {
+            writer.write(&batch.unwrap()).unwrap();
+        });
+        writer.finish().unwrap();
+    }
+    file.rewind().unwrap();
+
+    // Compare the original file and the rewritten file
+    let rewrite_reader = StreamReader::try_new(file, None).unwrap();
+
+    let data_file = File::open(format!(
+        "{}/arrow-ipc-stream/integration/0.17.1/generated_union.stream",
+        testdata,
+    ))
+    .unwrap();
+    let reader = StreamReader::try_new(data_file, None).unwrap();
+
+    reader
+        .into_iter()
+        .zip(rewrite_reader.into_iter())
+        .for_each(|(batch1, batch2)| {
+            assert_eq!(batch1.unwrap(), batch2.unwrap());
+        });
+}
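
The test above exercises the stream format end-to-end; the split-out crate exposes the random-access file format through an analogous pair of types. A companion sketch, assuming `FileWriter`/`FileReader` from `arrow_ipc` and `tempfile` as a dev-dependency (as in the test above); `batch` and `schema` stand in for data built the same way:

```rust
use std::io::Seek;

use arrow_array::RecordBatch;
use arrow_ipc::reader::FileReader;
use arrow_ipc::writer::FileWriter;
use arrow_schema::Schema;

fn roundtrip_file(batch: &RecordBatch, schema: &Schema) -> RecordBatch {
    let mut file = tempfile::tempfile().unwrap();
    {
        // The file format appends a footer with block offsets on finish()
        let mut writer = FileWriter::try_new(&mut file, schema).unwrap();
        writer.write(batch).unwrap();
        writer.finish().unwrap();
    }
    file.rewind().unwrap();

    let mut reader = FileReader::try_new(file, None).unwrap();
    reader.next().unwrap().unwrap()
}
```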
diff --git a/dev/release/README.md b/dev/release/README.md
index a12e07f8e..093e1c4c2 100644
--- a/dev/release/README.md
+++ b/dev/release/README.md
@@ -257,6 +257,8 @@ Rust Arrow Crates:
 (cd arrow-data && cargo publish)
 (cd arrow-array && cargo publish)
 (cd arrow-select && cargo publish)
+(cd arrow-cast && cargo publish)
+(cd arrow-ipc && cargo publish)
 (cd arrow && cargo publish)
 (cd arrow-flight && cargo publish)
 (cd parquet && cargo publish)