You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/11/22 18:47:02 UTC

[arrow] branch master updated: ARROW-1047: [Java] Add Generic Reader Interface for Stream Format

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1516306  ARROW-1047: [Java] Add Generic Reader Interface for Stream Format
1516306 is described below

commit 1516306d40e4b785416282fe84daa9db342e5b90
Author: Bryan Cutler <cu...@gmail.com>
AuthorDate: Wed Nov 22 13:46:56 2017 -0500

    ARROW-1047: [Java] Add Generic Reader Interface for Stream Format
    
    This change decouples the reading of messages from the ReadChannel so that it is possible to build a reader that is not tied to a specific stream.  This adds a new interface `MessageReader` that will return a `Message` and the message body.  The `MessageChannelReader` implements this interface to read from a `ReadChannel` to match the current functionality.  A re-org of reading and writing packages is also done to better organize classes under an `ipc` package.
    
    There is a slight change in behavior for the `ArrowFileReader` that should not affect any usage.  Previously, the schema was read during initialization, and all dictionaries were read just before the first record batch was read.  Now, all dictionaries are read directly after the schema, and are not specifically tied to reading the first record batch.
    
    Author: Bryan Cutler <cu...@gmail.com>
    
    Closes #1259 from BryanCutler/java-generic-stream-interfaces-ARROW-1047 and squashes the following commits:
    
    43314ca1 [Bryan Cutler] ARROW-1047: [Java] Add Generic Reader Interface for Stream Format
---
 .../java/org/apache/arrow/tools/EchoServer.java    |   4 +-
 .../java/org/apache/arrow/tools/FileRoundtrip.java |   4 +-
 .../java/org/apache/arrow/tools/FileToStream.java  |   4 +-
 .../java/org/apache/arrow/tools/Integration.java   |  10 +-
 .../java/org/apache/arrow/tools/StreamToFile.java  |   4 +-
 .../apache/arrow/tools/ArrowFileTestFixtures.java  |   6 +-
 .../org/apache/arrow/tools/EchoServerTest.java     |   4 +-
 .../src/main/codegen/templates/UnionVector.java    |   2 +-
 .../apache/arrow/vector/BaseDataValueVector.java   |   2 +-
 .../arrow/vector/BaseNullableFixedWidthVector.java |   2 +-
 .../vector/BaseNullableVariableWidthVector.java    |   2 +-
 .../java/org/apache/arrow/vector/BitVector.java    |   2 +-
 .../org/apache/arrow/vector/BitVectorHelper.java   |   2 +-
 .../java/org/apache/arrow/vector/BufferBacked.java |   2 +-
 .../java/org/apache/arrow/vector/FieldVector.java  |   2 +-
 .../java/org/apache/arrow/vector/VectorLoader.java |   6 +-
 .../org/apache/arrow/vector/VectorUnloader.java    |   6 +-
 .../java/org/apache/arrow/vector/ZeroVector.java   |   2 +-
 .../arrow/vector/complex/FixedSizeListVector.java  |   2 +-
 .../apache/arrow/vector/complex/ListVector.java    |   2 +-
 .../arrow/vector/complex/NullableMapVector.java    |   2 +-
 .../vector/{file => ipc}/ArrowFileReader.java      |  59 +++++---
 .../vector/{file => ipc}/ArrowFileWriter.java      |   4 +-
 .../arrow/vector/{file => ipc}/ArrowMagic.java     |   4 +-
 .../arrow/vector/{file => ipc}/ArrowReader.java    | 150 ++++++++++++---------
 .../apache/arrow/vector/ipc/ArrowStreamReader.java | 148 ++++++++++++++++++++
 .../vector/{stream => ipc}/ArrowStreamWriter.java  |  11 +-
 .../arrow/vector/{file => ipc}/ArrowWriter.java    |   9 +-
 .../{file => ipc}/InvalidArrowFileException.java   |   2 +-
 .../vector/{file/json => ipc}/JsonFileReader.java  |  10 +-
 .../vector/{file/json => ipc}/JsonFileWriter.java  |   6 +-
 .../arrow/vector/{file => ipc}/ReadChannel.java    |   2 +-
 .../vector/{file => ipc}/SeekableReadChannel.java  |   2 +-
 .../arrow/vector/{file => ipc}/WriteChannel.java   |   4 +-
 .../vector/{file => ipc/message}/ArrowBlock.java   |   3 +-
 .../{schema => ipc/message}/ArrowBuffer.java       |   2 +-
 .../message}/ArrowDictionaryBatch.java             |   2 +-
 .../{schema => ipc/message}/ArrowFieldNode.java    |   2 +-
 .../vector/{file => ipc/message}/ArrowFooter.java  |   5 +-
 .../{schema => ipc/message}/ArrowMessage.java      |   2 +-
 .../{schema => ipc/message}/ArrowRecordBatch.java  |   8 +-
 .../{schema => ipc/message}/ArrowVectorType.java   |   2 +-
 .../{schema => ipc/message}/FBSerializable.java    |   2 +-
 .../{schema => ipc/message}/FBSerializables.java   |   2 +-
 .../vector/ipc/message/MessageChannelReader.java   | 115 ++++++++++++++++
 .../arrow/vector/ipc/message/MessageReader.java    |  65 +++++++++
 .../{stream => ipc/message}/MessageSerializer.java |  91 ++++++++-----
 .../vector/{schema => ipc/message}/TypeLayout.java |  46 +++----
 .../{schema => ipc/message}/VectorLayout.java      |  23 ++--
 .../arrow/vector/stream/ArrowStreamReader.java     |  66 ---------
 .../org/apache/arrow/vector/types/pojo/Field.java  |   8 +-
 .../org/apache/arrow/vector/TestValueVector.java   |   6 +-
 .../apache/arrow/vector/TestVectorUnloadLoad.java  |   4 +-
 .../arrow/vector/{file => ipc}/BaseFileTest.java   |   2 +-
 .../{stream => ipc}/MessageSerializerTest.java     |  13 +-
 .../arrow/vector/{file => ipc}/TestArrowFile.java  |  61 ++++-----
 .../vector/{file => ipc}/TestArrowFooter.java      |   4 +-
 .../{file => ipc}/TestArrowReaderWriter.java       |  21 ++-
 .../vector/{file => ipc}/TestArrowStream.java      |  15 +--
 .../vector/{file => ipc}/TestArrowStreamPipe.java  |  50 +++----
 .../vector/{file/json => ipc}/TestJSONFile.java    |   3 +-
 61 files changed, 698 insertions(+), 408 deletions(-)

diff --git a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
index 3091bc4..ce6b516 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java
@@ -23,8 +23,8 @@ import com.google.common.base.Preconditions;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.stream.ArrowStreamReader;
-import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
index ab8fa6e..6e45305 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java
@@ -22,8 +22,8 @@ package org.apache.arrow.tools;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.file.ArrowFileReader;
-import org.apache.arrow.vector.file.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
index 6722b30..3db01f4 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java
@@ -21,8 +21,8 @@ package org.apache.arrow.tools;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.file.ArrowFileReader;
-import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 
 import java.io.File;
 import java.io.FileInputStream;
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
index d2b35e6..666f1dd 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/Integration.java
@@ -22,11 +22,11 @@ package org.apache.arrow.tools;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFileReader;
-import org.apache.arrow.vector.file.ArrowFileWriter;
-import org.apache.arrow.vector.file.json.JsonFileReader;
-import org.apache.arrow.vector.file.json.JsonFileWriter;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.JsonFileReader;
+import org.apache.arrow.vector.ipc.JsonFileWriter;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
diff --git a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
index ef1a11f..42d336a 100644
--- a/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
+++ b/java/tools/src/main/java/org/apache/arrow/tools/StreamToFile.java
@@ -21,8 +21,8 @@ package org.apache.arrow.tools;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.file.ArrowFileWriter;
-import org.apache.arrow.vector.stream.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
 
 import java.io.File;
 import java.io.FileInputStream;
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
index c56a5a3..eac517d 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/ArrowFileTestFixtures.java
@@ -28,9 +28,9 @@ import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
 import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
 import org.apache.arrow.vector.complex.writer.BigIntWriter;
 import org.apache.arrow.vector.complex.writer.IntWriter;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowFileReader;
-import org.apache.arrow.vector.file.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.Assert;
 
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
index 89714e4..d8693c5 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
@@ -44,8 +44,8 @@ import org.apache.arrow.vector.complex.impl.UnionListWriter;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
-import org.apache.arrow.vector.stream.ArrowStreamReader;
-import org.apache.arrow.vector.stream.ArrowStreamWriter;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.ArrowType.Int;
diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java
index e44edbd..7316531 100644
--- a/java/vector/src/main/codegen/templates/UnionVector.java
+++ b/java/vector/src/main/codegen/templates/UnionVector.java
@@ -32,7 +32,7 @@ import java.util.Iterator;
 import org.apache.arrow.vector.BaseDataValueVector;
 import org.apache.arrow.vector.complex.impl.ComplexCopier;
 import org.apache.arrow.vector.util.CallBack;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.memory.BaseAllocator;
 import org.apache.arrow.vector.BaseValueVector;
 import org.apache.arrow.vector.util.OversizedAllocationException;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
index 38524ff..6d9eb1d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseDataValueVector.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 
 import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.vector.util.CallBack;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java
index 209758e..f82077f 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableFixedWidthVector.java
@@ -28,7 +28,7 @@ import java.util.List;
 import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.memory.BaseAllocator;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.util.CallBack;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java
index edf4987..b9e5442 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseNullableVariableWidthVector.java
@@ -25,7 +25,7 @@ import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.memory.BaseAllocator;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.complex.NullableMapVector;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.util.CallBack;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
index c6d404e..26c8170 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java
@@ -24,7 +24,7 @@ import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.holders.BitHolder;
 import org.apache.arrow.vector.holders.NullableBitHolder;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.util.OversizedAllocationException;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java
index 23252ca..2d4db85 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVectorHelper.java
@@ -20,7 +20,7 @@ package org.apache.arrow.vector;
 
 import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 
 /**
  * Helper class for performing generic operations on a bit vector buffer.
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java b/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java
index a0dbf2b..332ca22 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/BufferBacked.java
@@ -18,7 +18,7 @@
 
 package org.apache.arrow.vector;
 
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 
 import io.netty.buffer.ArrowBuf;
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
index c2ed17e..509eeda 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java
@@ -20,7 +20,7 @@ package org.apache.arrow.vector;
 
 import java.util.List;
 
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.pojo.Field;
 
 import io.netty.buffer.ArrowBuf;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
index 58fc80b..2cd4099 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -24,9 +24,9 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.schema.VectorLayout;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.VectorLayout;
 import org.apache.arrow.vector.types.pojo.Field;
 
 import com.google.common.collect.Iterators;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
index fd96773..2b03489 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
@@ -22,9 +22,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.schema.ArrowVectorType;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.ArrowVectorType;
 
 public class VectorUnloader {
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
index 3cc93a2..0ab3a7b 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ZeroVector.java
@@ -28,7 +28,7 @@ import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.vector.complex.impl.NullReader;
 import org.apache.arrow.vector.complex.reader.FieldReader;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType.Null;
 import org.apache.arrow.vector.types.pojo.Field;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
index 6713b1c..774a10d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java
@@ -33,7 +33,7 @@ import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.OutOfMemoryException;
 import org.apache.arrow.vector.*;
 import org.apache.arrow.vector.complex.impl.UnionFixedSizeListReader;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
index afe86a6..d50d4c4 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
@@ -42,7 +42,7 @@ import org.apache.arrow.vector.complex.impl.UnionListReader;
 import org.apache.arrow.vector.complex.impl.UnionListWriter;
 import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.complex.writer.FieldWriter;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java
index f95302f..e223d1c 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java
@@ -34,7 +34,7 @@ import org.apache.arrow.vector.*;
 import org.apache.arrow.vector.complex.impl.NullableMapReaderImpl;
 import org.apache.arrow.vector.complex.impl.NullableMapWriter;
 import org.apache.arrow.vector.holders.ComplexHolder;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.ArrowType.Struct;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
similarity index 77%
rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileReader.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
index d711b9c..4cd7026 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -26,32 +26,45 @@ import java.util.List;
 
 import org.apache.arrow.flatbuf.Footer;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.schema.ArrowDictionaryBatch;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.stream.MessageSerializer;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowFooter;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ArrowFileReader extends ArrowReader<SeekableReadChannel> {
+public class ArrowFileReader extends ArrowReader {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ArrowFileReader.class);
 
+  private SeekableReadChannel in;
   private ArrowFooter footer;
   private int currentDictionaryBatch = 0;
   private int currentRecordBatch = 0;
 
+  public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) {
+    super(allocator);
+    this.in = in;
+  }
+
   public ArrowFileReader(SeekableByteChannel in, BufferAllocator allocator) {
-    super(new SeekableReadChannel(in), allocator);
+    this(new SeekableReadChannel(in), allocator);
   }
 
-  public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) {
-    super(in, allocator);
+  @Override
+  public long bytesRead() {
+    return in.bytesRead();
+  }
+
+  @Override
+  protected void closeReadSource() throws IOException {
+    in.close();
   }
 
   @Override
-  protected Schema readSchema(SeekableReadChannel in) throws IOException {
+  protected Schema readSchema() throws IOException {
     if (footer == null) {
       if (in.size() <= (ArrowMagic.MAGIC_LENGTH * 2 + 4)) {
         throw new InvalidArrowFileException("file too small: " + in.size());
@@ -82,18 +95,30 @@ public class ArrowFileReader extends ArrowReader<SeekableReadChannel> {
   }
 
   @Override
-  protected ArrowMessage readMessage(SeekableReadChannel in, BufferAllocator allocator) throws IOException {
-    if (currentDictionaryBatch < footer.getDictionaries().size()) {
-      ArrowBlock block = footer.getDictionaries().get(currentDictionaryBatch++);
-      return readDictionaryBatch(in, block, allocator);
-    } else if (currentRecordBatch < footer.getRecordBatches().size()) {
+  public ArrowDictionaryBatch readDictionary() throws IOException {
+    if (currentDictionaryBatch >= footer.getDictionaries().size()) {
+      throw new IOException("Requested more dictionaries than defined in footer: " + currentDictionaryBatch);
+    }
+    ArrowBlock block = footer.getDictionaries().get(currentDictionaryBatch++);
+    return readDictionaryBatch(in, block, allocator);
+  }
+
+  // Returns true if a batch was read, false if no more batches
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    prepareLoadNextBatch();
+
+    if (currentRecordBatch < footer.getRecordBatches().size()) {
       ArrowBlock block = footer.getRecordBatches().get(currentRecordBatch++);
-      return readRecordBatch(in, block, allocator);
+      ArrowRecordBatch batch = readRecordBatch(in, block, allocator);
+      loadRecordBatch(batch);
+      return true;
     } else {
-      return null;
+      return false;
     }
   }
 
+
   public List<ArrowBlock> getDictionaryBlocks() throws IOException {
     ensureInitialized();
     return footer.getDictionaries();
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
similarity index 94%
rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
index 1d92d2b..1b687c9 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFileWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
@@ -24,6 +24,8 @@ import java.util.List;
 
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowFooter;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowMagic.java
similarity index 93%
rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowMagic.java
index 68313e7..a9310a6 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowMagic.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowMagic.java
@@ -16,7 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
+
+import org.apache.arrow.vector.ipc.WriteChannel;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
similarity index 65%
rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
index 21fb220..6d708a0 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,32 +33,25 @@ import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
-import org.apache.arrow.vector.schema.ArrowDictionaryBatch;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.schema.ArrowMessage.ArrowMessageVisitor;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.util.DictionaryUtility;
 
 /**
- * Abstract class to read ArrowRecordBatches from a ReadChannel.
+ * Abstract class to read Schema and ArrowRecordBatches.
  *
- * @param <T> Type of ReadChannel to use
  */
-public abstract class ArrowReader<T extends ReadChannel> implements DictionaryProvider, AutoCloseable {
-
-  private final T in;
-  private final BufferAllocator allocator;
+public abstract class ArrowReader implements DictionaryProvider, AutoCloseable {
 
+  protected final BufferAllocator allocator;
   private VectorLoader loader;
   private VectorSchemaRoot root;
   private Map<Long, Dictionary> dictionaries;
-
   private boolean initialized = false;
 
-  protected ArrowReader(T in, BufferAllocator allocator) {
-    this.in = in;
+  protected ArrowReader(BufferAllocator allocator) {
     this.allocator = allocator;
   }
 
@@ -105,58 +98,18 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr
    * @return true if a batch was read, false on EOS
    * @throws IOException
    */
-  public boolean loadNextBatch() throws IOException {
-    ensureInitialized();
-    // read in all dictionary batches, then stop after our first record batch
-    ArrowMessageVisitor<Boolean> visitor = new ArrowMessageVisitor<Boolean>() {
-      @Override
-      public Boolean visit(ArrowDictionaryBatch message) {
-        try {
-          load(message);
-        } finally {
-          message.close();
-        }
-        return true;
-      }
-
-      @Override
-      public Boolean visit(ArrowRecordBatch message) {
-        try {
-          loader.load(message);
-        } finally {
-          message.close();
-        }
-        return false;
-      }
-    };
-    root.setRowCount(0);
-    ArrowMessage message = readMessage(in, allocator);
-
-    boolean readBatch = false;
-    while (message != null) {
-      if (!message.accepts(visitor)) {
-        readBatch = true;
-        break;
-      }
-      // else read a dictionary
-      message = readMessage(in, allocator);
-    }
-
-    return readBatch;
-  }
+  public abstract boolean loadNextBatch() throws IOException;
 
   /**
    * Return the number of bytes read from the ReadChannel.
    *
    * @return number of bytes read
    */
-  public long bytesRead() {
-    return in.bytesRead();
-  }
+  public abstract long bytesRead();
 
   /**
    * Close resources, including vector schema root and dictionary vectors, and the
-   * underlying ReadChannel.
+   * underlying read source.
    *
    * @throws IOException
    */
@@ -167,12 +120,12 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr
 
   /**
    * Close resources, including vector schema root and dictionary vectors. If the flag
-   * closeReadChannel is true then close the underlying ReadChannel, otherwise leave it open.
+   * closeReadSource is true then close the underlying read source, otherwise leave it open.
    *
-   * @param closeReadChannel Flag to control if closing the underlying ReadChannel
+   * @param closeReadSource Flag to control if closing the underlying read source
    * @throws IOException
    */
-  public void close(boolean closeReadChannel) throws IOException {
+  public void close(boolean closeReadSource) throws IOException {
     if (initialized) {
       root.close();
       for (Dictionary dictionary : dictionaries.values()) {
@@ -180,15 +133,40 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr
       }
     }
 
-    if (closeReadChannel) {
-      in.close();
+    if (closeReadSource) {
+      closeReadSource();
     }
   }
 
-  protected abstract Schema readSchema(T in) throws IOException;
+  /**
+   * Close the underlying read source.
+   *
+   * @throws IOException
+   */
+  protected abstract void closeReadSource() throws IOException;
+
+  /**
+   * Read the Schema from the source; invoked at the beginning of the initialization.
+   *
+   * @return the read Schema
+   * @throws IOException
+   */
+  protected abstract Schema readSchema() throws IOException;
 
-  protected abstract ArrowMessage readMessage(T in, BufferAllocator allocator) throws IOException;
+  /**
+   * Read a dictionary batch from the source, will be invoked after the schema has been read and
+   * called N times, where N is the number of dictionaries indicated by the schema Fields.
+   *
+   * @return the read ArrowDictionaryBatch
+   * @throws IOException
+   */
+  protected abstract ArrowDictionaryBatch readDictionary() throws IOException;
 
+  /**
+   * Initialize if not done previously.
+   *
+   * @throws IOException
+   */
   protected void ensureInitialized() throws IOException {
     if (!initialized) {
       initialize();
@@ -200,7 +178,7 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr
    * Reads the schema and initializes the vectors
    */
   private void initialize() throws IOException {
-    Schema originalSchema = readSchema(in);
+    Schema originalSchema = readSchema();
     List<Field> fields = new ArrayList<>();
     List<FieldVector> vectors = new ArrayList<>();
     Map<Long, Dictionary> dictionaries = new HashMap<>();
@@ -216,9 +194,43 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr
     this.root = new VectorSchemaRoot(schema, vectors, 0);
     this.loader = new VectorLoader(root);
     this.dictionaries = Collections.unmodifiableMap(dictionaries);
+
+    // Read and load all dictionaries from schema
+    for (int i = 0; i < dictionaries.size(); i++) {
+      ArrowDictionaryBatch dictionaryBatch = readDictionary();
+      loadDictionary(dictionaryBatch);
+    }
   }
 
-  private void load(ArrowDictionaryBatch dictionaryBatch) {
+  /**
+   * Ensure the reader has been initialized and reset the VectorSchemaRoot row count to 0.
+   *
+   * @throws IOException
+   */
+  protected void prepareLoadNextBatch() throws IOException {
+    ensureInitialized();
+    root.setRowCount(0);
+  }
+
+  /**
+   * Load an ArrowRecordBatch to the reader's VectorSchemaRoot.
+   *
+   * @param batch the record batch to load
+   */
+  protected void loadRecordBatch(ArrowRecordBatch batch) {
+    try {
+      loader.load(batch);
+    } finally {
+      batch.close();
+    }
+  }
+
+  /**
+   * Load an ArrowDictionaryBatch to the reader's dictionary vectors.
+   *
+   * @param dictionaryBatch the dictionary batch to load
+   */
+  protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
     long id = dictionaryBatch.getDictionaryId();
     Dictionary dictionary = dictionaries.get(id);
     if (dictionary == null) {
@@ -227,6 +239,10 @@ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryPr
     FieldVector vector = dictionary.getVector();
     VectorSchemaRoot root = new VectorSchemaRoot(ImmutableList.of(vector.getField()), ImmutableList.of(vector), 0);
     VectorLoader loader = new VectorLoader(root);
-    loader.load(dictionaryBatch.getDictionary());
+    try {
+      loader.load(dictionaryBatch.getDictionary());
+    } finally {
+      dictionaryBatch.close();
+    }
   }
 }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
new file mode 100644
index 0000000..d1e4802
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.ipc;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.arrow.flatbuf.Message;
+import org.apache.arrow.flatbuf.MessageHeader;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageChannelReader;
+import org.apache.arrow.vector.ipc.message.MessageReader;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * This class reads from an input stream and produces ArrowRecordBatches.
+ */
+public class ArrowStreamReader extends ArrowReader {
+
+  private MessageReader messageReader;
+
+  /**
+   * Constructs a streaming reader using the MessageReader interface. Non-blocking.
+   *
+   * @param messageReader reader used to obtain the messages
+   * @param allocator to allocate new buffers
+   */
+  public ArrowStreamReader(MessageReader messageReader, BufferAllocator allocator) {
+    super(allocator);
+    this.messageReader = messageReader;
+  }
+
+  /**
+   * Constructs a streaming reader from a ReadableByteChannel input. Non-blocking.
+   *
+   * @param in ReadableByteChannel to read messages from
+   * @param allocator to allocate new buffers
+   */
+  public ArrowStreamReader(ReadableByteChannel in, BufferAllocator allocator) {
+    this(new MessageChannelReader(new ReadChannel(in)), allocator);
+  }
+
+  /**
+   * Constructs a streaming reader from an InputStream. Non-blocking.
+   *
+   * @param in InputStream to read messages from
+   * @param allocator to allocate new buffers
+   */
+  public ArrowStreamReader(InputStream in, BufferAllocator allocator) {
+    this(Channels.newChannel(in), allocator);
+  }
+
+  /**
+   * Get the number of bytes read from the stream since constructing the reader.
+   *
+   * @return number of bytes
+   */
+  @Override
+  public long bytesRead() {
+    return messageReader.bytesRead();
+  }
+
+  /**
+   * Closes the underlying read source.
+   *
+   * @throws IOException
+   */
+  @Override
+  protected void closeReadSource() throws IOException {
+    messageReader.close();
+  }
+
+  /**
+   * Load the next ArrowRecordBatch to the vector schema root if available.
+   *
+   * @return true if a batch was read, false on EOS
+   * @throws IOException if the underlying read fails or the next message is not a RecordBatch
+   */
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    prepareLoadNextBatch();
+    Message message = messageReader.readNextMessage();
+
+    // Reached EOS
+    if (message == null) {
+      return false;
+    }
+
+    if (message.headerType() != MessageHeader.RecordBatch) {
+      throw new IOException("Expected RecordBatch but header was " + message.headerType());
+    }
+
+    ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(messageReader, message, allocator);
+    loadRecordBatch(batch); // loadRecordBatch closes the batch once it has been loaded
+    return true;
+  }
+
+  /**
+   * Reads the schema message from the beginning of the stream.
+   *
+   * @return the deserialized arrow schema
+   */
+  @Override
+  protected Schema readSchema() throws IOException {
+    return MessageSerializer.deserializeSchema(messageReader);
+  }
+
+  /**
+   * Read a dictionary batch message, will be invoked after the schema and before normal record
+   * batches are read.
+   *
+   * @return the deserialized dictionary batch
+   * @throws IOException if the stream ends early or the next message is not a DictionaryBatch
+   */
+  @Override
+  protected ArrowDictionaryBatch readDictionary() throws IOException {
+    Message message = messageReader.readNextMessage();
+    if (message == null) {
+      throw new IOException("Unexpected end of stream, expected DictionaryBatch");
+    } else if (message.headerType() != MessageHeader.DictionaryBatch) {
+      throw new IOException("Expected DictionaryBatch but header was " + message.headerType());
+    }
+    return MessageSerializer.deserializeDictionaryBatch(messageReader, message, allocator);
+  }
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
similarity index 84%
rename from java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
index b854cd2..d731d05 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
@@ -16,16 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.stream;
+package org.apache.arrow.vector.ipc;
 
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ArrowWriter;
-import org.apache.arrow.vector.file.WriteChannel;
-import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.ArrowWriter;
+import org.apache.arrow.vector.ipc.WriteChannel;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import java.io.IOException;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
similarity index 95%
rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
index 7dc10b5..4b483d0 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.nio.channels.WritableByteChannel;
@@ -30,9 +30,10 @@ import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
-import org.apache.arrow.vector.schema.ArrowDictionaryBatch;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.stream.MessageSerializer;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.util.DictionaryUtility;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/InvalidArrowFileException.java
similarity index 96%
rename from java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/InvalidArrowFileException.java
index 607207f..ad9d877 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/InvalidArrowFileException.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/InvalidArrowFileException.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 public class InvalidArrowFileException extends RuntimeException {
   private static final long serialVersionUID = 1L;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
similarity index 98%
rename from java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
index 8017b38..cb11a25 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileReader.java
@@ -16,19 +16,18 @@
  * limitations under the License.
  ******************************************************************************/
 
-package org.apache.arrow.vector.file.json;
+package org.apache.arrow.vector.ipc;
 
 import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
 import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
 import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
 import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.arrow.vector.schema.ArrowVectorType.*;
+import static org.apache.arrow.vector.ipc.message.ArrowVectorType.*;
 
 import java.io.File;
 import java.io.IOException;
 import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -42,9 +41,8 @@ import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.*;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
-import org.apache.arrow.vector.file.InvalidArrowFileException;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowVectorType;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowVectorType;
 import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java
similarity index 98%
rename from java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java
index 0c8507b..22423b8 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/JsonFileWriter.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  ******************************************************************************/
 
-package org.apache.arrow.vector.file.json;
+package org.apache.arrow.vector.ipc;
 
-import static org.apache.arrow.vector.schema.ArrowVectorType.*;
+import static org.apache.arrow.vector.ipc.message.ArrowVectorType.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -33,7 +33,7 @@ import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.vector.*;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
-import org.apache.arrow.vector.schema.ArrowVectorType;
+import org.apache.arrow.vector.ipc.message.ArrowVectorType;
 import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java
similarity index 98%
rename from java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java
index b0eb8f3..395fd7d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/SeekableReadChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/SeekableReadChannel.java
similarity index 97%
rename from java/vector/src/main/java/org/apache/arrow/vector/file/SeekableReadChannel.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/SeekableReadChannel.java
index 46bea13..62ba3b7 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/SeekableReadChannel.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/SeekableReadChannel.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.nio.channels.SeekableByteChannel;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
similarity index 97%
rename from java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
index 89c9d1f..da500aa 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -25,7 +25,7 @@ import java.nio.channels.WritableByteChannel;
 import com.google.flatbuffers.FlatBufferBuilder;
 
 import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.vector.schema.FBSerializable;
+import org.apache.arrow.vector.ipc.message.FBSerializable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBlock.java
similarity index 96%
rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBlock.java
index e1b4d6a..8731f77 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowBlock.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBlock.java
@@ -16,10 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc.message;
 
 import org.apache.arrow.flatbuf.Block;
-import org.apache.arrow.vector.schema.FBSerializable;
 
 import com.google.flatbuffers.FlatBufferBuilder;
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBuffer.java
similarity index 97%
rename from java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBuffer.java
index 4e0187e..6b0eeaa 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowBuffer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowBuffer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import org.apache.arrow.flatbuf.Buffer;
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowDictionaryBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java
similarity index 97%
rename from java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowDictionaryBatch.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java
index 635fa3f..cd23cb9 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowDictionaryBatch.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import com.google.flatbuffers.FlatBufferBuilder;
 import org.apache.arrow.flatbuf.DictionaryBatch;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFieldNode.java
similarity index 97%
rename from java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFieldNode.java
index 3ed384e..ca0087f 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowFieldNode.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFieldNode.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import org.apache.arrow.flatbuf.FieldNode;
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFooter.java
similarity index 96%
rename from java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFooter.java
index 1e95321..f7794f7 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowFooter.java
@@ -16,16 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc.message;
 
-import static org.apache.arrow.vector.schema.FBSerializables.writeAllStructsToVector;
+import static org.apache.arrow.vector.ipc.message.FBSerializables.writeAllStructsToVector;
 
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.arrow.flatbuf.Block;
 import org.apache.arrow.flatbuf.Footer;
-import org.apache.arrow.vector.schema.FBSerializable;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import com.google.flatbuffers.FlatBufferBuilder;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowMessage.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
similarity index 96%
rename from java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowMessage.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
index f59b4b6..92fb58e 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowMessage.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 public interface ArrowMessage extends FBSerializable, AutoCloseable {
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
similarity index 94%
rename from java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
index bf0967a..6c6481e 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
@@ -16,9 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
-
-import static org.apache.arrow.vector.schema.FBSerializables.writeAllStructsToVector;
+package org.apache.arrow.vector.ipc.message;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -113,9 +111,9 @@ public class ArrowRecordBatch implements ArrowMessage {
   @Override
   public int writeTo(FlatBufferBuilder builder) {
     RecordBatch.startNodesVector(builder, nodes.size());
-    int nodesOffset = writeAllStructsToVector(builder, nodes);
+    int nodesOffset = FBSerializables.writeAllStructsToVector(builder, nodes);
     RecordBatch.startBuffersVector(builder, buffers.size());
-    int buffersOffset = writeAllStructsToVector(builder, buffersLayout);
+    int buffersOffset = FBSerializables.writeAllStructsToVector(builder, buffersLayout);
     RecordBatch.startRecordBatch(builder);
     RecordBatch.addLength(builder, length);
     RecordBatch.addNodes(builder, nodesOffset);
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowVectorType.java
similarity index 98%
rename from java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowVectorType.java
index 9d2fdfa..3342652 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowVectorType.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowVectorType.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import java.util.Map;
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializable.java
similarity index 95%
rename from java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializable.java
index 91d60ea..31f55bd 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializable.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import com.google.flatbuffers.FlatBufferBuilder;
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializables.java
similarity index 96%
rename from java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializables.java
index ae5aa55..6717ed7 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/FBSerializables.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/FBSerializables.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import java.util.ArrayList;
 import java.util.Collections;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java
new file mode 100644
index 0000000..5bc3e1f
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.ipc.message;
+
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.flatbuf.Message;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ReadChannel;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * MessageReader implementation that reads a sequence of messages from a ReadChannel.
+ */
+public class MessageChannelReader implements MessageReader {
+
+  private ReadChannel in;
+
+  /**
+   * Construct from an existing ReadChannel.
+   *
+   * @param in Channel to read messages from; it is closed when close() is called on this reader
+   */
+  public MessageChannelReader(ReadChannel in) {
+    this.in = in;
+  }
+
+  /**
+   * Read the next message from the ReadChannel: a 4-byte length prefix followed by the message.
+   *
+   * @return A Message, or null if the 4-byte prefix could not be fully read or the length is 0
+   * @throws IOException if fewer bytes than the prefixed message length could be read
+   */
+  @Override
+  public Message readNextMessage() throws IOException {
+    // Read the message size. There is an i32 little endian prefix.
+    ByteBuffer buffer = ByteBuffer.allocate(4);
+    if (in.readFully(buffer) != 4) {
+      return null;
+    }
+    int messageLength = MessageSerializer.bytesToInt(buffer.array());
+    if (messageLength == 0) {
+      return null;
+    }
+
+    buffer = ByteBuffer.allocate(messageLength);
+    if (in.readFully(buffer) != messageLength) {
+      throw new IOException(
+          "Unexpected end of stream trying to read message.");
+    }
+    buffer.rewind(); // reset position to 0 before handing the buffer to the flatbuffer parser
+
+    return Message.getRootAsMessage(buffer);
+  }
+
+  /**
+   * Read a message body of message.bodyLength() bytes from the ReadChannel into a new ArrowBuf.
+   *
+   * @param message Read message that is followed by a body of data
+   * @param allocator BufferAllocator to allocate memory for body data
+   * @return ArrowBuf containing the message body data
+   * @throws IOException if fewer than bodyLength bytes could be read
+   */
+  @Override
+  public ArrowBuf readMessageBody(Message message, BufferAllocator allocator) throws IOException {
+
+    int bodyLength = (int) message.bodyLength(); // narrowing cast to int
+
+    // Now read the body (record batch or dictionary batch data) into a newly allocated buffer
+    ArrowBuf buffer = allocator.buffer(bodyLength);
+    if (in.readFully(buffer, bodyLength) != bodyLength) {
+      throw new IOException("Unexpected end of input trying to read batch.");
+    }
+
+    return buffer;
+  }
+
+  /**
+   * Get the number of bytes read from the ReadChannel.
+   *
+   * @return number of bytes, as reported by the ReadChannel
+   */
+  @Override
+  public long bytesRead() {
+    return in.bytesRead();
+  }
+
+  /**
+   * Close the ReadChannel.
+   *
+   * @throws IOException propagated from closing the ReadChannel
+   */
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageReader.java
new file mode 100644
index 0000000..b277c58
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageReader.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.ipc.message;
+
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.flatbuf.Message;
+import org.apache.arrow.memory.BufferAllocator;
+
+import java.io.IOException;
+
+/**
+ * Interface for reading a sequence of messages, independent of where the messages come from.
+ */
+public interface MessageReader {
+
+  /**
+   * Read the next message in the sequence.
+   *
+   * @return The read message or null if reached the end of the message sequence
+   * @throws IOException if the next message cannot be read
+   */
+  Message readNextMessage() throws IOException;
+
+  /**
+   * When a message is followed by a body of data, read that data into an ArrowBuf. This should
+   * only be called when a Message has a body length greater than 0.
+   *
+   * @param message Read message that is followed by a body of data
+   * @param allocator BufferAllocator to allocate memory for body data
+   * @return An ArrowBuf containing the body of the message that was read
+   * @throws IOException if the message body cannot be read
+   */
+  ArrowBuf readMessageBody(Message message, BufferAllocator allocator) throws IOException;
+
+  /**
+   * Return the current number of bytes that have been read.
+   *
+   * @return number of bytes read
+   */
+  long bytesRead();
+
+  /**
+   * Close any resource opened by the message reader, not including message body allocations.
+   *
+   * @throws IOException if a resource cannot be closed
+   */
+  void close() throws IOException;
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
similarity index 86%
rename from java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
index c397cec..e2f8f7d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.stream;
+package org.apache.arrow.vector.ipc.message;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -31,14 +31,8 @@ import org.apache.arrow.flatbuf.MessageHeader;
 import org.apache.arrow.flatbuf.MetadataVersion;
 import org.apache.arrow.flatbuf.RecordBatch;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ReadChannel;
-import org.apache.arrow.vector.file.WriteChannel;
-import org.apache.arrow.vector.schema.ArrowBuffer;
-import org.apache.arrow.vector.schema.ArrowDictionaryBatch;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.WriteChannel;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import com.google.flatbuffers.FlatBufferBuilder;
@@ -102,12 +96,12 @@ public class MessageSerializer {
   /**
    * Deserializes a schema object. Format is from serialize().
    *
-   * @param in the channel to deserialize from
+   * @param reader the reader interface to deserialize from
    * @return the deserialized object
    * @throws IOException if something went wrong
    */
-  public static Schema deserializeSchema(ReadChannel in) throws IOException {
-    Message message = deserializeMessage(in);
+  public static Schema deserializeSchema(MessageReader reader) throws IOException {
+    Message message = reader.readNextMessage();
     if (message == null) {
       throw new IOException("Unexpected end of input. Missing schema.");
     }
@@ -119,6 +113,16 @@ public class MessageSerializer {
         message.header(new org.apache.arrow.flatbuf.Schema()));
   }
 
+  /**
+   * Deserializes a schema object read from a channel. Format is from serialize().
+   *
+   * @param in the channel to deserialize from; it is wrapped in a MessageChannelReader
+   * @return the deserialized object
+   * @throws IOException if the schema message is missing or reading it otherwise fails
+   */
+  public static Schema deserializeSchema(ReadChannel in) throws IOException {
+    return deserializeSchema(new MessageChannelReader(in));
+  }
 
   /**
    * Serializes an ArrowRecordBatch. Returns the offset and length of the written batch.
@@ -184,25 +188,20 @@ public class MessageSerializer {
   }
 
   /**
-   * Deserializes a RecordBatch
+   * Deserializes a RecordBatch.
    *
-   * @param in      the channel to deserialize from
+   * @param reader  the reader interface to deserialize from
    * @param message the object to derialize to
    * @param alloc   to allocate buffers
    * @return the deserialized object
    * @throws IOException if something went wrong
    */
-  public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, Message message, BufferAllocator alloc)
+  public static ArrowRecordBatch deserializeRecordBatch(MessageReader reader, Message message, BufferAllocator alloc)
       throws IOException {
     RecordBatch recordBatchFB = (RecordBatch) message.header(new RecordBatch());
 
-    int bodyLength = (int) message.bodyLength();
-
     // Now read the record batch body
-    ArrowBuf buffer = alloc.buffer(bodyLength);
-    if (in.readFully(buffer, bodyLength) != bodyLength) {
-      throw new IOException("Unexpected end of input trying to read batch.");
-    }
+    ArrowBuf buffer = reader.readMessageBody(message, alloc);
     return deserializeRecordBatch(recordBatchFB, buffer);
   }
 
@@ -243,7 +242,14 @@ public class MessageSerializer {
     return deserializeRecordBatch(recordBatchFB, body);
   }
 
-  // Deserializes a record batch given the Flatbuffer metadata and in-memory body
+  /**
+   * Deserializes a record batch given the Flatbuffer metadata and in-memory body.
+   *
+   * @param recordBatchFB Deserialized FlatBuffer record batch
+   * @param body Read body of the record batch
+   * @return ArrowRecordBatch from metadata and in-memory body
+   * @throws IOException
+   */
   public static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB,
                                                         ArrowBuf body) throws IOException {
     // Now read the body
@@ -314,26 +320,21 @@ public class MessageSerializer {
   }
 
   /**
-   * Deserializes a DictionaryBatch
+   * Deserializes a DictionaryBatch.
    *
-   * @param in      where to read from
+   * @param reader  where to read from
    * @param message the message message metadata to deserialize
    * @param alloc   the allocator for new buffers
    * @return the corresponding dictionary batch
    * @throws IOException if something went wrong
    */
-  public static ArrowDictionaryBatch deserializeDictionaryBatch(ReadChannel in,
+  public static ArrowDictionaryBatch deserializeDictionaryBatch(MessageReader reader,
                                                                 Message message,
                                                                 BufferAllocator alloc) throws IOException {
     DictionaryBatch dictionaryBatchFB = (DictionaryBatch) message.header(new DictionaryBatch());
 
-    int bodyLength = (int) message.bodyLength();
-
     // Now read the record batch body
-    ArrowBuf body = alloc.buffer(bodyLength);
-    if (in.readFully(body, bodyLength) != bodyLength) {
-      throw new IOException("Unexpected end of input trying to read batch.");
-    }
+    ArrowBuf body = reader.readMessageBody(message, alloc);
     ArrowRecordBatch recordBatch = deserializeRecordBatch(dictionaryBatchFB.data(), body);
     return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch);
   }
@@ -377,8 +378,16 @@ public class MessageSerializer {
     return new ArrowDictionaryBatch(dictionaryBatchFB.id(), recordBatch);
   }
 
-  public static ArrowMessage deserializeMessageBatch(ReadChannel in, BufferAllocator alloc) throws IOException {
-    Message message = deserializeMessage(in);
+  /**
+   * Deserialize a message that is either an ArrowDictionaryBatch or ArrowRecordBatch.
+   *
+   * @param reader Interface to read messages from
+   * @param alloc Allocator for message data
+   * @return The deserialized record batch or dictionary batch, or null if no message could be read
+   * @throws IOException if the message is not an ArrowDictionaryBatch or ArrowRecordBatch
+   */
+  public static ArrowMessage deserializeMessageBatch(MessageReader reader, BufferAllocator alloc) throws IOException {
+    Message message = reader.readNextMessage();
     if (message == null) {
       return null;
     } else if (message.bodyLength() > Integer.MAX_VALUE) {
@@ -391,15 +400,27 @@ public class MessageSerializer {
 
     switch (message.headerType()) {
       case MessageHeader.RecordBatch:
-        return deserializeRecordBatch(in, message, alloc);
+        return deserializeRecordBatch(reader, message, alloc);
       case MessageHeader.DictionaryBatch:
-        return deserializeDictionaryBatch(in, message, alloc);
+        return deserializeDictionaryBatch(reader, message, alloc);
       default:
         throw new IOException("Unexpected message header type " + message.headerType());
     }
   }
 
   /**
+   * Deserialize a message that is either an ArrowDictionaryBatch or ArrowRecordBatch.
+   *
+   * @param in ReadChannel to read messages from; it is wrapped in a MessageChannelReader
+   * @param alloc Allocator for message data
+   * @return The deserialized record batch or dictionary batch, or null if no message could be read
+   * @throws IOException if the message is not an ArrowDictionaryBatch or ArrowRecordBatch
+   */
+  public static ArrowMessage deserializeMessageBatch(ReadChannel in, BufferAllocator alloc) throws IOException {
+    return deserializeMessageBatch(new MessageChannelReader(in), alloc);
+  }
+
+  /**
    * Serializes a message header.
    *
    * @param builder      to write the flatbuf to
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/TypeLayout.java
similarity index 80%
rename from java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/TypeLayout.java
index 29407bf..06fe948 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/TypeLayout.java
@@ -16,15 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
+package org.apache.arrow.vector.ipc.message;
 
 import static java.util.Arrays.asList;
-import static org.apache.arrow.vector.schema.VectorLayout.booleanVector;
-import static org.apache.arrow.vector.schema.VectorLayout.byteVector;
-import static org.apache.arrow.vector.schema.VectorLayout.dataVector;
-import static org.apache.arrow.vector.schema.VectorLayout.offsetVector;
-import static org.apache.arrow.vector.schema.VectorLayout.typeVector;
-import static org.apache.arrow.vector.schema.VectorLayout.validityVector;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -64,7 +58,7 @@ public class TypeLayout {
 
       @Override
       public TypeLayout visit(Int type) {
-        return newFixedWidthTypeLayout(dataVector(type.getBitWidth()));
+        return newFixedWidthTypeLayout(VectorLayout.dataVector(type.getBitWidth()));
       }
 
       @Override
@@ -74,14 +68,14 @@ public class TypeLayout {
           case Dense:
             vectors = asList(
                 // TODO: validate this
-                validityVector(),
-                typeVector(),
-                offsetVector() // offset to find the vector
+                VectorLayout.validityVector(),
+                VectorLayout.typeVector(),
+                VectorLayout.offsetVector() // offset to find the vector
             );
             break;
           case Sparse:
             vectors = asList(
-                typeVector() // type of the value at the index or 0 if null
+                VectorLayout.typeVector() // type of the value at the index or 0 if null
             );
             break;
           default:
@@ -93,21 +87,21 @@ public class TypeLayout {
       @Override
       public TypeLayout visit(Struct type) {
         List<VectorLayout> vectors = asList(
-            validityVector()
+            VectorLayout.validityVector()
         );
         return new TypeLayout(vectors);
       }
 
       @Override
       public TypeLayout visit(Timestamp type) {
-        return newFixedWidthTypeLayout(dataVector(64));
+        return newFixedWidthTypeLayout(VectorLayout.dataVector(64));
       }
 
       @Override
       public TypeLayout visit(org.apache.arrow.vector.types.pojo.ArrowType.List type) {
         List<VectorLayout> vectors = asList(
-            validityVector(),
-            offsetVector()
+            VectorLayout.validityVector(),
+            VectorLayout.offsetVector()
         );
         return new TypeLayout(vectors);
       }
@@ -115,7 +109,7 @@ public class TypeLayout {
       @Override
       public TypeLayout visit(FixedSizeList type) {
         List<VectorLayout> vectors = asList(
-            validityVector()
+            VectorLayout.validityVector()
         );
         return new TypeLayout(vectors);
       }
@@ -136,18 +130,18 @@ public class TypeLayout {
           default:
             throw new UnsupportedOperationException("Unsupported Precision: " + type.getPrecision());
         }
-        return newFixedWidthTypeLayout(dataVector(bitWidth));
+        return newFixedWidthTypeLayout(VectorLayout.dataVector(bitWidth));
       }
 
       @Override
       public TypeLayout visit(Decimal type) {
         // TODO: check size
-        return newFixedWidthTypeLayout(dataVector(64)); // actually depends on the type fields
+        return newFixedWidthTypeLayout(VectorLayout.dataVector(64)); // actually depends on the type fields
       }
 
       @Override
       public TypeLayout visit(Bool type) {
-        return newFixedWidthTypeLayout(booleanVector());
+        return newFixedWidthTypeLayout(VectorLayout.booleanVector());
       }
 
       @Override
@@ -161,7 +155,7 @@ public class TypeLayout {
       }
 
       private TypeLayout newVariableWidthTypeLayout() {
-        return newPrimitiveTypeLayout(validityVector(), offsetVector(), byteVector());
+        return newPrimitiveTypeLayout(VectorLayout.validityVector(), VectorLayout.offsetVector(), VectorLayout.byteVector());
       }
 
       private TypeLayout newPrimitiveTypeLayout(VectorLayout... vectors) {
@@ -169,7 +163,7 @@ public class TypeLayout {
       }
 
       public TypeLayout newFixedWidthTypeLayout(VectorLayout dataVector) {
-        return newPrimitiveTypeLayout(validityVector(), dataVector);
+        return newPrimitiveTypeLayout(VectorLayout.validityVector(), dataVector);
       }
 
       @Override
@@ -179,21 +173,21 @@ public class TypeLayout {
 
       @Override
       public TypeLayout visit(Date type) {
-        return newFixedWidthTypeLayout(dataVector(64));
+        return newFixedWidthTypeLayout(VectorLayout.dataVector(64));
       }
 
       @Override
       public TypeLayout visit(Time type) {
-        return newFixedWidthTypeLayout(dataVector(type.getBitWidth()));
+        return newFixedWidthTypeLayout(VectorLayout.dataVector(type.getBitWidth()));
       }
 
       @Override
       public TypeLayout visit(Interval type) { // TODO: check size
         switch (type.getUnit()) {
           case DAY_TIME:
-            return newFixedWidthTypeLayout(dataVector(64));
+            return newFixedWidthTypeLayout(VectorLayout.dataVector(64));
           case YEAR_MONTH:
-            return newFixedWidthTypeLayout(dataVector(64));
+            return newFixedWidthTypeLayout(VectorLayout.dataVector(64));
           default:
             throw new UnsupportedOperationException("Unknown unit " + type.getUnit());
         }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/VectorLayout.java
similarity index 89%
rename from java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java
rename to java/vector/src/main/java/org/apache/arrow/vector/ipc/message/VectorLayout.java
index 0871baf..e4f2f98 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/VectorLayout.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/VectorLayout.java
@@ -16,12 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.schema;
-
-import static org.apache.arrow.vector.schema.ArrowVectorType.DATA;
-import static org.apache.arrow.vector.schema.ArrowVectorType.OFFSET;
-import static org.apache.arrow.vector.schema.ArrowVectorType.TYPE;
-import static org.apache.arrow.vector.schema.ArrowVectorType.VALIDITY;
+package org.apache.arrow.vector.ipc.message;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -30,14 +25,14 @@ import com.google.flatbuffers.FlatBufferBuilder;
 
 public class VectorLayout implements FBSerializable {
 
-  private static final VectorLayout VALIDITY_VECTOR = new VectorLayout(VALIDITY, 1);
-  private static final VectorLayout OFFSET_VECTOR = new VectorLayout(OFFSET, 32);
-  private static final VectorLayout TYPE_VECTOR = new VectorLayout(TYPE, 32);
-  private static final VectorLayout BOOLEAN_VECTOR = new VectorLayout(DATA, 1);
-  private static final VectorLayout VALUES_64 = new VectorLayout(DATA, 64);
-  private static final VectorLayout VALUES_32 = new VectorLayout(DATA, 32);
-  private static final VectorLayout VALUES_16 = new VectorLayout(DATA, 16);
-  private static final VectorLayout VALUES_8 = new VectorLayout(DATA, 8);
+  private static final VectorLayout VALIDITY_VECTOR = new VectorLayout(ArrowVectorType.VALIDITY, 1);
+  private static final VectorLayout OFFSET_VECTOR = new VectorLayout(ArrowVectorType.OFFSET, 32);
+  private static final VectorLayout TYPE_VECTOR = new VectorLayout(ArrowVectorType.TYPE, 32);
+  private static final VectorLayout BOOLEAN_VECTOR = new VectorLayout(ArrowVectorType.DATA, 1);
+  private static final VectorLayout VALUES_64 = new VectorLayout(ArrowVectorType.DATA, 64);
+  private static final VectorLayout VALUES_32 = new VectorLayout(ArrowVectorType.DATA, 32);
+  private static final VectorLayout VALUES_16 = new VectorLayout(ArrowVectorType.DATA, 16);
+  private static final VectorLayout VALUES_8 = new VectorLayout(ArrowVectorType.DATA, 8);
 
   public static VectorLayout typeVector() {
     return TYPE_VECTOR;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java
deleted file mode 100644
index 5b63000..0000000
--- a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.vector.stream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.file.ArrowReader;
-import org.apache.arrow.vector.file.ReadChannel;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.types.pojo.Schema;
-
-/**
- * This classes reads from an input stream and produces ArrowRecordBatches.
- */
-public class ArrowStreamReader extends ArrowReader<ReadChannel> {
-
-  /**
-   * Constructs a streaming read, reading bytes from 'in'. Non-blocking.
-   *
-   * @param in        the stream to read from
-   * @param allocator to allocate new buffers
-   */
-  public ArrowStreamReader(ReadableByteChannel in, BufferAllocator allocator) {
-    super(new ReadChannel(in), allocator);
-  }
-
-  public ArrowStreamReader(InputStream in, BufferAllocator allocator) {
-    this(Channels.newChannel(in), allocator);
-  }
-
-  /**
-   * Reads the schema message from the beginning of the stream.
-   *
-   * @param in to allocate new buffers
-   * @return the deserialized arrow schema
-   */
-  @Override
-  protected Schema readSchema(ReadChannel in) throws IOException {
-    return MessageSerializer.deserializeSchema(in);
-  }
-
-  @Override
-  protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws IOException {
-    return MessageSerializer.deserializeMessageBatch(in, allocator);
-  }
-}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
index eba149b..5746128 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Field.java
@@ -43,8 +43,8 @@ import org.apache.arrow.flatbuf.KeyValue;
 import org.apache.arrow.flatbuf.Type;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.schema.TypeLayout;
-import org.apache.arrow.vector.schema.VectorLayout;
+import org.apache.arrow.vector.ipc.message.VectorLayout;
+import org.apache.arrow.vector.ipc.message.TypeLayout;
 import org.apache.arrow.vector.types.pojo.ArrowType.Int;
 
 public class Field {
@@ -117,9 +117,9 @@ public class Field {
       }
       dictionary = new DictionaryEncoding(dictionaryFB.id(), dictionaryFB.isOrdered(), indexType);
     }
-    ImmutableList.Builder<org.apache.arrow.vector.schema.VectorLayout> layout = ImmutableList.builder();
+    ImmutableList.Builder<VectorLayout> layout = ImmutableList.builder();
     for (int i = 0; i < field.layoutLength(); ++i) {
-      layout.add(new org.apache.arrow.vector.schema.VectorLayout(field.layout(i)));
+      layout.add(new VectorLayout(field.layout(i)));
     }
     ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
     for (int i = 0; i < field.childrenLength(); i++) {
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
index c7ee202..f51a874 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestValueVector.java
@@ -17,7 +17,6 @@
  */
 
 package org.apache.arrow.vector;
-import org.apache.arrow.vector.holders.VarCharHolder;
 import org.apache.arrow.vector.util.OversizedAllocationException;
 
 import static org.apache.arrow.vector.TestUtils.newNullableVarBinaryVector;
@@ -38,15 +37,14 @@ import java.util.ArrayList;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.schema.TypeLayout;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.TypeLayout;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.util.TransferPair;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
index 3853eec..e61dbec 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
@@ -39,8 +39,8 @@ import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
 import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
 import org.apache.arrow.vector.complex.writer.BigIntWriter;
 import org.apache.arrow.vector.complex.writer.IntWriter;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java
similarity index 99%
rename from java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java
rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java
index 874ba99..233b682 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/BaseFileTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
similarity index 94%
rename from java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
index f968768..239d303 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.stream;
+package org.apache.arrow.vector.ipc;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertArrayEquals;
@@ -33,12 +33,11 @@ import java.util.List;
 import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.file.ArrowBlock;
-import org.apache.arrow.vector.file.ReadChannel;
-import org.apache.arrow.vector.file.WriteChannel;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowMessage;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
 import org.apache.arrow.vector.types.pojo.Field;
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFile.java
similarity index 94%
rename from java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFile.java
index 8559969..4387db0 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFile.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -40,16 +40,14 @@ import org.apache.arrow.vector.NullableFloat4Vector;
 import org.apache.arrow.vector.NullableIntVector;
 import org.apache.arrow.vector.NullableTinyIntVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.complex.FixedSizeListVector;
 import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.NullableMapVector;
 import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
-import org.apache.arrow.vector.schema.ArrowBuffer;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.stream.ArrowStreamReader;
-import org.apache.arrow.vector.stream.ArrowStreamWriter;
-import org.apache.arrow.vector.stream.MessageSerializerTest;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowBuffer;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
@@ -108,52 +106,41 @@ public class TestArrowFile extends BaseFileTest {
     // read
     try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
          FileInputStream fileInputStream = new FileInputStream(file);
-         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator) {
-           @Override
-           protected ArrowMessage readMessage(SeekableReadChannel in, BufferAllocator allocator) throws IOException {
-             ArrowMessage message = super.readMessage(in, allocator);
-             if (message != null) {
-               ArrowRecordBatch batch = (ArrowRecordBatch) message;
-               List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
-               for (ArrowBuffer arrowBuffer : buffersLayout) {
-                 Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
-               }
-             }
-             return message;
-           }
-         }) {
-      Schema schema = arrowReader.getVectorSchemaRoot().getSchema();
-      LOGGER.debug("reading schema: " + schema);
+         ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
+
       VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      VectorUnloader unloader = new VectorUnloader(root);
+      Schema schema = root.getSchema();
+      LOGGER.debug("reading schema: " + schema);
       for (ArrowBlock rbBlock : arrowReader.getRecordBlocks()) {
         arrowReader.loadRecordBatch(rbBlock);
         Assert.assertEquals(count, root.getRowCount());
+        ArrowRecordBatch batch = unloader.getRecordBatch();
+        List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
+        for (ArrowBuffer arrowBuffer : buffersLayout) {
+          Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+        }
         validateContent(count, root);
+        batch.close();
       }
     }
 
     // Read from stream.
     try (BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
          ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
-         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator) {
-           @Override
-           protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws IOException {
-             ArrowMessage message = super.readMessage(in, allocator);
-             if (message != null) {
-               ArrowRecordBatch batch = (ArrowRecordBatch) message;
-               List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
-               for (ArrowBuffer arrowBuffer : buffersLayout) {
-                 Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
-               }
-             }
-             return message;
-           }
-         }) {
+         ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
 
       VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+      VectorUnloader unloader = new VectorUnloader(root);
       Schema schema = root.getSchema();
       LOGGER.debug("reading schema: " + schema);
       Assert.assertTrue(arrowReader.loadNextBatch());
+      ArrowRecordBatch batch = unloader.getRecordBatch();
+      List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
+      for (ArrowBuffer arrowBuffer : buffersLayout) {
+        Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+      }
+      batch.close();
       Assert.assertEquals(count, root.getRowCount());
       validateContent(count, root);
     }
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFooter.java
similarity index 93%
rename from java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java
rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFooter.java
index 4612465..235e8c1 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowFooter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
@@ -27,6 +27,8 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.arrow.flatbuf.Footer;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowFooter;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
similarity index 85%
rename from java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
index 3ce01a2..49e194b 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import static java.nio.channels.Channels.newChannel;
 import static java.util.Arrays.asList;
@@ -37,9 +37,15 @@ import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.TestUtils;
+import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.SeekableReadChannel;
+import org.apache.arrow.vector.ipc.message.ArrowBlock;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
@@ -87,7 +93,10 @@ public class TestArrowReaderWriter {
          ArrowFileWriter writer = new ArrowFileWriter(root, null, newChannel(out))) {
       ArrowBuf validityb = buf(validity);
       ArrowBuf valuesb = buf(values);
-      writer.writeRecordBatch(new ArrowRecordBatch(16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb)));
+      ArrowRecordBatch batch = new ArrowRecordBatch(16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb));
+      VectorLoader loader = new VectorLoader(root);
+      loader.load(batch);
+      writer.writeBatch();
     }
 
     byte[] byteArray = out.toByteArray();
@@ -100,7 +109,9 @@ public class TestArrowReaderWriter {
       // TODO: dictionaries
       List<ArrowBlock> recordBatches = reader.getRecordBlocks();
       assertEquals(1, recordBatches.size());
-      ArrowRecordBatch recordBatch = (ArrowRecordBatch) reader.readMessage(channel, allocator);
+      reader.loadNextBatch();
+      VectorUnloader unloader = new VectorUnloader(reader.getVectorSchemaRoot());
+      ArrowRecordBatch recordBatch = unloader.getRecordBatch();
       List<ArrowFieldNode> nodes = recordBatch.getNodes();
       assertEquals(1, nodes.size());
       ArrowFieldNode node = nodes.get(0);
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
similarity index 88%
rename from java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java
rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
index c7e3419..7a8586a 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStream.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
@@ -16,9 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
-import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -27,16 +26,12 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
-import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.NullableTinyIntVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.stream.ArrowStreamReader;
-import org.apache.arrow.vector.stream.ArrowStreamWriter;
-import org.apache.arrow.vector.stream.MessageSerializerTest;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.ipc.BaseFileTest;
+import org.apache.arrow.vector.ipc.MessageSerializerTest;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
similarity index 78%
rename from java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
index f393733..65e6cea 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file;
+package org.apache.arrow.vector.ipc;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -30,10 +30,9 @@ import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.NullableTinyIntVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.schema.ArrowMessage;
-import org.apache.arrow.vector.stream.ArrowStreamReader;
-import org.apache.arrow.vector.stream.ArrowStreamWriter;
-import org.apache.arrow.vector.stream.MessageSerializerTest;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.ipc.MessageSerializerTest;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.Assert;
 import org.junit.Test;
@@ -95,37 +94,28 @@ public class TestArrowStreamPipe {
     public ReaderThread(ReadableByteChannel sourceChannel)
         throws IOException {
       reader = new ArrowStreamReader(sourceChannel, alloc) {
-        @Override
-        protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws IOException {
-          // Read all the batches. Each batch contains an incrementing id and then some
-          // constant data. Verify both.
-          ArrowMessage message = super.readMessage(in, allocator);
-          if (message == null) {
-            done = true;
-          } else {
-            batchesRead++;
-          }
-          return message;
-        }
 
         @Override
         public boolean loadNextBatch() throws IOException {
-          if (!super.loadNextBatch()) {
+          if (super.loadNextBatch()) {
+            batchesRead++;
+          } else {
+            done = true;
             return false;
           }
-          if (!done) {
-            VectorSchemaRoot root = getVectorSchemaRoot();
-            Assert.assertEquals(16, root.getRowCount());
-            NullableTinyIntVector vector = (NullableTinyIntVector) root.getFieldVectors().get(0);
-            Assert.assertEquals((byte) (batchesRead - 1), vector.get(0));
-            for (int i = 1; i < 16; i++) {
-              if (i < 8) {
-                Assert.assertEquals((byte) (i + 1), vector.get(i));
-              } else {
-                Assert.assertTrue(vector.isNull(i));
-              }
+
+          VectorSchemaRoot root = getVectorSchemaRoot();
+          Assert.assertEquals(16, root.getRowCount());
+          NullableTinyIntVector vector = (NullableTinyIntVector) root.getFieldVectors().get(0);
+          Assert.assertEquals((byte) (batchesRead - 1), vector.get(0));
+          for (int i = 1; i < 16; i++) {
+            if (i < 8) {
+              Assert.assertEquals((byte) (i + 1), vector.get(i));
+            } else {
+              Assert.assertTrue(vector.isNull(i));
             }
           }
+
           return true;
         }
       };
@@ -139,7 +129,7 @@ public class TestArrowStreamPipe {
             reader.getVectorSchemaRoot().getSchema().getFields().get(0).getTypeLayout().getVectorTypes().toString(),
             reader.getVectorSchemaRoot().getSchema().getFields().get(0).getTypeLayout().getVectors().size() > 0);
         while (!done) {
-          assertTrue(reader.loadNextBatch());
+          assertTrue(reader.loadNextBatch() != done);
         }
         reader.close();
       } catch (IOException e) {
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestJSONFile.java
similarity index 99%
rename from java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java
rename to java/vector/src/test/java/org/apache/arrow/vector/ipc/TestJSONFile.java
index 5c4c48c..c3e0b79 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestJSONFile.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.arrow.vector.file.json;
+package org.apache.arrow.vector.ipc;
 
 import java.io.File;
 import java.io.IOException;
@@ -28,7 +28,6 @@ import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.NullableMapVector;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
-import org.apache.arrow.vector.file.BaseFileTest;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.util.Validator;
 import org.junit.Assert;

-- 
To stop receiving notification emails like this one, please contact
commits@arrow.apache.org.