You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/20 22:56:29 UTC
arrow git commit: ARROW-499: Update file serialization to use the
streaming serialization format.
Repository: arrow
Updated Branches:
refs/heads/master 512bc160e -> 8ca7033fc
ARROW-499: Update file serialization to use the streaming serialization format.
Author: Wes McKinney <we...@twosigma.com>
Author: Nong Li <no...@gmail.com>
Closes #292 from nongli/file and squashes the following commits:
18890a9 [Wes McKinney] Message fixes. Fix Java test suite. Integration tests pass
f187539 [Nong Li] Merge pull request #1 from wesm/file-change-cpp-impl
e3af434 [Wes McKinney] Remove unused variable
664d5be [Wes McKinney] Fixes, stream tests pass again
ba8db91 [Wes McKinney] Redo MessageSerializer with unions. Still has bugs
21854cc [Wes McKinney] Restore Block.bodyLength to long
7c6f7ef [Nong Li] Update to restore Block behavior
27b3909 [Nong Li] [ARROW-499]: [Java] Update file serialization to use the streaming serialization format.
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/8ca7033f
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/8ca7033f
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/8ca7033f
Branch: refs/heads/master
Commit: 8ca7033fcd3fcf377cb7924eae9be45b8f6ebd5d
Parents: 512bc16
Author: Wes McKinney <we...@twosigma.com>
Authored: Fri Jan 20 17:56:23 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Fri Jan 20 17:56:23 2017 -0500
----------------------------------------------------------------------
cpp/src/arrow/ipc/adapter.cc | 11 +-
cpp/src/arrow/ipc/metadata-internal.cc | 21 +--
format/File.fbs | 5 +-
integration/integration_test.py | 2 +-
.../apache/arrow/vector/file/ArrowFooter.java | 5 +-
.../apache/arrow/vector/file/ArrowReader.java | 64 ++-----
.../apache/arrow/vector/file/ArrowWriter.java | 43 +----
.../apache/arrow/vector/file/ReadChannel.java | 11 +-
.../arrow/vector/stream/MessageSerializer.java | 169 ++++++++++++-------
.../apache/arrow/vector/file/TestArrowFile.java | 4 -
.../arrow/vector/file/TestArrowFooter.java | 8 +
.../vector/file/TestArrowReaderWriter.java | 16 ++
12 files changed, 174 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/8ca7033f/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index 2b5ef11..7b4d18c 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -129,13 +129,12 @@ class RecordBatchWriter : public ArrayVisitor {
num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb));
// Need to write 4 bytes (metadata size), the metadata, plus padding to
- // fall on a 64-byte offset
- int64_t padded_metadata_length =
- BitUtil::RoundUpToMultipleOf64(metadata_fb->size() + 4);
+ // fall on an 8-byte offset
+ int64_t padded_metadata_length = BitUtil::CeilByte(metadata_fb->size() + 4);
// The returned metadata size includes the length prefix, the flatbuffer,
// plus padding
- *metadata_length = padded_metadata_length;
+ *metadata_length = static_cast<int32_t>(padded_metadata_length);
// Write the flatbuffer size prefix
int32_t flatbuffer_size = metadata_fb->size();
@@ -604,7 +603,9 @@ Status ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
return Status::Invalid(ss.str());
}
- *metadata = std::make_shared<RecordBatchMetadata>(buffer, sizeof(int32_t));
+ std::shared_ptr<Message> message;
+ RETURN_NOT_OK(Message::Open(buffer, 4, &message));
+ *metadata = std::make_shared<RecordBatchMetadata>(message);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/8ca7033f/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index 16069a8..cc160c4 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -320,23 +320,10 @@ Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length,
Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
const std::vector<flatbuf::FieldNode>& nodes,
const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
- flatbuffers::FlatBufferBuilder fbb;
-
- auto batch = flatbuf::CreateRecordBatch(
- fbb, length, fbb.CreateVectorOfStructs(nodes), fbb.CreateVectorOfStructs(buffers));
-
- fbb.Finish(batch);
-
- int32_t size = fbb.GetSize();
-
- auto result = std::make_shared<PoolBuffer>();
- RETURN_NOT_OK(result->Resize(size));
-
- uint8_t* dst = result->mutable_data();
- memcpy(dst, fbb.GetBufferPointer(), size);
-
- *out = result;
- return Status::OK();
+ MessageBuilder builder;
+ RETURN_NOT_OK(builder.SetRecordBatch(length, body_length, nodes, buffers));
+ RETURN_NOT_OK(builder.Finish());
+ return builder.GetBuffer(out);
}
Status MessageBuilder::Finish() {
http://git-wip-us.apache.org/repos/asf/arrow/blob/8ca7033f/format/File.fbs
----------------------------------------------------------------------
diff --git a/format/File.fbs b/format/File.fbs
index f28dc20..e8d6da4 100644
--- a/format/File.fbs
+++ b/format/File.fbs
@@ -35,12 +35,15 @@ table Footer {
struct Block {
+ /// Index to the start of the RecordBlock (note this is past the Message header)
offset: long;
+ /// Length of the metadata
metaDataLength: int;
+ /// Length of the data (this is aligned so there can be a gap between this and
+ /// the metadata).
bodyLength: long;
-
}
root_type Footer;
http://git-wip-us.apache.org/repos/asf/arrow/blob/8ca7033f/integration/integration_test.py
----------------------------------------------------------------------
diff --git a/integration/integration_test.py b/integration/integration_test.py
index 417354b..77510da 100644
--- a/integration/integration_test.py
+++ b/integration/integration_test.py
@@ -648,7 +648,7 @@ def get_static_json_files():
def run_all_tests(debug=False):
- testers = [JavaTester(debug=debug), CPPTester(debug=debug)]
+ testers = [CPPTester(debug=debug), JavaTester(debug=debug)]
static_json_files = get_static_json_files()
generated_json_files = get_generated_json_files()
json_files = static_json_files + generated_json_files
http://git-wip-us.apache.org/repos/asf/arrow/blob/8ca7033f/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
index 3be1929..3890306 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowFooter.java
@@ -65,10 +65,11 @@ public class ArrowFooter implements FBSerializable {
private static List<ArrowBlock> dictionaries(Footer footer) {
List<ArrowBlock> dictionaries = new ArrayList<>();
- Block tempBLock = new Block();
+ Block tempBlock = new Block();
+
int dictionariesLength = footer.dictionariesLength();
for (int i = 0; i < dictionariesLength; i++) {
- Block block = footer.dictionaries(tempBLock, i);
+ Block block = footer.dictionaries(tempBlock, i);
dictionaries.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
}
return dictionaries;
http://git-wip-us.apache.org/repos/asf/arrow/blob/8ca7033f/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
index 58c5160..8f4f497 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
@@ -20,23 +20,15 @@ package org.apache.arrow.vector.file;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
-import org.apache.arrow.flatbuf.Buffer;
-import org.apache.arrow.flatbuf.FieldNode;
import org.apache.arrow.flatbuf.Footer;
-import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.stream.MessageSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.buffer.ArrowBuf;
-
public class ArrowReader implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ArrowReader.class);
@@ -54,15 +46,6 @@ public class ArrowReader implements AutoCloseable {
this.allocator = allocator;
}
- private int readFully(ArrowBuf buffer, int l) throws IOException {
- int n = readFully(buffer.nioBuffer(buffer.writerIndex(), l));
- buffer.writerIndex(n);
- if (n != l) {
- throw new IllegalStateException(n + " != " + l);
- }
- return n;
- }
-
private int readFully(ByteBuffer buffer) throws IOException {
int total = 0;
int n;
@@ -104,46 +87,21 @@ public class ArrowReader implements AutoCloseable {
// TODO: read dictionaries
- public ArrowRecordBatch readRecordBatch(ArrowBlock recordBatchBlock) throws IOException {
- LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", recordBatchBlock.getOffset(), recordBatchBlock.getMetadataLength(), recordBatchBlock.getBodyLength()));
- int l = (int)(recordBatchBlock.getMetadataLength() + recordBatchBlock.getBodyLength());
- if (l < 0) {
- throw new InvalidArrowFileException("block invalid: " + recordBatchBlock);
- }
- final ArrowBuf buffer = allocator.buffer(l);
- LOGGER.debug("allocated buffer " + buffer);
- in.position(recordBatchBlock.getOffset());
- int n = readFully(buffer, l);
- if (n != l) {
- throw new IllegalStateException(n + " != " + l);
- }
-
- // Record batch flatbuffer is prefixed by its size as int32le
- final ArrowBuf metadata = buffer.slice(4, recordBatchBlock.getMetadataLength() - 4);
- RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(metadata.nioBuffer().asReadOnlyBuffer());
-
- int nodesLength = recordBatchFB.nodesLength();
- final ArrowBuf body = buffer.slice(recordBatchBlock.getMetadataLength(), (int)recordBatchBlock.getBodyLength());
- List<ArrowFieldNode> nodes = new ArrayList<>();
- for (int i = 0; i < nodesLength; ++i) {
- FieldNode node = recordBatchFB.nodes(i);
- nodes.add(new ArrowFieldNode(node.length(), node.nullCount()));
+ public ArrowRecordBatch readRecordBatch(ArrowBlock block) throws IOException {
+ LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d",
+ block.getOffset(), block.getMetadataLength(),
+ block.getBodyLength()));
+ in.position(block.getOffset());
+ ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(
+ new ReadChannel(in, block.getOffset()), block, allocator);
+ if (batch == null) {
+ throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
}
- List<ArrowBuf> buffers = new ArrayList<>();
- for (int i = 0; i < recordBatchFB.buffersLength(); ++i) {
- Buffer bufferFB = recordBatchFB.buffers(i);
- LOGGER.debug(String.format("Buffer in RecordBatch at %d, length: %d", bufferFB.offset(), bufferFB.length()));
- ArrowBuf vectorBuffer = body.slice((int)bufferFB.offset(), (int)bufferFB.length());
- buffers.add(vectorBuffer);
- }
- ArrowRecordBatch arrowRecordBatch = new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers);
- LOGGER.debug("released buffer " + buffer);
- buffer.release();
- return arrowRecordBatch;
+ return batch;
}
+ @Override
public void close() throws IOException {
in.close();
}
-
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/8ca7033f/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
index 3febd11..24c667e 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
@@ -23,14 +23,12 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.arrow.vector.schema.ArrowBuffer;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.netty.buffer.ArrowBuf;
-
public class ArrowWriter implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class);
@@ -39,7 +37,6 @@ public class ArrowWriter implements AutoCloseable {
private final Schema schema;
private final List<ArrowBlock> recordBatches = new ArrayList<>();
-
private boolean started = false;
public ArrowWriter(WritableByteChannel out, Schema schema) {
@@ -49,47 +46,19 @@ public class ArrowWriter implements AutoCloseable {
private void start() throws IOException {
writeMagic();
+ MessageSerializer.serialize(out, schema);
}
-
// TODO: write dictionaries
public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException {
checkStarted();
- out.align();
+ ArrowBlock batchDesc = MessageSerializer.serialize(out, recordBatch);
+ LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d",
+ batchDesc.getOffset(), batchDesc.getMetadataLength(), batchDesc.getBodyLength()));
- // write metadata header with int32 size prefix
- long offset = out.getCurrentPosition();
- out.write(recordBatch, true);
- out.align();
- // write body
- long bodyOffset = out.getCurrentPosition();
- List<ArrowBuf> buffers = recordBatch.getBuffers();
- List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
- if (buffers.size() != buffersLayout.size()) {
- throw new IllegalStateException("the layout does not match: " + buffers.size() + " != " + buffersLayout.size());
- }
- for (int i = 0; i < buffers.size(); i++) {
- ArrowBuf buffer = buffers.get(i);
- ArrowBuffer layout = buffersLayout.get(i);
- long startPosition = bodyOffset + layout.getOffset();
- if (startPosition != out.getCurrentPosition()) {
- out.writeZeros((int)(startPosition - out.getCurrentPosition()));
- }
-
- out.write(buffer);
- if (out.getCurrentPosition() != startPosition + layout.getSize()) {
- throw new IllegalStateException("wrong buffer size: " + out.getCurrentPosition() + " != " + startPosition + layout.getSize());
- }
- }
- int metadataLength = (int)(bodyOffset - offset);
- if (metadataLength <= 0) {
- throw new InvalidArrowFileException("invalid recordBatch");
- }
- long bodyLength = out.getCurrentPosition() - bodyOffset;
- LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", offset, metadataLength, bodyLength));
// add metadata to footer
- recordBatches.add(new ArrowBlock(offset, metadataLength, bodyLength));
+ recordBatches.add(batchDesc);
}
private void checkStarted() throws IOException {
http://git-wip-us.apache.org/repos/asf/arrow/blob/8ca7033f/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
index b062f38..a9dc129 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
@@ -32,9 +32,16 @@ public class ReadChannel implements AutoCloseable {
private ReadableByteChannel in;
private long bytesRead = 0;
+ // The starting byte offset into 'in'.
+ private final long startByteOffset;
- public ReadChannel(ReadableByteChannel in) {
+ public ReadChannel(ReadableByteChannel in, long startByteOffset) {
this.in = in;
+ this.startByteOffset = startByteOffset;
+ }
+
+ public ReadChannel(ReadableByteChannel in) {
+ this(in, 0);
}
public long bytesRead() { return bytesRead; }
@@ -65,6 +72,8 @@ public class ReadChannel implements AutoCloseable {
return n;
}
+ public long getCurrentPositiion() { return startByteOffset + bytesRead; }
+
@Override
public void close() throws IOException {
if (this.in != null) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/8ca7033f/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
index 22c46e2..6e22dbd 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
@@ -29,6 +29,7 @@ import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.flatbuf.MetadataVersion;
import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.file.ArrowBlock;
import org.apache.arrow.vector.file.ReadChannel;
import org.apache.arrow.vector.file.WriteChannel;
import org.apache.arrow.vector.schema.ArrowBuffer;
@@ -52,7 +53,8 @@ import io.netty.buffer.ArrowBuf;
* For RecordBatch messages the serialization is:
* 1. 4 byte little endian batch metadata header
* 2. FB serialized RowBatch
- * 3. serialized RowBatch buffers.
+ * 3. Padding to align to 8 byte boundary.
+ * 4. serialized RowBatch buffers.
*/
public class MessageSerializer {
@@ -68,14 +70,10 @@ public class MessageSerializer {
*/
public static long serialize(WriteChannel out, Schema schema) throws IOException {
FlatBufferBuilder builder = new FlatBufferBuilder();
- builder.finish(schema.getSchema(builder));
- ByteBuffer serializedBody = builder.dataBuffer();
- ByteBuffer serializedHeader =
- serializeHeader(MessageHeader.Schema, serializedBody.remaining());
-
- long size = out.writeIntLittleEndian(serializedHeader.remaining());
- size += out.write(serializedHeader);
- size += out.write(serializedBody);
+ int schemaOffset = schema.getSchema(builder);
+ ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.Schema, schemaOffset, 0);
+ long size = out.writeIntLittleEndian(serializedMessage.remaining());
+ size += out.write(serializedMessage);
return size;
}
@@ -83,49 +81,51 @@ public class MessageSerializer {
* Deserializes a schema object. Format is from serialize().
*/
public static Schema deserializeSchema(ReadChannel in) throws IOException {
- Message header = deserializeHeader(in, MessageHeader.Schema);
- if (header == null) {
+ Message message = deserializeMessage(in, MessageHeader.Schema);
+ if (message == null) {
throw new IOException("Unexpected end of input. Missing schema.");
}
- // Now read the schema.
- ByteBuffer buffer = ByteBuffer.allocate((int)header.bodyLength());
- if (in.readFully(buffer) != header.bodyLength()) {
- throw new IOException("Unexpected end of input trying to read schema.");
- }
- buffer.rewind();
- return Schema.deserialize(buffer);
+ return Schema.convertSchema((org.apache.arrow.flatbuf.Schema)
+ message.header(new org.apache.arrow.flatbuf.Schema()));
}
/**
- * Serializes an ArrowRecordBatch.
+ * Serializes an ArrowRecordBatch. Returns the offset and length of the written batch.
*/
- public static long serialize(WriteChannel out, ArrowRecordBatch batch)
+ public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch)
throws IOException {
long start = out.getCurrentPosition();
int bodyLength = batch.computeBodyLength();
- ByteBuffer metadata = WriteChannel.serialize(batch);
- ByteBuffer serializedHeader =
- serializeHeader(MessageHeader.RecordBatch, bodyLength + metadata.remaining() + 4);
+ FlatBufferBuilder builder = new FlatBufferBuilder();
+ int batchOffset = batch.writeTo(builder);
+
+ ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.RecordBatch,
+ batchOffset, bodyLength);
+
+ int metadataLength = serializedMessage.remaining();
+
+ // Add extra padding bytes so that length prefix + metadata is a multiple
+ // of 8 after alignment
+ if ((start + metadataLength + 4) % 8 != 0) {
+ metadataLength += 8 - (start + metadataLength + 4) % 8;
+ }
- // Write message header.
- out.writeIntLittleEndian(serializedHeader.remaining());
- out.write(serializedHeader);
+ out.writeIntLittleEndian(metadataLength);
+ out.write(serializedMessage);
- // Write the metadata, with the 4 byte little endian prefix
- out.writeIntLittleEndian(metadata.remaining());
- out.write(metadata);
+ // Align the output to 8 byte boundary.
+ out.align();
- // Write batch header.
- long offset = out.getCurrentPosition();
+ long bufferStart = out.getCurrentPosition();
List<ArrowBuf> buffers = batch.getBuffers();
List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
for (int i = 0; i < buffers.size(); i++) {
ArrowBuf buffer = buffers.get(i);
ArrowBuffer layout = buffersLayout.get(i);
- long startPosition = offset + layout.getOffset();
+ long startPosition = bufferStart + layout.getOffset();
if (startPosition != out.getCurrentPosition()) {
out.writeZeros((int)(startPosition - out.getCurrentPosition()));
}
@@ -135,7 +135,8 @@ public class MessageSerializer {
" != " + startPosition + layout.getSize());
}
}
- return out.getCurrentPosition() - start;
+ // Metadata size in the Block accounts for the size prefix
+ return new ArrowBlock(start, metadataLength + 4, out.getCurrentPosition() - bufferStart);
}
/**
@@ -143,23 +144,62 @@ public class MessageSerializer {
*/
public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in,
BufferAllocator alloc) throws IOException {
- Message header = deserializeHeader(in, MessageHeader.RecordBatch);
- if (header == null) return null;
+ Message message = deserializeMessage(in, MessageHeader.RecordBatch);
+ if (message == null) return null;
+
+ if (message.bodyLength() > Integer.MAX_VALUE) {
+ throw new IOException("Cannot currently deserialize record batches over 2GB");
+ }
+
+ RecordBatch recordBatchFB = (RecordBatch) message.header(new RecordBatch());
+
+ int bodyLength = (int) message.bodyLength();
+
+ // Now read the record batch body
+ ArrowBuf buffer = alloc.buffer(bodyLength);
+ if (in.readFully(buffer, bodyLength) != bodyLength) {
+ throw new IOException("Unexpected end of input trying to read batch.");
+ }
+ return deserializeRecordBatch(recordBatchFB, buffer);
+ }
+
+ /**
+ * Deserializes a RecordBatch knowing the size of the entire message up front. This
+ * minimizes the number of reads to the underlying stream.
+ */
+ public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in, ArrowBlock block,
+ BufferAllocator alloc) throws IOException {
+ // Metadata length contains integer prefix plus byte padding
+ long totalLen = block.getMetadataLength() + block.getBodyLength();
- int messageLen = (int)header.bodyLength();
- // Now read the buffer. This has the metadata followed by the data.
- ArrowBuf buffer = alloc.buffer(messageLen);
- if (in.readFully(buffer, messageLen) != messageLen) {
+ if (totalLen > Integer.MAX_VALUE) {
+ throw new IOException("Cannot currently deserialize record batches over 2GB");
+ }
+
+ ArrowBuf buffer = alloc.buffer((int) totalLen);
+ if (in.readFully(buffer, (int) totalLen) != totalLen) {
throw new IOException("Unexpected end of input trying to read batch.");
}
- // Read the metadata. It starts with the 4 byte size of the metadata.
- int metadataSize = buffer.readInt();
- RecordBatch recordBatchFB =
- RecordBatch.getRootAsRecordBatch( buffer.nioBuffer().asReadOnlyBuffer());
+ ArrowBuf metadataBuffer = buffer.slice(4, block.getMetadataLength() - 4);
+
+ Message messageFB =
+ Message.getRootAsMessage(metadataBuffer.nioBuffer().asReadOnlyBuffer());
+
+ RecordBatch recordBatchFB = (RecordBatch) messageFB.header(new RecordBatch());
+
+ // Now read the body
+ final ArrowBuf body = buffer.slice(block.getMetadataLength(),
+ (int) totalLen - block.getMetadataLength());
+ ArrowRecordBatch result = deserializeRecordBatch(recordBatchFB, body);
+
+ return result;
+ }
- // No read the body
- final ArrowBuf body = buffer.slice(4 + metadataSize, messageLen - metadataSize - 4);
+ // Deserializes a record batch given the Flatbuffer metadata and in-memory body
+ private static ArrowRecordBatch deserializeRecordBatch(RecordBatch recordBatchFB,
+ ArrowBuf body) {
+ // Now read the body
int nodesLength = recordBatchFB.nodesLength();
List<ArrowFieldNode> nodes = new ArrayList<>();
for (int i = 0; i < nodesLength; ++i) {
@@ -174,43 +214,44 @@ public class MessageSerializer {
}
ArrowRecordBatch arrowRecordBatch =
new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers);
- buffer.release();
+ body.release();
return arrowRecordBatch;
}
/**
* Serializes a message header.
*/
- private static ByteBuffer serializeHeader(byte headerType, int bodyLength) {
- FlatBufferBuilder headerBuilder = new FlatBufferBuilder();
- Message.startMessage(headerBuilder);
- Message.addHeaderType(headerBuilder, headerType);
- Message.addVersion(headerBuilder, MetadataVersion.V1);
- Message.addBodyLength(headerBuilder, bodyLength);
- headerBuilder.finish(Message.endMessage(headerBuilder));
- return headerBuilder.dataBuffer();
+ private static ByteBuffer serializeMessage(FlatBufferBuilder builder, byte headerType,
+ int headerOffset, int bodyLength) {
+ Message.startMessage(builder);
+ Message.addHeaderType(builder, headerType);
+ Message.addHeader(builder, headerOffset);
+ Message.addVersion(builder, MetadataVersion.V1);
+ Message.addBodyLength(builder, bodyLength);
+ builder.finish(Message.endMessage(builder));
+ return builder.dataBuffer();
}
- private static Message deserializeHeader(ReadChannel in, byte headerType) throws IOException {
- // Read the header size. There is an i32 little endian prefix.
+ private static Message deserializeMessage(ReadChannel in, byte headerType) throws IOException {
+ // Read the message size. There is an i32 little endian prefix.
ByteBuffer buffer = ByteBuffer.allocate(4);
if (in.readFully(buffer) != 4) {
return null;
}
- int headerLength = bytesToInt(buffer.array());
- buffer = ByteBuffer.allocate(headerLength);
- if (in.readFully(buffer) != headerLength) {
+ int messageLength = bytesToInt(buffer.array());
+ buffer = ByteBuffer.allocate(messageLength);
+ if (in.readFully(buffer) != messageLength) {
throw new IOException(
- "Unexpected end of stream trying to read header.");
+ "Unexpected end of stream trying to read message.");
}
buffer.rewind();
- Message header = Message.getRootAsMessage(buffer);
- if (header.headerType() != headerType) {
+ Message message = Message.getRootAsMessage(buffer);
+ if (message.headerType() != headerType) {
throw new IOException("Invalid message: expecting " + headerType +
- ". Message contained: " + header.headerType());
+ ". Message contained: " + message.headerType());
}
- return header;
+ return message;
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/8ca7033f/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
index bf635fb..9b99144 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
@@ -109,8 +109,6 @@ public class TestArrowFile extends BaseFileTest {
List<ArrowBlock> recordBatches = footer.getRecordBatches();
for (ArrowBlock rbBlock : recordBatches) {
- Assert.assertEquals(0, rbBlock.getOffset() % 8);
- Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
for (ArrowBuffer arrowBuffer : buffersLayout) {
@@ -271,8 +269,6 @@ public class TestArrowFile extends BaseFileTest {
for (ArrowBlock rbBlock : recordBatches) {
Assert.assertTrue(rbBlock.getOffset() + " > " + previousOffset, rbBlock.getOffset() > previousOffset);
previousOffset = rbBlock.getOffset();
- Assert.assertEquals(0, rbBlock.getOffset() % 8);
- Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
Assert.assertEquals("RB #" + i, counts[i], recordBatch.getLength());
List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
http://git-wip-us.apache.org/repos/asf/arrow/blob/8ca7033f/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java
index 707dba2..1e51458 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFooter.java
@@ -21,7 +21,9 @@ import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import org.apache.arrow.flatbuf.Footer;
import org.apache.arrow.vector.types.pojo.ArrowType;
@@ -41,6 +43,12 @@ public class TestArrowFooter {
ArrowFooter footer = new ArrowFooter(schema, Collections.<ArrowBlock>emptyList(), Collections.<ArrowBlock>emptyList());
ArrowFooter newFooter = roundTrip(footer);
assertEquals(footer, newFooter);
+
+ List<ArrowBlock> ids = new ArrayList<>();
+ ids.add(new ArrowBlock(0, 1, 2));
+ ids.add(new ArrowBlock(4, 5, 6));
+ footer = new ArrowFooter(schema, ids, ids);
+ assertEquals(footer, roundTrip(footer));
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/8ca7033f/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
index 8ed89fa..96bcbb1 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java
@@ -24,10 +24,14 @@ import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.Collections;
import java.util.List;
+import org.apache.arrow.flatbuf.FieldNode;
+import org.apache.arrow.flatbuf.Message;
+import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.schema.ArrowFieldNode;
@@ -96,6 +100,18 @@ public class TestArrowReaderWriter {
assertArrayEquals(validity, array(buffers.get(0)));
assertArrayEquals(values, array(buffers.get(1)));
+ // Read just the header. This demonstrates being able to read without needing to
+ // deserialize the buffer.
+ ByteBuffer headerBuffer = ByteBuffer.allocate(recordBatches.get(0).getMetadataLength());
+ headerBuffer.put(byteArray, (int)recordBatches.get(0).getOffset(), headerBuffer.capacity());
+ headerBuffer.position(4);
+ Message messageFB = Message.getRootAsMessage(headerBuffer);
+ RecordBatch recordBatchFB = (RecordBatch) messageFB.header(new RecordBatch());
+ assertEquals(2, recordBatchFB.buffersLength());
+ assertEquals(1, recordBatchFB.nodesLength());
+ FieldNode nodeFB = recordBatchFB.nodes(0);
+ assertEquals(16, nodeFB.length());
+ assertEquals(8, nodeFB.nullCount());
}
}