You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/09/11 22:07:36 UTC
[arrow] 03/03: ARROW-6315: [Java] Make change to ensure flatbuffer
reads are aligned
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch ARROW-6313-flatbuffer-alignment
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 0352456387e0d02036029de4fbb6d49324eb779e
Author: tianchen <ni...@alibaba-inc.com>
AuthorDate: Fri Sep 6 20:36:38 2019 -0700
ARROW-6315: [Java] Make change to ensure flatbuffer reads are aligned
Implements the IPC message format alignment changes for [ARROW-6315](https://issues.apache.org/jira/browse/ARROW-6315).
i. MessageReader can read messages with the old alignment
ii. ArrowWriter can choose to produce messages with either the new alignment or the old format.
Closes #5229 from tianchen92/ARROW-align-java and squashes the following commits:
1eb71d27c <Bryan Cutler> ARROW-6461: Prevent EchoServer from closing the client socket after writing
cd4fd050e <tianchen> fix small bugs
9a690e47d <tianchen> fix comments and styles
5ee858c56 <tianchen> Make change to ensure flatbuffer reads are aligned
Lead-authored-by: tianchen <ni...@alibaba-inc.com>
Co-authored-by: Bryan Cutler <cu...@gmail.com>
Signed-off-by: Micah Kornfield <em...@gmail.com>
---
.../org/apache/arrow/tools/EchoServerTest.java | 2 +-
.../apache/arrow/vector/ipc/ArrowFileWriter.java | 13 ++-
.../apache/arrow/vector/ipc/ArrowStreamWriter.java | 22 ++++-
.../org/apache/arrow/vector/ipc/ArrowWriter.java | 17 +++-
.../org/apache/arrow/vector/ipc/WriteChannel.java | 9 ++
.../apache/arrow/vector/ipc/message/IpcOption.java | 28 ++++++
.../vector/ipc/message/MessageSerializer.java | 108 ++++++++++++++++++---
.../arrow/vector/ipc/MessageSerializerTest.java | 14 +--
.../arrow/vector/ipc/TestArrowReaderWriter.java | 52 +++++++++-
.../apache/arrow/vector/ipc/TestArrowStream.java | 2 +-
.../arrow/vector/ipc/TestArrowStreamPipe.java | 2 +-
11 files changed, 231 insertions(+), 38 deletions(-)
diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
index 219926a..bfb136c 100644
--- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
+++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java
@@ -127,7 +127,7 @@ public class EchoServerTest {
}
Assert.assertFalse(reader.loadNextBatch());
assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
- assertEquals(reader.bytesRead(), writer.bytesWritten());
+ assertEquals(reader.bytesRead() + 4, writer.bytesWritten());
}
}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
index 395a617..936ab6d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
@@ -29,6 +29,7 @@ import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowFooter;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.IpcOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,11 @@ public class ArrowFileWriter extends ArrowWriter {
super(root, provider, out);
}
+ public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
+ IpcOption option) {
+ super(root, provider, out, option);
+ }
+
@Override
protected void startInternal(WriteChannel out) throws IOException {
ArrowMagic.writeMagic(out, true);
@@ -68,7 +74,12 @@ public class ArrowFileWriter extends ArrowWriter {
@Override
protected void endInternal(WriteChannel out) throws IOException {
- out.writeIntLittleEndian(0);
+ if (option.write_legacy_ipc_format) {
+ out.writeIntLittleEndian(0);
+ } else {
+ out.writeLongLittleEndian(0);
+ }
+
long footerStart = out.getCurrentPosition();
out.write(new ArrowFooter(schema, dictionaryBlocks, recordBlocks), false);
int footerLength = (int) (out.getCurrentPosition() - footerStart);
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
index ec0f42e..e74323b 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
@@ -24,6 +24,7 @@ import java.nio.channels.WritableByteChannel;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.ipc.message.IpcOption;
/**
* Writer for the Arrow stream format to send ArrowRecordBatches over a WriteChannel.
@@ -44,14 +45,23 @@ public class ArrowStreamWriter extends ArrowWriter {
/**
* Construct an ArrowStreamWriter with an optional DictionaryProvider for the WritableByteChannel.
+ */
+ public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
+ this(root, provider, out, new IpcOption());
+ }
+
+ /**
+ * Construct an ArrowStreamWriter with an optional DictionaryProvider for the WritableByteChannel.
*
* @param root Existing VectorSchemaRoot with vectors to be written.
* @param provider DictionaryProvider for any vectors that are dictionary encoded.
* (Optional, can be null)
+ * @param option IPC write options
* @param out WritableByteChannel for writing.
*/
- public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
- super(root, provider, out);
+ public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
+ IpcOption option) {
+ super(root, provider, out, option);
}
/**
@@ -60,8 +70,12 @@ public class ArrowStreamWriter extends ArrowWriter {
* @param out Open WriteChannel with an active Arrow stream.
* @throws IOException on error
*/
- public static void writeEndOfStream(WriteChannel out) throws IOException {
- out.writeIntLittleEndian(0);
+ public void writeEndOfStream(WriteChannel out) throws IOException {
+ if (option.write_legacy_ipc_format) {
+ out.writeIntLittleEndian(0);
+ } else {
+ out.writeLongLittleEndian(0);
+ }
}
@Override
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
index 6366f2f..52ab3de 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
@@ -33,6 +33,7 @@ import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -59,16 +60,24 @@ public abstract class ArrowWriter implements AutoCloseable {
private boolean dictWritten = false;
+ protected IpcOption option;
+
+ protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
+ this (root, provider, out, new IpcOption());
+ }
+
/**
* Note: fields are not closed when the writer is closed.
*
* @param root the vectors to write to the output
* @param provider where to find the dictionaries
* @param out the output where to write
+ * @param option IPC write options
*/
- protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
+ protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option) {
this.unloader = new VectorUnloader(root);
this.out = new WriteChannel(out);
+ this.option = option;
List<Field> fields = new ArrayList<>(root.getSchema().getFields().size());
Set<Long> dictionaryIdsUsed = new HashSet<>();
@@ -112,14 +121,14 @@ public abstract class ArrowWriter implements AutoCloseable {
}
protected ArrowBlock writeDictionaryBatch(ArrowDictionaryBatch batch) throws IOException {
- ArrowBlock block = MessageSerializer.serialize(out, batch);
+ ArrowBlock block = MessageSerializer.serialize(out, batch, option);
LOGGER.debug("DictionaryRecordBatch at {}, metadata: {}, body: {}",
block.getOffset(), block.getMetadataLength(), block.getBodyLength());
return block;
}
protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException {
- ArrowBlock block = MessageSerializer.serialize(out, batch);
+ ArrowBlock block = MessageSerializer.serialize(out, batch, option);
LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",
block.getOffset(), block.getMetadataLength(), block.getBodyLength());
return block;
@@ -140,7 +149,7 @@ public abstract class ArrowWriter implements AutoCloseable {
startInternal(out);
// write the schema - for file formats this is duplicated in the footer, but matches
// the streaming format
- MessageSerializer.serialize(out, schema);
+ MessageSerializer.serialize(out, schema, option);
}
}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
index eef36d3..2d36c93 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/WriteChannel.java
@@ -102,6 +102,15 @@ public class WriteChannel implements AutoCloseable {
}
/**
+ * Writes <code>v</code> in little-endian format to the underlying channel.
+ */
+ public long writeLongLittleEndian(long v) throws IOException {
+ byte[] outBuffer = new byte[8];
+ MessageSerializer.longToBytes(v, outBuffer);
+ return write(outBuffer);
+ }
+
+ /**
* Writes the buffer to the underlying channel.
*/
public void write(ArrowBuf buffer) throws IOException {
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java
new file mode 100644
index 0000000..81a0603
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.ipc.message;
+
+/**
+ * IPC options; currently only used when writing.
+ */
+public class IpcOption {
+
+ // Write the pre-0.15.0 encapsulated IPC message format
+ // consisting of a 4-byte prefix instead of an 8-byte prefix
+ public boolean write_legacy_ipc_format = false;
+}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
index 4016802..34ea077 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
@@ -56,6 +56,9 @@ import io.netty.buffer.ArrowBuf;
*/
public class MessageSerializer {
+ // This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message
+ public static final int IPC_CONTINUATION_TOKEN = -1;
+
/**
* Convert an array of 4 bytes to a little endian i32 value.
*
@@ -83,6 +86,28 @@ public class MessageSerializer {
}
/**
+ * Convert a long to an 8-byte array in little-endian order.
+ *
+ * @param value long value input
+ * @param bytes existing byte array with minimum length of 8 to contain the conversion output
+ */
+ public static void longToBytes(long value, byte[] bytes) {
+ bytes[7] = (byte) (value >>> 56);
+ bytes[6] = (byte) (value >>> 48);
+ bytes[5] = (byte) (value >>> 40);
+ bytes[4] = (byte) (value >>> 32);
+ bytes[3] = (byte) (value >>> 24);
+ bytes[2] = (byte) (value >>> 16);
+ bytes[1] = (byte) (value >>> 8);
+ bytes[0] = (byte) (value);
+ }
+
+ public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer)
+ throws IOException {
+ return writeMessageBuffer(out, messageLength, messageBuffer, new IpcOption());
+ }
+
+ /**
* Write the serialized Message metadata, prefixed by the length, to the output Channel. This
* ensures that it aligns to an 8 byte boundary and will adjust the message length to include
* any padding used for alignment.
@@ -91,22 +116,36 @@ public class MessageSerializer {
* @param messageLength Number of bytes in the message buffer, written as little Endian prefix
* @param messageBuffer Message metadata buffer to be written, this does not include any
* message body data which should be subsequently written to the Channel
+ * @param option IPC write options
* @return Number of bytes written
* @throws IOException on error
*/
- public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer)
+ public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer, IpcOption option)
throws IOException {
- // ensure that message aligns to 8 byte padding - 4 bytes for size, then message body
- if ((messageLength + 4) % 8 != 0) {
- messageLength += 8 - (messageLength + 4) % 8;
+ // the legacy (pre-0.15.0) encapsulated IPC message format uses a 4-byte prefix instead of an 8-byte one
+ int prefixSize = option.write_legacy_ipc_format ? 4 : 8;
+
+ // ensure that message aligns to 8 byte padding - prefix_size bytes, then message body
+ if ((messageLength + prefixSize ) % 8 != 0) {
+ messageLength += 8 - (messageLength + prefixSize) % 8;
+ }
+ if (!option.write_legacy_ipc_format) {
+ out.writeIntLittleEndian(IPC_CONTINUATION_TOKEN);
}
out.writeIntLittleEndian(messageLength);
out.write(messageBuffer);
out.align();
// any bytes written are already captured by our size modification above
- return messageLength + 4;
+ return messageLength + prefixSize;
+ }
+
+ /**
+ * Serialize a schema object.
+ */
+ public static long serialize(WriteChannel out, Schema schema) throws IOException {
+ return serialize(out, schema, new IpcOption());
}
/**
@@ -117,7 +156,7 @@ public class MessageSerializer {
* @return the number of bytes written
* @throws IOException if something went wrong
*/
- public static long serialize(WriteChannel out, Schema schema) throws IOException {
+ public static long serialize(WriteChannel out, Schema schema, IpcOption option) throws IOException {
long start = out.getCurrentPosition();
assert start % 8 == 0;
@@ -125,7 +164,7 @@ public class MessageSerializer {
int messageLength = serializedMessage.remaining();
- int bytesWritten = writeMessageBuffer(out, messageLength, serializedMessage);
+ int bytesWritten = writeMessageBuffer(out, messageLength, serializedMessage, option);
assert bytesWritten % 8 == 0;
return bytesWritten;
}
@@ -182,13 +221,20 @@ public class MessageSerializer {
/**
* Serializes an ArrowRecordBatch. Returns the offset and length of the written batch.
+ */
+ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) throws IOException {
+ return serialize(out, batch, new IpcOption());
+ }
+
+ /**
+ * Serializes an ArrowRecordBatch. Returns the offset and length of the written batch.
*
* @param out where to write the batch
* @param batch the object to serialize to out
* @return the serialized block metadata
* @throws IOException if something went wrong
*/
- public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) throws IOException {
+ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch, IpcOption option) throws IOException {
long start = out.getCurrentPosition();
int bodyLength = batch.computeBodyLength();
@@ -198,8 +244,14 @@ public class MessageSerializer {
int metadataLength = serializedMessage.remaining();
+ int prefixSize = 4;
+ if (!option.write_legacy_ipc_format) {
+ out.writeIntLittleEndian(IPC_CONTINUATION_TOKEN);
+ prefixSize = 8;
+ }
+
// calculate alignment bytes so that metadata length points to the correct location after alignment
- int padding = (int) ((start + metadataLength + 4) % 8);
+ int padding = (int) ((start + metadataLength + prefixSize) % 8);
if (padding != 0) {
metadataLength += (8 - padding);
}
@@ -214,7 +266,7 @@ public class MessageSerializer {
assert bufferLength % 8 == 0;
// Metadata size in the Block account for the size prefix
- return new ArrowBlock(start, metadataLength + 4, bufferLength);
+ return new ArrowBlock(start, metadataLength + prefixSize, bufferLength);
}
/**
@@ -305,7 +357,7 @@ public class MessageSerializer {
*/
public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock block, BufferAllocator alloc)
throws IOException {
- // Metadata length contains integer prefix plus byte padding
+ // Metadata length contains prefix_size bytes plus byte padding
long totalLen = block.getMetadataLength() + block.getBodyLength();
if (totalLen > Integer.MAX_VALUE) {
@@ -317,7 +369,9 @@ public class MessageSerializer {
throw new IOException("Unexpected end of input trying to read batch.");
}
- ArrowBuf metadataBuffer = buffer.slice(4, block.getMetadataLength() - 4);
+ int prefixSize = buffer.getInt(0) == IPC_CONTINUATION_TOKEN ? 8 : 4;
+
+ ArrowBuf metadataBuffer = buffer.slice(prefixSize, block.getMetadataLength() - prefixSize);
Message messageFB =
Message.getRootAsMessage(metadataBuffer.nioBuffer().asReadOnlyBuffer());
@@ -375,15 +429,21 @@ public class MessageSerializer {
return deserializeRecordBatch(serializedMessage.getMessage(), underlying);
}
+ public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch) throws IOException {
+ return serialize(out, batch, new IpcOption());
+ }
+
/**
* Serializes a dictionary ArrowRecordBatch. Returns the offset and length of the written batch.
*
* @param out where to serialize
* @param batch the batch to serialize
+ * @param option options for IPC
* @return the metadata of the serialized block
* @throws IOException if something went wrong
*/
- public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch) throws IOException {
+ public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch, IpcOption option)
+ throws IOException {
long start = out.getCurrentPosition();
int bodyLength = batch.computeBodyLength();
assert bodyLength % 8 == 0;
@@ -392,8 +452,14 @@ public class MessageSerializer {
int metadataLength = serializedMessage.remaining();
+ int prefixSize = 4;
+ if (!option.write_legacy_ipc_format) {
+ out.writeIntLittleEndian(IPC_CONTINUATION_TOKEN);
+ prefixSize = 8;
+ }
+
// calculate alignment bytes so that metadata length points to the correct location after alignment
- int padding = (int) ((start + metadataLength + 4) % 8);
+ int padding = (int) ((start + metadataLength + prefixSize) % 8);
if (padding != 0) {
metadataLength += (8 - padding);
}
@@ -409,7 +475,7 @@ public class MessageSerializer {
assert bufferLength % 8 == 0;
// Metadata size in the Block account for the size prefix
- return new ArrowBlock(start, metadataLength + 4, bufferLength);
+ return new ArrowBlock(start, metadataLength + prefixSize, bufferLength);
}
/**
@@ -491,7 +557,9 @@ public class MessageSerializer {
throw new IOException("Unexpected end of input trying to read batch.");
}
- ArrowBuf metadataBuffer = buffer.slice(4, block.getMetadataLength() - 4);
+ int prefixSize = buffer.getInt(0) == IPC_CONTINUATION_TOKEN ? 8 : 4;
+
+ ArrowBuf metadataBuffer = buffer.slice(prefixSize, block.getMetadataLength() - prefixSize);
Message messageFB =
Message.getRootAsMessage(metadataBuffer.nioBuffer().asReadOnlyBuffer());
@@ -584,7 +652,15 @@ public class MessageSerializer {
// Read the message size. There is an i32 little endian prefix.
ByteBuffer buffer = ByteBuffer.allocate(4);
if (in.readFully(buffer) == 4) {
+
int messageLength = MessageSerializer.bytesToInt(buffer.array());
+ if (messageLength == IPC_CONTINUATION_TOKEN) {
+ buffer.clear();
+ // ARROW-6313: if the first 4 bytes are the continuation token, read the next 4 bytes for the length
+ if (in.readFully(buffer) == 4) {
+ messageLength = MessageSerializer.bytesToInt(buffer.array());
+ }
+ }
// Length of 0 indicates end of stream
if (messageLength != 0) {
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
index 789da1f..1cbd5bb 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
@@ -95,7 +95,7 @@ public class MessageSerializerTest {
buffer.putInt(3);
buffer.flip();
bytesWritten = MessageSerializer.writeMessageBuffer(out, 4, buffer);
- assertEquals(8, bytesWritten);
+ assertEquals(16, bytesWritten);
ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
ReadChannel in = new ReadChannel(Channels.newChannel(inputStream));
@@ -103,15 +103,17 @@ public class MessageSerializerTest {
in.readFully(result);
result.rewind();
- // First message size, 2 int values, 4 bytes of zero padding
- assertEquals(12, result.getInt());
+ // First message continuation, size, and 2 int values
+ assertEquals(MessageSerializer.IPC_CONTINUATION_TOKEN, result.getInt());
+ assertEquals(8, result.getInt());
assertEquals(1, result.getInt());
assertEquals(2, result.getInt());
- assertEquals(0, result.getInt());
- // Second message size and 1 int value
- assertEquals(4, result.getInt());
+ // Second message continuation, size, 1 int value and 4 bytes padding
+ assertEquals(MessageSerializer.IPC_CONTINUATION_TOKEN, result.getInt());
+ assertEquals(8, result.getInt());
assertEquals(3, result.getInt());
+ assertEquals(0, result.getInt());
}
@Test
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
index 5d1f792..58ad669 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -42,6 +43,7 @@ import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Collections2;
import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TestUtils;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorLoader;
@@ -54,6 +56,7 @@ import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -75,7 +78,7 @@ public class TestArrowReaderWriter {
private Dictionary dictionary2;
private Schema schema;
- private Schema encodedchema;
+ private Schema encodedSchema;
@Before
public void init() {
@@ -161,7 +164,8 @@ public class TestArrowReaderWriter {
// deserialize the buffer.
ByteBuffer headerBuffer = ByteBuffer.allocate(recordBatches.get(0).getMetadataLength());
headerBuffer.put(byteArray, (int) recordBatches.get(0).getOffset(), headerBuffer.capacity());
- headerBuffer.position(4);
+ // the new format uses an 8-byte prefix (4-byte continuation token + 4-byte length)
+ headerBuffer.position(8);
Message messageFB = Message.getRootAsMessage(headerBuffer);
RecordBatch recordBatchFB = (RecordBatch) messageFB.header(new RecordBatch());
assertEquals(2, recordBatchFB.buffersLength());
@@ -335,7 +339,7 @@ public class TestArrowReaderWriter {
try (ArrowStreamReader reader = new ArrowStreamReader(
new ByteArrayReadableSeekableByteChannel(outStream.toByteArray()), allocator)) {
Schema readSchema = reader.getVectorSchemaRoot().getSchema();
- assertEquals(encodedchema, readSchema);
+ assertEquals(encodedSchema, readSchema);
assertEquals(2, reader.getDictionaryVectors().size());
assertTrue(reader.loadNextBatch());
assertTrue(reader.loadNextBatch());
@@ -401,8 +405,48 @@ public class TestArrowReaderWriter {
schemaFields.add(DictionaryUtility.toMessageFormat(encodedVectorA2.getField(), provider, new HashSet<>()));
schema = new Schema(schemaFields);
- encodedchema = new Schema(Arrays.asList(encodedVectorA1.getField(), encodedVectorA2.getField()));
+ encodedSchema = new Schema(Arrays.asList(encodedVectorA1.getField(), encodedVectorA2.getField()));
return batches;
}
+
+ @Test
+ public void testLegacyIpcBackwardsCompatibility() throws Exception {
+ Schema schema = new Schema(asList(Field.nullable("field", new ArrowType.Int(32, true))));
+ IntVector vector = new IntVector("vector", allocator);
+ final int valueCount = 2;
+ vector.setValueCount(valueCount);
+ vector.setSafe(0, 1);
+ vector.setSafe(1, 2);
+ ArrowRecordBatch batch = new ArrowRecordBatch(valueCount, asList(new ArrowFieldNode(valueCount, 0)),
+ asList(vector.getValidityBuffer(), vector.getDataBuffer()));
+
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ WriteChannel out = new WriteChannel(newChannel(outStream));
+
+ // write legacy ipc format
+ IpcOption option = new IpcOption();
+ option.write_legacy_ipc_format = true;
+ MessageSerializer.serialize(out, schema, option);
+ MessageSerializer.serialize(out, batch);
+
+ ReadChannel in = new ReadChannel(newChannel(new ByteArrayInputStream(outStream.toByteArray())));
+ Schema readSchema = MessageSerializer.deserializeSchema(in);
+ assertEquals(schema, readSchema);
+ ArrowRecordBatch readBatch = MessageSerializer.deserializeRecordBatch(in, allocator);
+ assertEquals(batch.getLength(), readBatch.getLength());
+ assertEquals(batch.computeBodyLength(), readBatch.computeBodyLength());
+
+ // write ipc format with continuation
+ option.write_legacy_ipc_format = false;
+ MessageSerializer.serialize(out, schema, option);
+ MessageSerializer.serialize(out, batch);
+
+ ReadChannel in2 = new ReadChannel(newChannel(new ByteArrayInputStream(outStream.toByteArray())));
+ Schema readSchema2 = MessageSerializer.deserializeSchema(in2);
+ assertEquals(schema, readSchema2);
+ ArrowRecordBatch readBatch2 = MessageSerializer.deserializeRecordBatch(in2, allocator);
+ assertEquals(batch.getLength(), readBatch2.getLength());
+ assertEquals(batch.computeBodyLength(), readBatch2.computeBodyLength());
+ }
}
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
index 92e5276..5d8f5df 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStream.java
@@ -117,7 +117,7 @@ public class TestArrowStream extends BaseFileTest {
assertTrue(reader.loadNextBatch());
}
// TODO figure out why reader isn't getting padding bytes
- assertEquals(bytesWritten, reader.bytesRead() + 4);
+ assertEquals(bytesWritten, reader.bytesRead() + 8);
assertFalse(reader.loadNextBatch());
assertEquals(0, reader.getVectorSchemaRoot().getRowCount());
}
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
index 422a63f..07f4017 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowStreamPipe.java
@@ -156,6 +156,6 @@ public class TestArrowStreamPipe {
writer.join();
assertEquals(NUM_BATCHES, reader.getBatchesRead());
- assertEquals(writer.bytesWritten(), reader.bytesRead());
+ assertEquals(writer.bytesWritten(), reader.bytesRead() + 4);
}
}