You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by cp...@apache.org on 2018/07/30 21:38:41 UTC

[arrow] branch master updated: ARROW-2937: [Java] Followup to ARROW-2704. Make MessageReader classes immutable and clarify docs

This is an automated email from the ASF dual-hosted git repository.

cpcloud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 734828d  ARROW-2937: [Java] Followup to ARROW-2704. Make MessageReader classes immutable and clarify docs
734828d is described below

commit 734828d3144be7d06858118519ec957d84483e94
Author: Bryan Cutler <cu...@gmail.com>
AuthorDate: Mon Jul 30 17:38:23 2018 -0400

    ARROW-2937: [Java] Followup to ARROW-2704. Make MessageReader classes immutable and clarify docs
    
    Changed MessageReader to return an immutable MessageResult, and fixed up the API wording to better describe the Message metadata as used in related classes and functions and to distinguish it from the actual Message body data.
    
    Author: Bryan Cutler <cu...@gmail.com>
    
    Closes #2340 from BryanCutler/java-MessageReader-followup-ARROW-2704 and squashes the following commits:
    
    a146c315 <Bryan Cutler> made MessageResult constructor package private
    cbb9c7de <Bryan Cutler> changed readMessage to return null if no valid message was read
    3e1825a1 <Bryan Cutler> made message variables final and updated docs
    ae8fbe6c <Bryan Cutler> fixed spacing and alignment
    d7cc7280 <Bryan Cutler> Changed MessageResult to be immutable, fixed up API wording and docs to better describe metadata
---
 .../apache/arrow/vector/ipc/ArrowStreamReader.java |  45 +++++----
 .../vector/ipc/message/MessageChannelReader.java   |  25 +++--
 .../vector/ipc/message/MessageChannelResult.java   | 104 ---------------------
 .../arrow/vector/ipc/message/MessageHolder.java    |  30 ------
 .../vector/ipc/message/MessageMetadataResult.java  |  96 +++++++++++++++++++
 .../arrow/vector/ipc/message/MessageResult.java    |  62 ++++++++++++
 .../vector/ipc/message/MessageSerializer.java      |  66 ++++++-------
 .../arrow/vector/ipc/MessageSerializerTest.java    |   4 +-
 8 files changed, 233 insertions(+), 199 deletions(-)

diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
index 74c6074..97bc2c1 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
@@ -23,12 +23,13 @@ import java.io.InputStream;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 
+import io.netty.buffer.ArrowBuf;
 import org.apache.arrow.flatbuf.MessageHeader;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.ipc.message.MessageChannelReader;
-import org.apache.arrow.vector.ipc.message.MessageHolder;
+import org.apache.arrow.vector.ipc.message.MessageResult;
 import org.apache.arrow.vector.ipc.message.MessageSerializer;
 import org.apache.arrow.vector.types.pojo.Schema;
 
@@ -98,23 +99,25 @@ public class ArrowStreamReader extends ArrowReader {
    */
   public boolean loadNextBatch() throws IOException {
     prepareLoadNextBatch();
-    MessageHolder holder = new MessageHolder();
+    MessageResult result = messageReader.readNext();
 
     // Reached EOS
-    if (!messageReader.readNext(holder)) {
+    if (result == null) {
       return false;
     }
 
-    if (holder.message.headerType() != MessageHeader.RecordBatch) {
-      throw new IOException("Expected RecordBatch but header was " + holder.message.headerType());
+    if (result.getMessage().headerType() != MessageHeader.RecordBatch) {
+      throw new IOException("Expected RecordBatch but header was " + result.getMessage().headerType());
     }
 
+    ArrowBuf bodyBuffer = result.getBodyBuffer();
+
     // For zero-length batches, need an empty buffer to deserialize the batch
-    if (holder.bodyBuffer == null) {
-      holder.bodyBuffer = allocator.getEmpty();
+    if (bodyBuffer == null) {
+      bodyBuffer = allocator.getEmpty();
     }
 
-    ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(holder.message, holder.bodyBuffer);
+    ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(result.getMessage(), bodyBuffer);
     loadRecordBatch(batch);
     return true;
   }
@@ -126,17 +129,17 @@ public class ArrowStreamReader extends ArrowReader {
    */
   @Override
   protected Schema readSchema() throws IOException {
-    MessageHolder holder = new MessageHolder();
+    MessageResult result = messageReader.readNext();
 
-    if (!messageReader.readNext(holder)) {
+    if (result == null) {
       throw new IOException("Unexpected end of input. Missing schema.");
     }
 
-    if (holder.message.headerType() != MessageHeader.Schema) {
-      throw new IOException("Expected schema but header was " + holder.message.headerType());
+    if (result.getMessage().headerType() != MessageHeader.Schema) {
+      throw new IOException("Expected schema but header was " + result.getMessage().headerType());
     }
 
-    return MessageSerializer.deserializeSchema(holder.message);
+    return MessageSerializer.deserializeSchema(result.getMessage());
   }
 
   /**
@@ -148,21 +151,23 @@ public class ArrowStreamReader extends ArrowReader {
    */
   @Override
   protected ArrowDictionaryBatch readDictionary() throws IOException {
-    MessageHolder holder = new MessageHolder();
+    MessageResult result = messageReader.readNext();
 
-    if (!messageReader.readNext(holder)) {
+    if (result == null) {
       throw new IOException("Unexpected end of input. Expected DictionaryBatch");
     }
 
-    if (holder.message.headerType() != MessageHeader.DictionaryBatch) {
-      throw new IOException("Expected DictionaryBatch but header was " + holder.message.headerType());
+    if (result.getMessage().headerType() != MessageHeader.DictionaryBatch) {
+      throw new IOException("Expected DictionaryBatch but header was " + result.getMessage().headerType());
     }
 
+    ArrowBuf bodyBuffer = result.getBodyBuffer();
+
     // For zero-length batches, need an empty buffer to deserialize the batch
-    if (holder.bodyBuffer == null) {
-      holder.bodyBuffer = allocator.getEmpty();
+    if (bodyBuffer == null) {
+      bodyBuffer = allocator.getEmpty();
     }
 
-    return MessageSerializer.deserializeDictionaryBatch(holder.message, holder.bodyBuffer);
+    return MessageSerializer.deserializeDictionaryBatch(result.getMessage(), bodyBuffer);
   }
 }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java
index 399f225..abbe603 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java
@@ -20,6 +20,8 @@ package org.apache.arrow.vector.ipc.message;
 
 import java.io.IOException;
 
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.flatbuf.Message;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.ipc.ReadChannel;
 
@@ -42,28 +44,31 @@ public class MessageChannelReader implements AutoCloseable {
   }
 
   /**
-   * Read a Message from the ReadChannel and populate holder if a valid message was read.
+   * Read a message from the ReadChannel and return a MessageResult containing the Message
+   * metadata and optional message body data. Once the end-of-stream has been reached, a null
+   * value will be returned. If the message has no body, then MessageResult.getBodyBuffer()
+   * returns null.
    *
-   * @param holder Message and message information that is populated when read by implementation
-   * @return true if a valid Message was read, false if end-of-stream
+   * @return MessageResult or null if reached end-of-stream
    * @throws IOException
    */
-  public boolean readNext(MessageHolder holder) throws IOException {
+  public MessageResult readNext() throws IOException {
 
     // Read the flatbuf message and check for end-of-stream
-    MessageChannelResult result = MessageSerializer.readMessage(in);
-    if (!result.hasMessage()) {
-      return false;
+    MessageMetadataResult result = MessageSerializer.readMessage(in);
+    if (result == null) {
+      return null;
     }
-    holder.message = result.getMessage();
+    Message message = result.getMessage();
+    ArrowBuf bodyBuffer = null;
 
     // Read message body data if defined in message
     if (result.messageHasBody()) {
       int bodyLength = (int) result.getMessageBodyLength();
-      holder.bodyBuffer = MessageSerializer.readMessageBody(in, bodyLength, allocator);
+      bodyBuffer = MessageSerializer.readMessageBody(in, bodyLength, allocator);
     }
 
-    return true;
+    return new MessageResult(message, bodyBuffer);
   }
 
   /**
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelResult.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelResult.java
deleted file mode 100644
index 0b732f2..0000000
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelResult.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.arrow.vector.ipc.message;
-
-import java.nio.ByteBuffer;
-
-import org.apache.arrow.flatbuf.Message;
-
-/**
-* Class to hold resulting Message and message information when reading messages from a ReadChannel.
-*/
-public class MessageChannelResult {
-
-  /**
-  * Construct a container to hold a message result.
-  *
-  * @param messageLength the length of the message read in bytes
-  * @param messageBuffer contains the raw bytes of the message
-  * @param message the realized flatbuf Message
-  */
-  public MessageChannelResult(int messageLength, ByteBuffer messageBuffer, Message message) {
-      this.messageLength = messageLength;
-      this.messageBuffer = messageBuffer;
-      this.message = message;
-    }
-
-  /**
-  * Returns status indicating if the MessageResult has a valid message.
-  *
-  * @return true if the result contains a valid message
-  */
-  public boolean hasMessage() {
-    return message != null;
-  }
-
-  /**
-  * Get the length of the message in bytes.
-  *
-  * @return number of bytes in the message buffer.
-  */
-  public int getMessageLength() {
-    return messageLength;
-  }
-
-  /**
-  * Get the buffer containing the raw message bytes.
-  *
-  * @return buffer containing the message
-  */
-  public ByteBuffer getMessageBuffer() {
-    return messageBuffer;
-  }
-
-  /**
-   * Check if the message is valid and is followed by a body.
-   *
-   * @return true if message has a body
-   */
-  public boolean messageHasBody() {
-    return message != null && message.bodyLength() > 0;
-  }
-
-  /**
-   * Get the length of the message body.
-   *
-   * @return number of bytes of the message body
-   */
-  public long getMessageBodyLength() {
-    long bodyLength = 0;
-    if (message != null) {
-      bodyLength = message.bodyLength();
-    }
-    return bodyLength;
-  }
-
-  /**
-  * Get the realized flatbuf Message.
-  *
-  * @return Message
-  */
-  public Message getMessage() {
-    return message;
-  }
-
-  private int messageLength;
-  private ByteBuffer messageBuffer;
-  private Message message;
-}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageHolder.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageHolder.java
deleted file mode 100644
index 975a9af..0000000
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageHolder.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.vector.ipc.message;
-
-import io.netty.buffer.ArrowBuf;
-import org.apache.arrow.flatbuf.Message;
-
-/**
- * Class to hold a Message and body when reading messages through a MessageChannelReader.
- */
-public class MessageHolder {
-  public Message message;
-  public ArrowBuf bodyBuffer;
-}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageMetadataResult.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageMetadataResult.java
new file mode 100644
index 0000000..c1f16df
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageMetadataResult.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.ipc.message;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.Message;
+
+/**
+ * Class to hold resulting Message metadata and buffer containing the serialized Flatbuffer
+ * message when reading messages from a ReadChannel. This handles Message metadata only and
+ * does not include the message body data, which should be subsequently read into an ArrowBuf.
+ */
+public class MessageMetadataResult {
+
+  /**
+   * Construct a container to hold a deserialized Message metadata, and buffer
+   * with the serialized Message as read from a ReadChannel.
+   *
+   * @param messageLength the length of the serialized Flatbuffer message in bytes
+   * @param messageBuffer contains the serialized Flatbuffer Message metadata
+   * @param message the deserialized Flatbuffer Message metadata description
+   */
+  MessageMetadataResult(int messageLength, ByteBuffer messageBuffer, Message message) {
+    this.messageLength = messageLength;
+    this.messageBuffer = messageBuffer;
+    this.message = message;
+  }
+
+
+  /**
+   * Get the length of the message metadata in bytes, not including the body length.
+   *
+   * @return number of bytes in the message metadata buffer.
+   */
+  public int getMessageLength() {
+    return messageLength;
+  }
+
+  /**
+   * Get the buffer containing the raw message metadata bytes, not including the message body data.
+   *
+   * @return buffer containing the message metadata
+   */
+  public ByteBuffer getMessageBuffer() {
+    return messageBuffer;
+  }
+
+  /**
+   * Check if the message is followed by a body. This will be true if the message has a body
+   * length > 0, which indicates that a message body needs to be read from the input source.
+   *
+   * @return true if message has a defined body
+   */
+  public boolean messageHasBody() {
+    return message.bodyLength() > 0;
+  }
+
+  /**
+   * Get the length of the message body.
+   *
+   * @return number of bytes of the message body
+   */
+  public long getMessageBodyLength() {
+    return message.bodyLength();
+  }
+
+  /**
+   * Get the realized flatbuf Message metadata description.
+   *
+   * @return Message metadata
+   */
+  public Message getMessage() {
+    return message;
+  }
+
+  private final int messageLength;
+  private final ByteBuffer messageBuffer;
+  private final Message message;
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageResult.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageResult.java
new file mode 100644
index 0000000..164821f
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageResult.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.ipc.message;
+
+import io.netty.buffer.ArrowBuf;
+import org.apache.arrow.flatbuf.Message;
+
+/**
+ * Class to hold the Message metadata and body data when reading messages through a
+ * MessageChannelReader.
+ */
+public class MessageResult {
+
+  /**
+   * Construct with a valid Message metadata and optional ArrowBuf containing message body
+   * data, if any.
+   *
+   * @param message Deserialized Flatbuffer Message metadata description
+   * @param bodyBuffer Optional ArrowBuf containing message body data, null if message has no body
+   */
+  MessageResult(Message message, ArrowBuf bodyBuffer) {
+    this.message = message;
+    this.bodyBuffer = bodyBuffer;
+  }
+
+  /**
+   * Get the Message metadata.
+   *
+   * @return the Flatbuffer Message metadata
+   */
+  public Message getMessage() {
+    return message;
+  }
+
+  /**
+   * Get the message body data.
+   *
+   * @return an ArrowBuf containing the message body data or null if the message has no body
+   */
+  public ArrowBuf getBodyBuffer() {
+    return bodyBuffer;
+  }
+
+  private final Message message;
+  private final ArrowBuf bodyBuffer;
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
index 7371991..5aace0c 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
@@ -83,17 +83,18 @@ public class MessageSerializer {
   }
 
   /**
-   * Aligns the message to 8 byte boundary and adjusts messageLength accordingly, then writes
-   * the message length prefix and message buffer to the Channel.
+   * Write the serialized Message metadata, prefixed by the length, to the output Channel. This
+   * ensures that it aligns to an 8 byte boundary and will adjust the message length to include
+   * any padding used for alignment.
    *
    * @param out Output Channel
    * @param messageLength Number of bytes in the message buffer, written as little Endian prefix
-   * @param messageBuffer Message buffer to be written
+   * @param messageBuffer Message metadata buffer to be written, this does not include any
+   *                      message body data which should be subsequently written to the Channel
    * @return Number of bytes written
-   * @return
    * @throws IOException
    */
-  public static int writeMessageBufferAligned(WriteChannel out, int messageLength, ByteBuffer messageBuffer) throws IOException {
+  public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer) throws IOException {
 
     // ensure that message aligns to 8 byte padding - 4 bytes for size, then message body
     if ((messageLength + 4) % 8 != 0) {
@@ -125,7 +126,7 @@ public class MessageSerializer {
 
     int messageLength = serializedMessage.remaining();
 
-    int bytesWritten = writeMessageBufferAligned(out, messageLength, serializedMessage);
+    int bytesWritten = writeMessageBuffer(out, messageLength, serializedMessage);
     assert bytesWritten % 8 == 0;
     return bytesWritten;
   }
@@ -149,8 +150,8 @@ public class MessageSerializer {
    * @throws IOException if something went wrong
    */
   public static Schema deserializeSchema(ReadChannel in) throws IOException {
-    MessageChannelResult result = readMessage(in);
-    if (!result.hasMessage()) {
+    MessageMetadataResult result = readMessage(in);
+    if (result == null) {
       throw new IOException("Unexpected end of input when reading Schema");
     }
     if (result.getMessage().headerType() != MessageHeader.Schema) {
@@ -255,8 +256,8 @@ public class MessageSerializer {
    * @throws IOException
    */
   public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, BufferAllocator allocator) throws IOException {
-    MessageChannelResult result = readMessage(in);
-    if (!result.hasMessage()) {
+    MessageMetadataResult result = readMessage(in);
+    if (result == null) {
       throw new IOException("Unexpected end of input when reading a RecordBatch");
     }
     if (result.getMessage().headerType() != MessageHeader.RecordBatch) {
@@ -406,8 +407,8 @@ public class MessageSerializer {
    * @throws IOException
    */
   public static ArrowDictionaryBatch deserializeDictionaryBatch(ReadChannel in, BufferAllocator allocator) throws IOException {
-    MessageChannelResult result = readMessage(in);
-    if (!result.hasMessage()) {
+    MessageMetadataResult result = readMessage(in);
+    if (result == null) {
       throw new IOException("Unexpected end of input when reading a DictionaryBatch");
     }
     if (result.getMessage().headerType() != MessageHeader.DictionaryBatch) {
@@ -465,24 +466,24 @@ public class MessageSerializer {
    * @throws IOException if the message is not an ArrowDictionaryBatch or ArrowRecordBatch
    */
   public static ArrowMessage deserializeMessageBatch(MessageChannelReader reader) throws IOException {
-    MessageHolder holder = new MessageHolder();
-    if (!reader.readNext(holder)) {
+    MessageResult result = reader.readNext();
+    if (result == null) {
       return null;
-    } else if (holder.message.bodyLength() > Integer.MAX_VALUE) {
+    } else if (result.getMessage().bodyLength() > Integer.MAX_VALUE) {
       throw new IOException("Cannot currently deserialize record batches over 2GB");
     }
 
-    if (holder.message.version() != MetadataVersion.V4) {
+    if (result.getMessage().version() != MetadataVersion.V4) {
       throw new IOException("Received metadata with an incompatible version number");
     }
 
-    switch (holder.message.headerType()) {
+    switch (result.getMessage().headerType()) {
       case MessageHeader.RecordBatch:
-        return deserializeRecordBatch(holder.message, holder.bodyBuffer);
+        return deserializeRecordBatch(result.getMessage(), result.getBodyBuffer());
       case MessageHeader.DictionaryBatch:
-        return deserializeDictionaryBatch(holder.message, holder.bodyBuffer);
+        return deserializeDictionaryBatch(result.getMessage(), result.getBodyBuffer());
       default:
-        throw new IOException("Unexpected message header type " + holder.message.headerType());
+        throw new IOException("Unexpected message header type " + result.getMessage().headerType());
     }
   }
 
@@ -519,29 +520,27 @@ public class MessageSerializer {
   }
 
   /**
-   * Read a Message from the in channel and return a MessageResult object that contains the
-   * Message, raw buffer containing the read Message, and length of the Message in bytes. If
-   * the end-of-stream has been reached, MessageResult.hasMessage() will return false.
+   * Read a Message from the input channel and return a MessageMetadataResult that contains the
+   * Message metadata, the buffer containing the serialized Message metadata as read, and the length
+   * of the Message metadata in bytes. Returns null if the end-of-stream has been reached.
    *
    * @param in ReadChannel to read messages from
-   * @return MessageResult with Message and message information
+   * @return MessageMetadataResult with deserialized Message metadata and message information if
+   * a valid Message was read, or null if end-of-stream
    * @throws IOException
    */
-  public static MessageChannelResult readMessage(ReadChannel in) throws IOException {
-    int messageLength = 0;
-    ByteBuffer messageBuffer = null;
-    Message message = null;
+  public static MessageMetadataResult readMessage(ReadChannel in) throws IOException {
 
     // Read the message size. There is an i32 little endian prefix.
     ByteBuffer buffer = ByteBuffer.allocate(4);
     if (in.readFully(buffer) == 4) {
-      messageLength = MessageSerializer.bytesToInt(buffer.array());
+      int messageLength = MessageSerializer.bytesToInt(buffer.array());
 
       // Length of 0 indicates end of stream
       if (messageLength != 0) {
 
         // Read the message into the buffer.
-        messageBuffer = ByteBuffer.allocate(messageLength);
+        ByteBuffer messageBuffer = ByteBuffer.allocate(messageLength);
         if (in.readFully(messageBuffer) != messageLength) {
           throw new IOException(
             "Unexpected end of stream trying to read message.");
@@ -549,11 +548,12 @@ public class MessageSerializer {
         messageBuffer.rewind();
 
         // Load the message.
-        message = Message.getRootAsMessage(messageBuffer);
+        Message message = Message.getRootAsMessage(messageBuffer);
+
+        return new MessageMetadataResult(messageLength, messageBuffer, message);
       }
     }
-
-    return new MessageChannelResult(messageLength, messageBuffer, message);
+    return null;
   }
 
   /**
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
index f677c3d..9d3a0bb 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
@@ -88,13 +88,13 @@ public class MessageSerializerTest {
     buffer.putInt(2);
     buffer.flip();
 
-    int bytesWritten = MessageSerializer.writeMessageBufferAligned(out, 8, buffer);
+    int bytesWritten = MessageSerializer.writeMessageBuffer(out, 8, buffer);
     assertEquals(16, bytesWritten);
 
     buffer.rewind();
     buffer.putInt(3);
     buffer.flip();
-    bytesWritten = MessageSerializer.writeMessageBufferAligned(out, 4, buffer);
+    bytesWritten = MessageSerializer.writeMessageBuffer(out, 4, buffer);
     assertEquals(8, bytesWritten);
 
     ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());