You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by cu...@apache.org on 2019/08/08 17:36:03 UTC

[arrow] branch master updated: ARROW-5579: [Java] Shade flatbuffers

This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d549b7c  ARROW-5579: [Java] Shade flatbuffers
d549b7c is described below

commit d549b7c6223a16b462c2ec1cbcc1698827b0c87f
Author: Micah Kornfield <em...@gmail.com>
AuthorDate: Thu Aug 8 10:35:38 2019 -0700

    ARROW-5579: [Java] Shade flatbuffers
    
    - Merge the format package into the vector package
    - shade flatbuffers in vector
    - create missing wrappers to avoid direct flatbuffer dependencies in flight
    
    [] Possibly add unit tests to cover new functions/classes.
    
    @pravindra @tianchen92 please take a look, I think this is the only reasonable way to shade flatbuffers. I don't know if there was any particular reason that format and vector were separate, but I think it makes it very difficult to shade if we don't take this approach.
    
    Closes #4701 from emkornfield/shade2 and squashes the following commits:
    
    4269ce8 <Micah Kornfield> add comment
    d196772 <Micah Kornfield> rename getType
    45fad2a <Micah Kornfield> address CR feedback
    f5a3a70 <Micah Kornfield> centralize serialization code in the serializer
    9b54c6e <Micah Kornfield> ArrowMessageHeader->ArrowMessageMetadata
    8daeca4 <Micah Kornfield> remove extra whitespace
    90eebb3 <Micah Kornfield> address feedback part 1
    38fabe8 <Micah Kornfield> Add shading, cleanup dependencies and CI for shaded artifacts
    11f9303 <Micah Kornfield> Refactor java files so all flatbuffers are wrapped in vector
    
    Authored-by: Micah Kornfield <em...@gmail.com>
    Signed-off-by: Bryan Cutler <cu...@gmail.com>
---
 ci/docker_build_java.sh                            |  6 +-
 ci/docker_java_test_all.sh                         |  1 +
 java/README.md                                     |  9 +++
 java/adapter/jdbc/pom.xml                          |  1 +
 java/adapter/orc/pom.xml                           |  9 +--
 .../apache/arrow/adapter/orc/OrcStripeReader.java  |  5 --
 java/algorithm/pom.xml                             |  1 +
 java/flight/pom.xml                                | 10 +--
 .../java/org/apache/arrow/flight/ArrowMessage.java | 62 ++++++----------
 .../org/apache/arrow/flight/perf/TestPerf.java     |  7 +-
 java/gandiva/pom.xml                               |  1 +
 java/performance/pom.xml                           |  1 +
 java/pom.xml                                       | 17 ++++-
 java/tools/pom.xml                                 |  1 +
 java/vector/pom.xml                                | 32 ++++++++
 .../vector/ipc/message/ArrowDictionaryBatch.java   |  5 ++
 .../arrow/vector/ipc/message/ArrowMessage.java     |  3 +
 .../arrow/vector/ipc/message/ArrowRecordBatch.java |  4 +
 .../vector/ipc/message/MessageMetadataResult.java  | 22 +++++-
 .../vector/ipc/message/MessageSerializer.java      | 86 +++++++++++++++++-----
 .../ipc/message/TestMessageMetadataResult.java}    | 25 +++----
 21 files changed, 205 insertions(+), 103 deletions(-)

diff --git a/ci/docker_build_java.sh b/ci/docker_build_java.sh
index e6516b7..f1e5276 100755
--- a/ci/docker_build_java.sh
+++ b/ci/docker_build_java.sh
@@ -37,8 +37,12 @@ if [ "$ARROW_JAVA_RUN_TESTS" != "1" ]; then
   JAVA_ARGS=-DskipTests
 fi
 
+if [ "$ARROW_JAVA_SHADE_FLATBUFS" == "1" ]; then  # opt-in: activate the shade-flatbuffers Maven profile
+  export SHADE_FLATBUFFERS="-Pshade-flatbuffers"
+fi
+
 pushd $arrow_src/java
-mvn -B $JAVA_ARGS -Drat.skip=true install
+mvn -B $JAVA_ARGS -Drat.skip=true install $SHADE_FLATBUFFERS
 
 if [ "$ARROW_JAVADOC" == "1" ]; then
   export MAVEN_OPTS="$MAVEN_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=warn"
diff --git a/ci/docker_java_test_all.sh b/ci/docker_java_test_all.sh
index 1466907..ca48894 100755
--- a/ci/docker_java_test_all.sh
+++ b/ci/docker_java_test_all.sh
@@ -28,6 +28,7 @@ export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
 export ARROW_JAVADOC=1
 bash $SOURCE_DIR/docker_build_java.sh
 
+export ARROW_JAVA_SHADE_FLATBUFS=1
 export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
 export ARROW_JAVADOC=0
 bash $SOURCE_DIR/docker_build_java.sh
diff --git a/java/README.md b/java/README.md
index 23575bf..34ff51a 100644
--- a/java/README.md
+++ b/java/README.md
@@ -45,6 +45,15 @@ mvn install -P arrow-jni -am -Darrow.cpp.build.dir=../../release
 The gandiva library is still in Alpha stages, and subject to API changes without
 deprecation warnings.
 
+## Flatbuffers dependency
+
+Arrow uses Google's Flatbuffers to transport metadata.  The Java version of the library
+requires that the generated flatbuffer classes be used only with the same version that
+generated them.  Arrow packages a version of the arrow-vector module that shades flatbuffers
+and arrow-format into a single JAR.  Using the classifier "shade-format-flatbuffers" in your
+pom.xml will make use of this JAR; you can then exclude/resolve the original dependency to
+a version of your choosing.
+
 ## Java Code Style Guide
 
 Arrow Java follows the Google style guide [here][3] with the following
diff --git a/java/adapter/jdbc/pom.xml b/java/adapter/jdbc/pom.xml
index 767f3d1..d2a8387 100644
--- a/java/adapter/jdbc/pom.xml
+++ b/java/adapter/jdbc/pom.xml
@@ -38,6 +38,7 @@
             <groupId>org.apache.arrow</groupId>
             <artifactId>arrow-vector</artifactId>
             <version>${project.version}</version>
+            <classifier>${arrow.vector.classifier}</classifier>
         </dependency>
 
         <!-- https://mvnrepository.com/artifact/com.h2database/h2 -->
diff --git a/java/adapter/orc/pom.xml b/java/adapter/orc/pom.xml
index 0ee1cdd..10fb4e1 100644
--- a/java/adapter/orc/pom.xml
+++ b/java/adapter/orc/pom.xml
@@ -24,14 +24,9 @@
             <artifactId>arrow-vector</artifactId>
             <version>${project.version}</version>
             <scope>compile</scope>
+            <classifier>${arrow.vector.classifier}</classifier>
         </dependency>
-        <dependency>
-            <groupId>org.apache.arrow</groupId>
-            <artifactId>arrow-format</artifactId>
-            <version>${project.version}</version>
-            <scope>compile</scope>
-        </dependency>
-        <dependency>
+	<dependency>
             <groupId>org.apache.orc</groupId>
             <artifactId>orc-core</artifactId>
             <version>1.5.5</version>
diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java
index c69e74a..9798eca 100644
--- a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java
+++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.stream.Collectors;
 
-import org.apache.arrow.flatbuf.MessageHeader;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.ipc.ReadChannel;
@@ -107,10 +106,6 @@ public class OrcStripeReader extends ArrowReader {
         throw new IOException("Unexpected end of input. Missing schema.");
       }
 
-      if (result.getMessage().headerType() != MessageHeader.Schema) {
-        throw new IOException("Expected schema but header was " + result.getMessage().headerType());
-      }
-
       return MessageSerializer.deserializeSchema(result.getMessage());
     }
   }
diff --git a/java/algorithm/pom.xml b/java/algorithm/pom.xml
index 90ea24a..779c2ca 100644
--- a/java/algorithm/pom.xml
+++ b/java/algorithm/pom.xml
@@ -24,6 +24,7 @@
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-vector</artifactId>
       <version>${project.version}</version>
+      <classifier>${arrow.vector.classifier}</classifier>
     </dependency>
     <dependency>
       <groupId>org.apache.arrow</groupId>
diff --git a/java/flight/pom.xml b/java/flight/pom.xml
index 6d8cb5c..4fd3261 100644
--- a/java/flight/pom.xml
+++ b/java/flight/pom.xml
@@ -26,13 +26,10 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.google.flatbuffers</groupId>
-      <artifactId>flatbuffers-java</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.arrow</groupId>
       <artifactId>arrow-vector</artifactId>
       <version>${project.version}</version>
+      <classifier>${arrow.vector.classifier}</classifier>
     </dependency>
     <dependency>
       <groupId>org.apache.arrow</groupId>
@@ -40,11 +37,6 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.arrow</groupId>
-      <artifactId>arrow-format</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-netty</artifactId>
       <version>${dep.grpc.version}</version>
diff --git a/java/flight/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/src/main/java/org/apache/arrow/flight/ArrowMessage.java
index 6611b3e..f837ec8 100644
--- a/java/flight/src/main/java/org/apache/arrow/flight/ArrowMessage.java
+++ b/java/flight/src/main/java/org/apache/arrow/flight/ArrowMessage.java
@@ -26,9 +26,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.arrow.flatbuf.Message;
-import org.apache.arrow.flatbuf.MessageHeader;
-import org.apache.arrow.flatbuf.RecordBatch;
 import org.apache.arrow.flight.grpc.AddWritableBuffer;
 import org.apache.arrow.flight.grpc.GetReadableBuffer;
 import org.apache.arrow.flight.impl.Flight.FlightData;
@@ -38,13 +35,13 @@ import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageMetadataResult;
 import org.apache.arrow.vector.ipc.message.MessageSerializer;
 import org.apache.arrow.vector.types.pojo.Schema;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.io.ByteStreams;
-import com.google.flatbuffers.FlatBufferBuilder;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
@@ -54,7 +51,6 @@ import io.grpc.Drainable;
 import io.grpc.MethodDescriptor;
 import io.grpc.MethodDescriptor.Marshaller;
 import io.grpc.protobuf.ProtoUtils;
-
 import io.netty.buffer.ArrowBuf;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
@@ -120,16 +116,15 @@ class ArrowMessage implements AutoCloseable {
   );
 
   private final FlightDescriptor descriptor;
-  private final Message message;
+  private final MessageMetadataResult message;
   private final ArrowBuf appMetadata;
   private final List<ArrowBuf> bufs;
 
+
   public ArrowMessage(FlightDescriptor descriptor, Schema schema) {
-    FlatBufferBuilder builder = new FlatBufferBuilder();
-    int schemaOffset = schema.getSchema(builder);
-    ByteBuffer serializedMessage = MessageSerializer.serializeMessage(builder, MessageHeader.Schema, schemaOffset, 0);
-    serializedMessage = serializedMessage.slice();
-    message = Message.getRootAsMessage(serializedMessage);
+    ByteBuffer serializedMessage = MessageSerializer.serializeMetadata(schema);
+    this.message = MessageMetadataResult.create(serializedMessage.slice(),
+        serializedMessage.remaining());
     bufs = ImmutableList.of();
     this.descriptor = descriptor;
     this.appMetadata = null;
@@ -141,25 +136,17 @@ class ArrowMessage implements AutoCloseable {
    * @param appMetadata The app metadata. May be null. Takes ownership of the buffer otherwise.
    */
   public ArrowMessage(ArrowRecordBatch batch, ArrowBuf appMetadata) {
-    FlatBufferBuilder builder = new FlatBufferBuilder();
-    int batchOffset = batch.writeTo(builder);
-    ByteBuffer serializedMessage = MessageSerializer.serializeMessage(builder, MessageHeader.RecordBatch, batchOffset,
-        batch.computeBodyLength());
-    serializedMessage = serializedMessage.slice();
-    this.message = Message.getRootAsMessage(serializedMessage);
+    ByteBuffer serializedMessage = MessageSerializer.serializeMetadata(batch);
+    this.message = MessageMetadataResult.create(serializedMessage.slice(), serializedMessage.remaining());
     this.bufs = ImmutableList.copyOf(batch.getBuffers());
     this.descriptor = null;
     this.appMetadata = appMetadata;
   }
 
   public ArrowMessage(ArrowDictionaryBatch batch) {
-    FlatBufferBuilder builder = new FlatBufferBuilder();
-    int batchOffset = batch.writeTo(builder);
-    ByteBuffer serializedMessage = MessageSerializer
-        .serializeMessage(builder, MessageHeader.DictionaryBatch, batchOffset,
-            batch.computeBodyLength());
+    ByteBuffer serializedMessage = MessageSerializer.serializeMetadata(batch);
     serializedMessage = serializedMessage.slice();
-    this.message = Message.getRootAsMessage(serializedMessage);
+    this.message = MessageMetadataResult.create(serializedMessage, serializedMessage.remaining());
     // asInputStream will free the buffers implicitly, so increment the reference count
     batch.getDictionary().getBuffers().forEach(buf -> buf.getReferenceManager().retain());
     this.bufs = ImmutableList.copyOf(batch.getDictionary().getBuffers());
@@ -167,13 +154,18 @@ class ArrowMessage implements AutoCloseable {
     this.appMetadata = null;
   }
 
-  private ArrowMessage(FlightDescriptor descriptor, Message message, ArrowBuf appMetadata, ArrowBuf buf) {
+  private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, ArrowBuf appMetadata,
+                       ArrowBuf buf) {
     this.message = message;
     this.descriptor = descriptor;
     this.appMetadata = appMetadata;
     this.bufs = buf == null ? ImmutableList.of() : ImmutableList.of(buf);
   }
 
+  public MessageMetadataResult asSchemaMessage() {
+    return message;
+  }
+
   public FlightDescriptor getDescriptor() {
     return descriptor;
   }
@@ -182,27 +174,20 @@ class ArrowMessage implements AutoCloseable {
     return HeaderType.getHeader(message.headerType());
   }
 
-  public Message asSchemaMessage() {
-    return message;
-  }
-
   public Schema asSchema() {
     Preconditions.checkArgument(bufs.size() == 0);
     Preconditions.checkArgument(getMessageType() == HeaderType.SCHEMA);
-    org.apache.arrow.flatbuf.Schema schema = new org.apache.arrow.flatbuf.Schema();
-    message.header(schema);
-    return Schema.convertSchema(schema);
+    return MessageSerializer.deserializeSchema(message);
   }
 
   public ArrowRecordBatch asRecordBatch() throws IOException {
     Preconditions.checkArgument(bufs.size() == 1, "A batch can only be consumed if it contains a single ArrowBuf.");
     Preconditions.checkArgument(getMessageType() == HeaderType.RECORD_BATCH);
-    RecordBatch recordBatch = new RecordBatch();
-    message.header(recordBatch);
+
     ArrowBuf underlying = bufs.get(0);
+
     underlying.getReferenceManager().retain();
-    ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(recordBatch, underlying);
-    return batch;
+    return  MessageSerializer.deserializeRecordBatch(message, underlying);
   }
 
   public ArrowDictionaryBatch asDictionaryBatch() throws IOException {
@@ -220,7 +205,7 @@ class ArrowMessage implements AutoCloseable {
 
     try {
       FlightDescriptor descriptor = null;
-      Message header = null;
+      MessageMetadataResult header = null;
       ArrowBuf body = null;
       ArrowBuf appMetadata = null;
       while (stream.available() > 0) {
@@ -238,7 +223,7 @@ class ArrowMessage implements AutoCloseable {
             int size = readRawVarint32(stream);
             byte[] bytes = new byte[size];
             ByteStreams.readFully(stream, bytes);
-            header = Message.getRootAsMessage(ByteBuffer.wrap(bytes));
+            header = MessageMetadataResult.create(ByteBuffer.wrap(bytes), size);
             break;
           }
           case APP_METADATA_TAG: {
@@ -282,7 +267,8 @@ class ArrowMessage implements AutoCloseable {
   private InputStream asInputStream(BufferAllocator allocator) {
     try {
 
-      final ByteString bytes = ByteString.copyFrom(message.getByteBuffer(), message.getByteBuffer().remaining());
+      final ByteString bytes = ByteString.copyFrom(message.getMessageBuffer(),
+          message.bytesAfterMessage());
 
       if (getMessageType() == HeaderType.SCHEMA) {
 
diff --git a/java/flight/src/test/java/org/apache/arrow/flight/perf/TestPerf.java b/java/flight/src/test/java/org/apache/arrow/flight/perf/TestPerf.java
index c23c793..eed428c 100644
--- a/java/flight/src/test/java/org/apache/arrow/flight/perf/TestPerf.java
+++ b/java/flight/src/test/java/org/apache/arrow/flight/perf/TestPerf.java
@@ -34,6 +34,7 @@ import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
@@ -46,7 +47,6 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.flatbuffers.FlatBufferBuilder;
 import com.google.protobuf.ByteString;
 
 @org.junit.Ignore
@@ -62,13 +62,12 @@ public class TestPerf {
         Field.nullable("d", MinorType.BIGINT.getType())
     ));
 
-    FlatBufferBuilder builder = new FlatBufferBuilder();
-    pojoSchema.getSchema(builder);
+    ByteString serializedSchema = ByteString.copyFrom(MessageSerializer.serializeMetadata(pojoSchema));
 
     return FlightDescriptor.command(Perf.newBuilder()
         .setRecordsPerStream(recordCount)
         .setRecordsPerBatch(recordsPerBatch)
-        .setSchema(ByteString.copyFrom(pojoSchema.toByteArray()))
+        .setSchema(serializedSchema)
         .setStreamCount(streamCount)
         .build()
         .toByteArray());
diff --git a/java/gandiva/pom.xml b/java/gandiva/pom.xml
index 02734d6..74feb68 100644
--- a/java/gandiva/pom.xml
+++ b/java/gandiva/pom.xml
@@ -39,6 +39,7 @@
             <groupId>org.apache.arrow</groupId>
             <artifactId>arrow-vector</artifactId>
             <version>${project.version}</version>
+            <classifier>${arrow.vector.classifier}</classifier>
         </dependency>
         <dependency>
             <groupId>com.google.protobuf</groupId>
diff --git a/java/performance/pom.xml b/java/performance/pom.xml
index 0b42d07..dcc20a6 100644
--- a/java/performance/pom.xml
+++ b/java/performance/pom.xml
@@ -48,6 +48,7 @@
             <groupId>org.apache.arrow</groupId>
             <artifactId>arrow-vector</artifactId>
             <version>${project.version}</version>
+            <classifier>${arrow.vector.classifier}</classifier>
         </dependency>
         <dependency>
             <groupId>org.apache.arrow</groupId>
diff --git a/java/pom.xml b/java/pom.xml
index 29d602d..617809b 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -38,6 +38,7 @@
     <dep.hadoop.version>2.7.1</dep.hadoop.version>
     <dep.fbs.version>1.9.0</dep.fbs.version>
     <dep.flatc.version>1.9.0</dep.flatc.version>
+    <arrow.vector.classifier></arrow.vector.classifier>
     <forkCount>2</forkCount>
     <checkstyle.failOnViolation>true</checkstyle.failOnViolation>
   </properties>
@@ -486,10 +487,10 @@
 
   <dependencyManagement>
     <dependencies>
-      <dependency>
-        <groupId>com.google.flatbuffers</groupId>
-        <artifactId>flatbuffers-java</artifactId>
-        <version>${dep.fbs.version}</version>
+      <dependency>	
+        <groupId>com.google.flatbuffers</groupId>	
+        <artifactId>flatbuffers-java</artifactId>	
+        <version>${dep.fbs.version}</version>	
       </dependency>
       <dependency>
         <groupId>com.google.guava</groupId>
@@ -679,6 +680,14 @@
         <module>gandiva</module>
       </modules>
     </profile>
+
+    <profile>
+      <!-- Use the version of arrow-vector that shades flatbuffers and packages format -->
+      <id>shade-flatbuffers</id>
+      <properties>
+       <arrow.vector.classifier>shade-format-flatbuffers</arrow.vector.classifier>
+      </properties>
+    </profile>
   </profiles>
 
 </project>
diff --git a/java/tools/pom.xml b/java/tools/pom.xml
index bcdeb11..caa02c8 100644
--- a/java/tools/pom.xml
+++ b/java/tools/pom.xml
@@ -29,6 +29,7 @@
             <groupId>org.apache.arrow</groupId>
             <artifactId>arrow-vector</artifactId>
             <version>${project.version}</version>
+            <classifier>${arrow.vector.classifier}</classifier>
         </dependency>
         <dependency>
           <groupId>com.google.guava</groupId>
diff --git a/java/vector/pom.xml b/java/vector/pom.xml
index 978411e..74ecf38 100644
--- a/java/vector/pom.xml
+++ b/java/vector/pom.xml
@@ -55,6 +55,7 @@
     <dependency>
       <groupId>com.google.flatbuffers</groupId>
       <artifactId>flatbuffers-java</artifactId>
+      <version>${dep.fbs.version}</version>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
@@ -128,6 +129,37 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+       <groupId>org.apache.maven.plugins</groupId>
+       <artifactId>maven-shade-plugin</artifactId>
+       <version>3.1.1</version>
+       <executions>
+         <execution>
+           <phase>package</phase>
+           <goals>
+             <goal>shade</goal>
+           </goals>
+           <configuration>
+             <artifactSet>
+               <includes>
+                 <include>org.apache.arrow:arrow-format</include>
+                 <include>com.google.flatbuffers:*</include>
+               </includes>
+             </artifactSet>
+             <shadedArtifactAttached>true</shadedArtifactAttached>
+             <shadedClassifierName>shade-format-flatbuffers</shadedClassifierName>
+             <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
+             <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+             <relocations>
+               <relocation>
+                 <pattern>com.google.flatbuffers</pattern>
+                 <shadedPattern>arrow.vector.com.google.flatbuffers</shadedPattern>
+               </relocation>
+             </relocations>
+           </configuration>
+         </execution>
+       </executions>
+      </plugin>
     </plugins>
     <pluginManagement>
       <plugins>
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java
index a466db8..93efb85 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java
@@ -18,6 +18,7 @@
 package org.apache.arrow.vector.ipc.message;
 
 import org.apache.arrow.flatbuf.DictionaryBatch;
+import org.apache.arrow.flatbuf.MessageHeader;
 
 import com.google.flatbuffers.FlatBufferBuilder;
 
@@ -35,6 +36,10 @@ public class ArrowDictionaryBatch implements ArrowMessage {
     this.dictionary = dictionary;
   }
 
+  public byte getMessageType() {
+    return MessageHeader.DictionaryBatch;
+  }
+
   public long getDictionaryId() {
     return dictionaryId;
   }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
index 441f6ec..c928d03 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
@@ -26,6 +26,9 @@ public interface ArrowMessage extends FBSerializable, AutoCloseable {
 
   <T> T accepts(ArrowMessageVisitor<T> visitor);
 
+  /** Returns the flatbuffer enum value indicating the type of the message. */
+  byte getMessageType();
+
   /**
    * Visitor interface for implementations of {@link ArrowMessage}.
    *
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
index 185b44e..7631a87 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
@@ -106,6 +106,10 @@ public class ArrowRecordBatch implements ArrowMessage {
     this.buffersLayout = Collections.unmodifiableList(arrowBuffers);
   }
 
+  public byte getMessageType() {
+    return org.apache.arrow.flatbuf.MessageHeader.RecordBatch;
+  }
+
   public int getLength() {
     return length;
   }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageMetadataResult.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageMetadataResult.java
index 212f8b6..e472882 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageMetadataResult.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageMetadataResult.java
@@ -42,6 +42,15 @@ public class MessageMetadataResult {
     this.message = message;
   }
 
+  /**
+   * Creates a new {@link MessageMetadataResult} by parsing it from the beginning of the buffer.
+   *
+   * @param messageLength The length of the serialized flatbuffer message in bytes (might not be equal to the buffer
+   *     size).
+   */
+  public static MessageMetadataResult create(ByteBuffer buffer, int messageLength) {
+    return new MessageMetadataResult(messageLength, buffer, Message.getRootAsMessage(buffer));
+  }
 
   /**
    * Get the length of the message metadata in bytes, not including the body length.
@@ -55,13 +64,24 @@ public class MessageMetadataResult {
   /**
    * Get the buffer containing the raw message metadata bytes, not including the message body data.
    *
-   * @return buffer containing the message metadata
+   * @return buffer containing the message metadata.
    */
   public ByteBuffer getMessageBuffer() {
     return messageBuffer;
   }
 
   /**
+   * Returns the bytes remaining in the buffer after parsing the message from it.
+   */
+  public int bytesAfterMessage() {
+    return message.getByteBuffer().remaining();
+  }
+
+  public byte headerType() {
+    return message.headerType();
+  }
+
+  /**
    * Check if the message is followed by a body. This will be true if the message has a body
    * length > 0, which indicates that a message body needs to be read from the input source.
    *
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
index 034a34c..4016802 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
@@ -30,6 +30,7 @@ import org.apache.arrow.flatbuf.MessageHeader;
 import org.apache.arrow.flatbuf.MetadataVersion;
 import org.apache.arrow.flatbuf.RecordBatch;
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.ipc.ReadChannel;
 import org.apache.arrow.vector.ipc.WriteChannel;
 import org.apache.arrow.vector.types.pojo.Schema;
@@ -63,9 +64,9 @@ public class MessageSerializer {
    */
   public static int bytesToInt(byte[] bytes) {
     return ((bytes[3] & 255) << 24) +
-        ((bytes[2] & 255) << 16) +
-        ((bytes[1] & 255) << 8) +
-        ((bytes[0] & 255) << 0);
+           ((bytes[2] & 255) << 16) +
+           ((bytes[1] & 255) << 8) +
+           ((bytes[0] & 255));
   }
 
   /**
@@ -78,7 +79,7 @@ public class MessageSerializer {
     bytes[3] = (byte) (value >>> 24);
     bytes[2] = (byte) (value >>> 16);
     bytes[1] = (byte) (value >>> 8);
-    bytes[0] = (byte) (value >>> 0);
+    bytes[0] = (byte) (value);
   }
 
   /**
@@ -120,9 +121,7 @@ public class MessageSerializer {
     long start = out.getCurrentPosition();
     assert start % 8 == 0;
 
-    FlatBufferBuilder builder = new FlatBufferBuilder();
-    int schemaOffset = schema.getSchema(builder);
-    ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.Schema, schemaOffset, 0);
+    ByteBuffer serializedMessage = serializeMetadata(schema);
 
     int messageLength = serializedMessage.remaining();
 
@@ -132,14 +131,25 @@ public class MessageSerializer {
   }
 
   /**
+   * Returns the serialized flatbuffer bytes of the schema wrapped in a message table.
+   */
+  public static ByteBuffer serializeMetadata(Schema schema) {
+    FlatBufferBuilder builder = new FlatBufferBuilder();
+    int schemaOffset = schema.getSchema(builder);
+    return MessageSerializer.serializeMessage(builder, org.apache.arrow.flatbuf.MessageHeader.Schema, schemaOffset, 0);
+  }
+
+  /**
    * Deserializes an Arrow Schema object from a schema message. Format is from serialize().
    *
    * @param schemaMessage a Message of type MessageHeader.Schema
    * @return the deserialized Arrow Schema
    */
   public static Schema deserializeSchema(Message schemaMessage) {
+    Preconditions.checkArgument(schemaMessage.headerType() == MessageHeader.Schema,
+        "Expected schema but result was:  %s", schemaMessage.headerType());
     return Schema.convertSchema((org.apache.arrow.flatbuf.Schema)
-      schemaMessage.header(new org.apache.arrow.flatbuf.Schema()));
+        schemaMessage.header(new org.apache.arrow.flatbuf.Schema()));
   }
 
   /**
@@ -157,8 +167,17 @@ public class MessageSerializer {
     if (result.getMessage().headerType() != MessageHeader.Schema) {
       throw new IOException("Expected schema but header was " + result.getMessage().headerType());
     }
+    return deserializeSchema(result);
+  }
 
-    return deserializeSchema(result.getMessage());
+  /**
+   * Deserializes an Arrow Schema object from a {@link MessageMetadataResult}. Format is from serialize().
+   *
+   * @param message a Message of type MessageHeader.Schema
+   * @return the deserialized Arrow Schema
+   */
+  public static Schema deserializeSchema(MessageMetadataResult message) {
+    return deserializeSchema(message.getMessage());
   }
 
   /**
@@ -175,10 +194,7 @@ public class MessageSerializer {
     int bodyLength = batch.computeBodyLength();
     assert bodyLength % 8 == 0;
 
-    FlatBufferBuilder builder = new FlatBufferBuilder();
-    int batchOffset = batch.writeTo(builder);
-
-    ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.RecordBatch, batchOffset, bodyLength);
+    ByteBuffer serializedMessage = serializeMetadata(batch);
 
     int metadataLength = serializedMessage.remaining();
 
@@ -224,7 +240,7 @@ public class MessageSerializer {
       out.write(buffer);
       if (out.getCurrentPosition() != startPosition + layout.getSize()) {
         throw new IllegalStateException("wrong buffer size: " + out.getCurrentPosition() +
-            " != " + startPosition + layout.getSize());
+                                        " != " + startPosition + layout.getSize());
       }
     }
     out.align();
@@ -232,6 +248,16 @@ public class MessageSerializer {
   }
 
   /**
+   * Returns the serialized form of an {@link ArrowMessage} wrapped in a {@link org.apache.arrow.flatbuf.Message}.
+   */
+  public static ByteBuffer serializeMetadata(ArrowMessage message) {
+    FlatBufferBuilder builder = new FlatBufferBuilder();
+    int batchOffset = message.writeTo(builder);
+    return serializeMessage(builder, message.getMessageType(), batchOffset,
+            message.computeBodyLength());
+  }
+
+  /**
    * Deserializes an ArrowRecordBatch from a record batch message and data in an ArrowBuf.
    *
    * @param recordBatchMessage a Message of type MessageHeader.RecordBatch
@@ -321,7 +347,7 @@ public class MessageSerializer {
       if ((int) node.length() != node.length() ||
           (int) node.nullCount() != node.nullCount()) {
         throw new IOException("Cannot currently deserialize record batches with " +
-            "node length larger than Int.MAX_VALUE");
+                              "node length larger than Int.MAX_VALUE");
       }
       nodes.add(new ArrowFieldNode((int) node.length(), (int) node.nullCount()));
     }
@@ -341,6 +367,15 @@ public class MessageSerializer {
   }
 
   /**
+   * Reads a record batch based on the metadata in serializedMessage and the underlying data buffer.
+   */
+  public static ArrowRecordBatch deserializeRecordBatch(MessageMetadataResult serializedMessage,
+      ArrowBuf underlying) throws
+      IOException {
+    return deserializeRecordBatch(serializedMessage.getMessage(), underlying);
+  }
+
+  /**
    * Serializes a dictionary ArrowRecordBatch. Returns the offset and length of the written batch.
    *
    * @param out   where to serialize
@@ -353,10 +388,7 @@ public class MessageSerializer {
     int bodyLength = batch.computeBodyLength();
     assert bodyLength % 8 == 0;
 
-    FlatBufferBuilder builder = new FlatBufferBuilder();
-    int batchOffset = batch.writeTo(builder);
-
-    ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.DictionaryBatch, batchOffset, bodyLength);
+    ByteBuffer serializedMessage = serializeMetadata(batch);
 
     int metadataLength = serializedMessage.remaining();
 
@@ -397,6 +429,20 @@ public class MessageSerializer {
   }
 
   /**
+   * Deserializes an ArrowDictionaryBatch from a dictionary batch Message and data in an ArrowBuf.
+   *
+   * @param message a MessageMetadataResult wrapping a Message of type MessageHeader.DictionaryBatch
+   * @param bodyBuffer Arrow buffer containing the body data of the dictionary batch
+   *                   described by {@code message}
+   * @return the deserialized ArrowDictionaryBatch
+   * @throws IOException if something went wrong
+   */
+  public static ArrowDictionaryBatch deserializeDictionaryBatch(MessageMetadataResult message, ArrowBuf bodyBuffer)
+      throws IOException {
+    return deserializeDictionaryBatch(message.getMessage(), bodyBuffer);
+  }
+
+  /**
    * Deserializes an ArrowDictionaryBatch read from the input channel. This uses the given allocator
    * to create an ArrowBuf for the batch body data.
    *
@@ -547,7 +593,7 @@ public class MessageSerializer {
         ByteBuffer messageBuffer = ByteBuffer.allocate(messageLength);
         if (in.readFully(messageBuffer) != messageLength) {
           throw new IOException(
-            "Unexpected end of stream trying to read message.");
+              "Unexpected end of stream trying to read message.");
         }
         messageBuffer.rewind();
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/message/TestMessageMetadataResult.java
similarity index 63%
copy from java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
copy to java/vector/src/test/java/org/apache/arrow/vector/ipc/message/TestMessageMetadataResult.java
index 441f6ec..ee53615 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/message/TestMessageMetadataResult.java
@@ -17,23 +17,20 @@
 
 package org.apache.arrow.vector.ipc.message;
 
-/**
- * Interface for Arrow IPC messages (https://arrow.apache.org/docs/format/IPC.html).
- */
-public interface ArrowMessage extends FBSerializable, AutoCloseable {
+import static org.junit.Assert.assertEquals;
 
-  int computeBodyLength();
+import java.nio.ByteBuffer;
 
-  <T> T accepts(ArrowMessageVisitor<T> visitor);
+import org.junit.Test;
 
-  /**
-   * Visitor interface for implementations of {@link ArrowMessage}.
-   *
-   * @param <T> The type of value to return after visiting.
-   */
-  interface ArrowMessageVisitor<T> {
-    T visit(ArrowDictionaryBatch message);
+public class TestMessageMetadataResult {
 
-    T visit(ArrowRecordBatch message);
+  @Test
+  public void getMessageLength_returnsConstructValue() {
+    // getMessageLength() is used externally (by Spark); it must return the length passed to the constructor.
+    MessageMetadataResult result = new MessageMetadataResult(1, ByteBuffer.allocate(0),
+        new org.apache.arrow.flatbuf.Message());
+    assertEquals(1, result.getMessageLength());
+  }
+
 }