You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/19 19:46:37 UTC
arrow git commit: ARROW-474: [Java] Add initial version of streaming
serialized format.
Repository: arrow
Updated Branches:
refs/heads/master 9b1b3979b -> 6811d3fcf
ARROW-474: [Java] Add initial version of streaming serialized format.
This patch proposes a serialized container format for streaming producers and
consumers. The goal is to allow writers to produce, and readers to consume, Arrow
data without requiring intermediate buffering.
This is similar to the File format but reorganizes the pieces. In particular:
- No magic header, since a reader will likely connect to an arbitrary ('random') stream to read it.
- The footer moves to a header, which carries similar information, including the schema.
- ArrowRecordBatches follow one by one. Each is prefixed with an i32 length. The
serialization is identical to the File version.
- See Stream.fbs for more details.
This patch also implements the Java reader/writer.
Author: Nong Li <no...@cerebrodata.com>
Closes #288 from nongli/streaming and squashes the following commits:
554cc18 [Nong Li] Redo serialization format.
03bee58 [Nong Li] Updates from wes' comments.
7257031 [Nong Li] ARROW-474: [Java] Add initial version of streaming serialized format.
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/6811d3fc
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/6811d3fc
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/6811d3fc
Branch: refs/heads/master
Commit: 6811d3fcfc9da65e24b6d0f2ad5d5d348d879f11
Parents: 9b1b397
Author: Nong Li <no...@cerebrodata.com>
Authored: Thu Jan 19 14:45:24 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Jan 19 14:45:24 2017 -0500
----------------------------------------------------------------------
.../apache/arrow/vector/file/ArrowReader.java | 12 +-
.../apache/arrow/vector/file/ArrowWriter.java | 102 ++-------
.../apache/arrow/vector/file/ReadChannel.java | 75 +++++++
.../apache/arrow/vector/file/WriteChannel.java | 111 ++++++++++
.../arrow/vector/schema/ArrowRecordBatch.java | 23 ++
.../arrow/vector/stream/ArrowStreamReader.java | 95 ++++++++
.../arrow/vector/stream/ArrowStreamWriter.java | 71 ++++++
.../arrow/vector/stream/MessageSerializer.java | 216 +++++++++++++++++++
.../apache/arrow/vector/types/pojo/Schema.java | 5 +
.../apache/arrow/vector/file/TestArrowFile.java | 149 ++++++++++++-
.../vector/stream/MessageSerializerTest.java | 115 ++++++++++
.../arrow/vector/stream/TestArrowStream.java | 96 +++++++++
.../vector/stream/TestArrowStreamPipe.java | 129 +++++++++++
13 files changed, 1100 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/6811d3fc/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
index cd520da..58c5160 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java
@@ -31,6 +31,7 @@ import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.schema.ArrowFieldNode;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.MessageSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +40,7 @@ import io.netty.buffer.ArrowBuf;
public class ArrowReader implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ArrowReader.class);
- private static final byte[] MAGIC = "ARROW1".getBytes();
+ public static final byte[] MAGIC = "ARROW1".getBytes();
private final SeekableByteChannel in;
@@ -73,13 +74,6 @@ public class ArrowReader implements AutoCloseable {
return total;
}
- private static int bytesToInt(byte[] bytes) {
- return ((int)(bytes[3] & 255) << 24) +
- ((int)(bytes[2] & 255) << 16) +
- ((int)(bytes[1] & 255) << 8) +
- ((int)(bytes[0] & 255) << 0);
- }
-
public ArrowFooter readFooter() throws IOException {
if (footer == null) {
if (in.size() <= (MAGIC.length * 2 + 4)) {
@@ -93,7 +87,7 @@ public class ArrowReader implements AutoCloseable {
if (!Arrays.equals(MAGIC, Arrays.copyOfRange(array, 4, array.length))) {
throw new InvalidArrowFileException("missing Magic number " + Arrays.toString(buffer.array()));
}
- int footerLength = bytesToInt(array);
+ int footerLength = MessageSerializer.bytesToInt(array);
if (footerLength <= 0 || footerLength + MAGIC.length * 2 + 4 > in.size()) {
throw new InvalidArrowFileException("invalid footer length: " + footerLength);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6811d3fc/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
index 1cd87eb..3febd11 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowWriter.java
@@ -18,7 +18,6 @@
package org.apache.arrow.vector.file;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collections;
@@ -26,32 +25,25 @@ import java.util.List;
import org.apache.arrow.vector.schema.ArrowBuffer;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
-import org.apache.arrow.vector.schema.FBSerializable;
import org.apache.arrow.vector.types.pojo.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.flatbuffers.FlatBufferBuilder;
-
import io.netty.buffer.ArrowBuf;
public class ArrowWriter implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class);
- private static final byte[] MAGIC = "ARROW1".getBytes();
-
- private final WritableByteChannel out;
+ private final WriteChannel out;
private final Schema schema;
private final List<ArrowBlock> recordBatches = new ArrayList<>();
- private long currentPosition = 0;
-
private boolean started = false;
public ArrowWriter(WritableByteChannel out, Schema schema) {
- this.out = out;
+ this.out = new WriteChannel(out);
this.schema = schema;
}
@@ -59,53 +51,19 @@ public class ArrowWriter implements AutoCloseable {
writeMagic();
}
- private long write(byte[] buffer) throws IOException {
- return write(ByteBuffer.wrap(buffer));
- }
-
- private long writeZeros(int zeroCount) throws IOException {
- return write(new byte[zeroCount]);
- }
-
- private long align() throws IOException {
- if (currentPosition % 8 != 0) { // align on 8 byte boundaries
- return writeZeros(8 - (int)(currentPosition % 8));
- }
- return 0;
- }
-
- private long write(ByteBuffer buffer) throws IOException {
- long length = buffer.remaining();
- out.write(buffer);
- currentPosition += length;
- return length;
- }
-
- private static byte[] intToBytes(int value) {
- byte[] outBuffer = new byte[4];
- outBuffer[3] = (byte)(value >>> 24);
- outBuffer[2] = (byte)(value >>> 16);
- outBuffer[1] = (byte)(value >>> 8);
- outBuffer[0] = (byte)(value >>> 0);
- return outBuffer;
- }
-
- private long writeIntLittleEndian(int v) throws IOException {
- return write(intToBytes(v));
- }
// TODO: write dictionaries
public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException {
checkStarted();
- align();
+ out.align();
// write metadata header with int32 size prefix
- long offset = currentPosition;
- write(recordBatch, true);
- align();
+ long offset = out.getCurrentPosition();
+ out.write(recordBatch, true);
+ out.align();
// write body
- long bodyOffset = currentPosition;
+ long bodyOffset = out.getCurrentPosition();
List<ArrowBuf> buffers = recordBatch.getBuffers();
List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
if (buffers.size() != buffersLayout.size()) {
@@ -115,31 +73,25 @@ public class ArrowWriter implements AutoCloseable {
ArrowBuf buffer = buffers.get(i);
ArrowBuffer layout = buffersLayout.get(i);
long startPosition = bodyOffset + layout.getOffset();
- if (startPosition != currentPosition) {
- writeZeros((int)(startPosition - currentPosition));
+ if (startPosition != out.getCurrentPosition()) {
+ out.writeZeros((int)(startPosition - out.getCurrentPosition()));
}
- write(buffer);
- if (currentPosition != startPosition + layout.getSize()) {
- throw new IllegalStateException("wrong buffer size: " + currentPosition + " != " + startPosition + layout.getSize());
+ out.write(buffer);
+ if (out.getCurrentPosition() != startPosition + layout.getSize()) {
+ throw new IllegalStateException("wrong buffer size: " + out.getCurrentPosition() + " != " + startPosition + layout.getSize());
}
}
int metadataLength = (int)(bodyOffset - offset);
if (metadataLength <= 0) {
throw new InvalidArrowFileException("invalid recordBatch");
}
- long bodyLength = currentPosition - bodyOffset;
+ long bodyLength = out.getCurrentPosition() - bodyOffset;
LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", offset, metadataLength, bodyLength));
// add metadata to footer
recordBatches.add(new ArrowBlock(offset, metadataLength, bodyLength));
}
- private void write(ArrowBuf buffer) throws IOException {
- ByteBuffer nioBuffer = buffer.nioBuffer(buffer.readerIndex(), buffer.readableBytes());
- LOGGER.debug("Writing buffer with size: " + nioBuffer.remaining());
- write(nioBuffer);
- }
-
private void checkStarted() throws IOException {
if (!started) {
started = true;
@@ -147,15 +99,16 @@ public class ArrowWriter implements AutoCloseable {
}
}
+ @Override
public void close() throws IOException {
try {
- long footerStart = currentPosition;
+ long footerStart = out.getCurrentPosition();
writeFooter();
- int footerLength = (int)(currentPosition - footerStart);
+ int footerLength = (int)(out.getCurrentPosition() - footerStart);
if (footerLength <= 0 ) {
throw new InvalidArrowFileException("invalid footer");
}
- writeIntLittleEndian(footerLength);
+ out.writeIntLittleEndian(footerLength);
LOGGER.debug(String.format("Footer starts at %d, length: %d", footerStart, footerLength));
writeMagic();
} finally {
@@ -164,27 +117,12 @@ public class ArrowWriter implements AutoCloseable {
}
private void writeMagic() throws IOException {
- write(MAGIC);
- LOGGER.debug(String.format("magic written, now at %d", currentPosition));
+ out.write(ArrowReader.MAGIC);
+ LOGGER.debug(String.format("magic written, now at %d", out.getCurrentPosition()));
}
private void writeFooter() throws IOException {
// TODO: dictionaries
- write(new ArrowFooter(schema, Collections.<ArrowBlock>emptyList(), recordBatches), false);
- }
-
- private long write(FBSerializable writer, boolean withSizePrefix) throws IOException {
- FlatBufferBuilder builder = new FlatBufferBuilder();
- int root = writer.writeTo(builder);
- builder.finish(root);
-
- ByteBuffer buffer = builder.dataBuffer();
-
- if (withSizePrefix) {
- writeIntLittleEndian(buffer.remaining());
- }
-
- return write(buffer);
+ out.write(new ArrowFooter(schema, Collections.<ArrowBlock>emptyList(), recordBatches), false);
}
-
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6811d3fc/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
new file mode 100644
index 0000000..b062f38
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ReadChannel.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Wrapper around a ReadableByteChannel that counts the bytes consumed and provides
+ * "read until full" helpers.
+ */
+public class ReadChannel implements AutoCloseable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ReadChannel.class);
+
+  private ReadableByteChannel in;
+  private long bytesRead = 0;
+
+  public ReadChannel(ReadableByteChannel in) {
+    this.in = in;
+  }
+
+  /**
+   * Returns the total number of bytes read through this channel so far.
+   */
+  public long bytesRead() { return bytesRead; }
+
+  /**
+   * Reads bytes into buffer until it is full (buffer.remaining() == 0), the channel
+   * reports end of stream, or a read returns no bytes. Returns the number of bytes
+   * read, which is less than the buffer's initial remaining() if reading stopped early.
+   * Every byte read, including a partial read before end of stream, is added to
+   * bytesRead().
+   */
+  public int readFully(ByteBuffer buffer) throws IOException {
+    LOGGER.debug("Reading buffer with size: {}", buffer.remaining());
+    int totalRead = 0;
+    while (buffer.hasRemaining()) {
+      int read = in.read(buffer);
+      if (read <= 0) {
+        // -1 means end of stream; 0 means the channel returned nothing. Stop either way,
+        // but fall through so the bytes already read are still accounted for.
+        break;
+      }
+      totalRead += read;
+    }
+    this.bytesRead += totalRead;
+    return totalRead;
+  }
+
+  /**
+   * Reads up to l bytes into buffer, starting at its current writerIndex, and advances
+   * the writerIndex by the number of bytes actually read. Returns the bytes read.
+   */
+  public int readFully(ArrowBuf buffer, int l) throws IOException {
+    int start = buffer.writerIndex();
+    int n = readFully(buffer.nioBuffer(start, l));
+    // The nio view starts at 'start', so advance relative to it rather than setting an
+    // absolute index (which is only correct when the buffer was empty).
+    buffer.writerIndex(start + n);
+    return n;
+  }
+
+  /**
+   * Closes the underlying channel. Subsequent calls are no-ops.
+   */
+  @Override
+  public void close() throws IOException {
+    if (this.in != null) {
+      in.close();
+      in = null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6811d3fc/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java
new file mode 100644
index 0000000..d99c9a6
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/file/WriteChannel.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.file;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.arrow.vector.schema.FBSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Wrapper around a WritableByteChannel that tracks the current write position and
+ * provides some common serialization utilities (zero padding and 8-byte alignment,
+ * little-endian ints, flatbuffer serialization).
+ */
+public class WriteChannel implements AutoCloseable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(WriteChannel.class);
+
+  private long currentPosition = 0;
+
+  private final WritableByteChannel out;
+
+  public WriteChannel(WritableByteChannel out) {
+    this.out = out;
+  }
+
+  @Override
+  public void close() throws IOException {
+    out.close();
+  }
+
+  /** Returns the number of bytes written through this channel so far. */
+  public long getCurrentPosition() {
+    return currentPosition;
+  }
+
+  /** Writes all of buffer. Returns the number of bytes written. */
+  public long write(byte[] buffer) throws IOException {
+    return write(ByteBuffer.wrap(buffer));
+  }
+
+  /** Writes zeroCount zero bytes. Returns the number of bytes written. */
+  public long writeZeros(int zeroCount) throws IOException {
+    return write(new byte[zeroCount]);
+  }
+
+  /**
+   * Pads with zeros so that the current position is a multiple of 8. Returns the number
+   * of padding bytes written (0 if already aligned).
+   */
+  public long align() throws IOException {
+    if (currentPosition % 8 != 0) { // align on 8 byte boundaries
+      return writeZeros(8 - (int)(currentPosition % 8));
+    }
+    return 0;
+  }
+
+  /**
+   * Writes all remaining bytes of buffer and returns how many were written.
+   * WritableByteChannel.write() may write fewer bytes than requested, so this loops
+   * until the buffer is drained; otherwise currentPosition would not match what was
+   * actually written. NOTE: with a non-blocking channel this spins until the channel
+   * accepts the data.
+   */
+  public long write(ByteBuffer buffer) throws IOException {
+    long length = buffer.remaining();
+    LOGGER.debug("Writing buffer with size: {}", length);
+    while (buffer.hasRemaining()) {
+      out.write(buffer);
+    }
+    currentPosition += length;
+    return length;
+  }
+
+  /** Encodes value as 4 little-endian bytes. */
+  public static byte[] intToBytes(int value) {
+    byte[] outBuffer = new byte[4];
+    outBuffer[3] = (byte)(value >>> 24);
+    outBuffer[2] = (byte)(value >>> 16);
+    outBuffer[1] = (byte)(value >>> 8);
+    outBuffer[0] = (byte)(value >>> 0);
+    return outBuffer;
+  }
+
+  /** Writes v as 4 little-endian bytes. Returns the number of bytes written (4). */
+  public long writeIntLittleEndian(int v) throws IOException {
+    return write(intToBytes(v));
+  }
+
+  /** Writes the readable bytes of buffer (readerIndex to writerIndex). */
+  public void write(ArrowBuf buffer) throws IOException {
+    ByteBuffer nioBuffer = buffer.nioBuffer(buffer.readerIndex(), buffer.readableBytes());
+    write(nioBuffer);
+  }
+
+  /**
+   * Serializes writer as a flatbuffer and writes it, optionally preceded by its size as
+   * a 4-byte little-endian int. Returns the size of the flatbuffer only; the optional
+   * 4-byte prefix is not included in the returned count.
+   */
+  public long write(FBSerializable writer, boolean withSizePrefix) throws IOException {
+    ByteBuffer buffer = serialize(writer);
+    if (withSizePrefix) {
+      writeIntLittleEndian(buffer.remaining());
+    }
+    return write(buffer);
+  }
+
+  /** Serializes writer into a finished flatbuffer and returns its data buffer. */
+  public static ByteBuffer serialize(FBSerializable writer) {
+    FlatBufferBuilder builder = new FlatBufferBuilder();
+    int root = writer.writeTo(builder);
+    builder.finish(root);
+    return builder.dataBuffer();
+  }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6811d3fc/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
index adb99e2..40c2fbf 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/ArrowRecordBatch.java
@@ -19,6 +19,7 @@ package org.apache.arrow.vector.schema;
import static org.apache.arrow.vector.schema.FBSerializables.writeAllStructsToVector;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -130,6 +131,28 @@ public class ArrowRecordBatch implements FBSerializable, AutoCloseable {
+ buffersLayout + ", closed=" + closed + "]";
}
+  /**
+   * Computes the size of the serialized body for this recordBatch: the layout offset of
+   * the last buffer plus that buffer's readable bytes (0 if there are no buffers). Zero
+   * padding before each buffer's offset is therefore included.
+   *
+   * @throws IllegalStateException if buffers and layout disagree in count, or if the
+   *     body does not fit in an int
+   */
+  public int computeBodyLength() {
+    List<ArrowBuf> buffers = getBuffers();
+    List<ArrowBuffer> buffersLayout = getBuffersLayout();
+    if (buffers.size() != buffersLayout.size()) {
+      throw new IllegalStateException("the layout does not match: " +
+          buffers.size() + " != " + buffersLayout.size());
+    }
+    long size = 0;
+    for (int i = 0; i < buffers.size(); i++) {
+      // Each buffer starts at its layout offset and occupies its readable bytes, so the
+      // running end position is simply offset + readableBytes of the current buffer.
+      size = buffersLayout.get(i).getOffset() + buffers.get(i).readableBytes();
+    }
+    if (size > Integer.MAX_VALUE) {
+      throw new IllegalStateException("record batch body too large: " + size);
+    }
+    return (int) size;
+  }
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6811d3fc/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java
new file mode 100644
index 0000000..f32966c
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamReader.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.file.ReadChannel;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class reads from an input stream and produces ArrowRecordBatches.
+ * Call init() first to read the schema, then nextRecordBatch() until it returns null.
+ */
+public class ArrowStreamReader implements AutoCloseable {
+  private ReadChannel in;
+  private final BufferAllocator allocator;
+  private Schema schema;
+
+  /**
+   * Constructs a streaming reader, reading bytes from 'in'. Non-blocking: nothing is
+   * read until init() is called.
+   */
+  public ArrowStreamReader(ReadableByteChannel in, BufferAllocator allocator) {
+    this.in = new ReadChannel(in);
+    this.allocator = allocator;
+  }
+
+  public ArrowStreamReader(InputStream in, BufferAllocator allocator) {
+    this(Channels.newChannel(in), allocator);
+  }
+
+  /**
+   * Initializes the reader by reading the schema message. Must be called before the
+   * other APIs. This is blocking.
+   */
+  public void init() throws IOException {
+    Preconditions.checkState(this.schema == null, "Cannot call init() more than once.");
+    Preconditions.checkState(this.in != null, "Cannot call after close()");
+    this.schema = readSchema();
+  }
+
+  /**
+   * Returns the schema for all records in this stream.
+   */
+  public Schema getSchema () {
+    Preconditions.checkState(this.schema != null, "Must call init() first.");
+    return schema;
+  }
+
+  /** Returns the number of bytes read from the stream so far. */
+  public long bytesRead() { return in.bytesRead(); }
+
+  /**
+   * Reads and returns the next ArrowRecordBatch. Returns null if this is the end
+   * of stream.
+   */
+  public ArrowRecordBatch nextRecordBatch() throws IOException {
+    Preconditions.checkState(this.in != null, "Cannot call after close()");
+    Preconditions.checkState(this.schema != null, "Must call init() first.");
+    return MessageSerializer.deserializeRecordBatch(in, allocator);
+  }
+
+  /** Closes the underlying channel. Subsequent calls are no-ops. */
+  @Override
+  public void close() throws IOException {
+    if (this.in != null) {
+      in.close();
+      in = null;
+    }
+  }
+
+  /**
+   * Reads the schema message from the beginning of the stream.
+   */
+  private Schema readSchema() throws IOException {
+    return MessageSerializer.deserializeSchema(in);
+  }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6811d3fc/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
new file mode 100644
index 0000000..06acf9f
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/ArrowStreamWriter.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.stream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.arrow.vector.file.WriteChannel;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * Writes a schema message followed by ArrowRecordBatch messages to a byte channel,
+ * using the streaming format implemented by MessageSerializer.
+ */
+public class ArrowStreamWriter implements AutoCloseable {
+  private final WriteChannel out;
+  private final Schema schema;
+  private boolean headerSent = false;
+
+  /**
+   * Creates the stream writer. Non-blocking: nothing is written until the first
+   * writeRecordBatch() or close().
+   * totalBatches can be set if the writer knows beforehand, or -1 if unknown. It is
+   * currently not used.
+   */
+  public ArrowStreamWriter(WritableByteChannel out, Schema schema, int totalBatches) {
+    this.out = new WriteChannel(out);
+    this.schema = schema;
+  }
+
+  public ArrowStreamWriter(OutputStream out, Schema schema, int totalBatches)
+      throws IOException {
+    this(Channels.newChannel(out), schema, totalBatches);
+  }
+
+  /** Returns the number of bytes written to the stream so far. */
+  public long bytesWritten() { return out.getCurrentPosition(); }
+
+  /** Writes batch, sending the schema header first if it has not been sent yet. */
+  public void writeRecordBatch(ArrowRecordBatch batch) throws IOException {
+    // Send the header if we have not yet.
+    checkAndSendHeader();
+    MessageSerializer.serialize(out, batch);
+  }
+
+  /**
+   * Sends the header if needed, then closes the underlying channel. The channel is
+   * closed even if sending the header fails.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      // The header might not have been sent if this is an empty stream. Send it even in
+      // this case so readers see a valid empty stream.
+      checkAndSendHeader();
+    } finally {
+      out.close();
+    }
+  }
+
+  /** Writes the schema message exactly once. */
+  private void checkAndSendHeader() throws IOException {
+    if (!headerSent) {
+      MessageSerializer.serialize(out, schema);
+      headerSent = true;
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/arrow/blob/6811d3fc/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
new file mode 100644
index 0000000..22c46e2
--- /dev/null
+++ b/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.stream;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.arrow.flatbuf.Buffer;
+import org.apache.arrow.flatbuf.FieldNode;
+import org.apache.arrow.flatbuf.Message;
+import org.apache.arrow.flatbuf.MessageHeader;
+import org.apache.arrow.flatbuf.MetadataVersion;
+import org.apache.arrow.flatbuf.RecordBatch;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.file.ReadChannel;
+import org.apache.arrow.vector.file.WriteChannel;
+import org.apache.arrow.vector.schema.ArrowBuffer;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Utility class for serializing Messages. Messages are all serialized a similar way:
+ * 1. 4-byte little-endian length of the message header.
+ * 2. FB serialized Message header: this includes the body length (the size of the
+ *    serialized body) and the type of the message.
+ * 3. Serialized message body.
+ *
+ * For schema messages, the body is simply the FB serialized Schema.
+ *
+ * For RecordBatch messages the body is:
+ * 1. 4-byte little-endian length of the batch metadata
+ * 2. FB serialized RecordBatch metadata
+ * 3. serialized RecordBatch buffers, each placed at its layout offset (zero padded).
+ */
+public class MessageSerializer {
+
+  /**
+   * Decodes a little-endian int from the first four entries of bytes.
+   */
+  public static int bytesToInt(byte[] bytes) {
+    return ((bytes[3] & 255) << 24) +
+           ((bytes[2] & 255) << 16) +
+           ((bytes[1] & 255) << 8) +
+           ((bytes[0] & 255) << 0);
+  }
+
+  /**
+   * Serialize a schema object. Returns the number of bytes written.
+   */
+  public static long serialize(WriteChannel out, Schema schema) throws IOException {
+    FlatBufferBuilder builder = new FlatBufferBuilder();
+    builder.finish(schema.getSchema(builder));
+    ByteBuffer serializedBody = builder.dataBuffer();
+    ByteBuffer serializedHeader =
+        serializeHeader(MessageHeader.Schema, serializedBody.remaining());
+
+    long size = out.writeIntLittleEndian(serializedHeader.remaining());
+    size += out.write(serializedHeader);
+    size += out.write(serializedBody);
+    return size;
+  }
+
+  /**
+   * Deserializes a schema object. Format is from serialize().
+   *
+   * @throws IOException on end of input, a non-schema message, or an invalid length
+   */
+  public static Schema deserializeSchema(ReadChannel in) throws IOException {
+    Message header = deserializeHeader(in, MessageHeader.Schema);
+    if (header == null) {
+      throw new IOException("Unexpected end of input. Missing schema.");
+    }
+
+    // Now read the schema.
+    int bodyLength = checkedBodyLength(header);
+    ByteBuffer buffer = ByteBuffer.allocate(bodyLength);
+    if (in.readFully(buffer) != bodyLength) {
+      throw new IOException("Unexpected end of input trying to read schema.");
+    }
+    buffer.rewind();
+    return Schema.deserialize(buffer);
+  }
+
+  /**
+   * Serializes an ArrowRecordBatch. Returns the number of bytes written.
+   */
+  public static long serialize(WriteChannel out, ArrowRecordBatch batch)
+      throws IOException {
+    long start = out.getCurrentPosition();
+    int bodyLength = batch.computeBodyLength();
+
+    ByteBuffer metadata = WriteChannel.serialize(batch);
+    // Message body = 4-byte metadata length prefix + metadata + buffers.
+    ByteBuffer serializedHeader =
+        serializeHeader(MessageHeader.RecordBatch, bodyLength + metadata.remaining() + 4);
+
+    // Write message header.
+    out.writeIntLittleEndian(serializedHeader.remaining());
+    out.write(serializedHeader);
+
+    // Write the metadata, with the 4 byte little endian prefix
+    out.writeIntLittleEndian(metadata.remaining());
+    out.write(metadata);
+
+    // Write the buffers; layout offsets are relative to the start of this section.
+    long offset = out.getCurrentPosition();
+    List<ArrowBuf> buffers = batch.getBuffers();
+    List<ArrowBuffer> buffersLayout = batch.getBuffersLayout();
+
+    for (int i = 0; i < buffers.size(); i++) {
+      ArrowBuf buffer = buffers.get(i);
+      ArrowBuffer layout = buffersLayout.get(i);
+      long startPosition = offset + layout.getOffset();
+      if (startPosition != out.getCurrentPosition()) {
+        out.writeZeros((int)(startPosition - out.getCurrentPosition()));
+      }
+      out.write(buffer);
+      // Compute the sum before building the message so it is added, not concatenated.
+      long expectedEnd = startPosition + layout.getSize();
+      if (out.getCurrentPosition() != expectedEnd) {
+        throw new IllegalStateException("wrong buffer size: " + out.getCurrentPosition() +
+            " != " + expectedEnd);
+      }
+    }
+    return out.getCurrentPosition() - start;
+  }
+
+  /**
+   * Deserializes a RecordBatch. Returns null if the stream ends before the next
+   * message header.
+   *
+   * @throws IOException on a truncated message, a non-batch message, or an invalid length
+   */
+  public static ArrowRecordBatch deserializeRecordBatch(ReadChannel in,
+      BufferAllocator alloc) throws IOException {
+    Message header = deserializeHeader(in, MessageHeader.RecordBatch);
+    if (header == null) {
+      return null;
+    }
+
+    int messageLen = checkedBodyLength(header);
+    // Now read the buffer. This has the metadata followed by the data.
+    ArrowBuf buffer = alloc.buffer(messageLen);
+    try {
+      if (in.readFully(buffer, messageLen) != messageLen) {
+        throw new IOException("Unexpected end of input trying to read batch.");
+      }
+
+      // Read the metadata. It starts with the 4 byte size of the metadata.
+      int metadataSize = buffer.readInt();
+      RecordBatch recordBatchFB =
+          RecordBatch.getRootAsRecordBatch(buffer.nioBuffer().asReadOnlyBuffer());
+
+      // Now read the body
+      final ArrowBuf body = buffer.slice(4 + metadataSize, messageLen - metadataSize - 4);
+      int nodesLength = recordBatchFB.nodesLength();
+      List<ArrowFieldNode> nodes = new ArrayList<>();
+      for (int i = 0; i < nodesLength; ++i) {
+        FieldNode node = recordBatchFB.nodes(i);
+        nodes.add(new ArrowFieldNode(node.length(), node.nullCount()));
+      }
+      List<ArrowBuf> buffers = new ArrayList<>();
+      for (int i = 0; i < recordBatchFB.buffersLength(); ++i) {
+        Buffer bufferFB = recordBatchFB.buffers(i);
+        ArrowBuf vectorBuffer = body.slice((int)bufferFB.offset(), (int)bufferFB.length());
+        buffers.add(vectorBuffer);
+      }
+      return new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers);
+    } finally {
+      // Released on every path so a short read or malformed metadata does not leak the
+      // allocation. On success this runs after the batch is constructed, as before.
+      // NOTE(review): relies on ArrowRecordBatch retaining the slices it is given.
+      buffer.release();
+    }
+  }
+
+  /**
+   * Serializes a message header.
+   */
+  private static ByteBuffer serializeHeader(byte headerType, int bodyLength) {
+    FlatBufferBuilder headerBuilder = new FlatBufferBuilder();
+    Message.startMessage(headerBuilder);
+    Message.addHeaderType(headerBuilder, headerType);
+    Message.addVersion(headerBuilder, MetadataVersion.V1);
+    Message.addBodyLength(headerBuilder, bodyLength);
+    headerBuilder.finish(Message.endMessage(headerBuilder));
+    return headerBuilder.dataBuffer();
+  }
+
+  /**
+   * Reads a length-prefixed Message header and checks its type. Returns null if the
+   * stream ends before the 4-byte length prefix is fully read.
+   */
+  private static Message deserializeHeader(ReadChannel in, byte headerType) throws IOException {
+    // Read the header size. There is an i32 little endian prefix.
+    ByteBuffer buffer = ByteBuffer.allocate(4);
+    if (in.readFully(buffer) != 4) {
+      return null;
+    }
+
+    int headerLength = bytesToInt(buffer.array());
+    if (headerLength <= 0) {
+      throw new IOException("Invalid message header length: " + headerLength);
+    }
+    buffer = ByteBuffer.allocate(headerLength);
+    if (in.readFully(buffer) != headerLength) {
+      throw new IOException(
+          "Unexpected end of stream trying to read header.");
+    }
+    buffer.rewind();
+
+    Message header = Message.getRootAsMessage(buffer);
+    if (header.headerType() != headerType) {
+      throw new IOException("Invalid message: expecting " + headerType +
+          ". Message contained: " + header.headerType());
+    }
+    return header;
+  }
+
+  /**
+   * Returns header.bodyLength() as an int, rejecting values that are negative or too
+   * large to hold in a single buffer.
+   */
+  private static int checkedBodyLength(Message header) throws IOException {
+    long bodyLength = header.bodyLength();
+    if (bodyLength < 0 || bodyLength > Integer.MAX_VALUE) {
+      throw new IOException("Invalid message body length: " + bodyLength);
+    }
+    return (int) bodyLength;
+  }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6811d3fc/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java
----------------------------------------------------------------------
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java
index 5ca8ade..c33bd6e 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/types/pojo/Schema.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.arrow.vector.types.pojo.Field.convertField;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -65,6 +66,10 @@ public class Schema {
return reader.readValue(checkNotNull(json));
}
+ public static Schema deserialize(ByteBuffer buffer) {
+ return convertSchema(org.apache.arrow.flatbuf.Schema.getRootAsSchema(buffer));
+ }
+
public static Schema convertSchema(org.apache.arrow.flatbuf.Schema schema) {
ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
for (int i = 0; i < schema.fieldsLength(); i++) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/6811d3fc/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
index 5fa18b3..bf635fb 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java
@@ -18,12 +18,16 @@
package org.apache.arrow.vector.file;
import static org.apache.arrow.vector.TestVectorUnloadLoad.newVectorUnloader;
+import static org.junit.Assert.assertTrue;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
@@ -35,6 +39,8 @@ import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.NullableMapVector;
import org.apache.arrow.vector.schema.ArrowBuffer;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.ArrowStreamReader;
+import org.apache.arrow.vector.stream.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Assert;
import org.junit.Test;
@@ -52,7 +58,7 @@ public class TestArrowFile extends BaseFileTest {
BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
MapVector parent = new MapVector("parent", vectorAllocator, null)) {
writeData(count, parent);
- write(parent.getChild("root"), file);
+ write(parent.getChild("root"), file, new ByteArrayOutputStream());
}
}
@@ -66,13 +72,14 @@ public class TestArrowFile extends BaseFileTest {
writeComplexData(count, parent);
FieldVector root = parent.getChild("root");
validateComplexContent(count, new VectorSchemaRoot(root));
- write(root, file);
+ write(root, file, new ByteArrayOutputStream());
}
}
@Test
public void testWriteRead() throws IOException {
File file = new File("target/mytest.arrow");
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
int count = COUNT;
// write
@@ -80,7 +87,7 @@ public class TestArrowFile extends BaseFileTest {
BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
MapVector parent = new MapVector("parent", originalVectorAllocator, null)) {
writeData(count, parent);
- write(parent.getChild("root"), file);
+ write(parent.getChild("root"), file, stream);
}
// read
@@ -116,11 +123,40 @@ public class TestArrowFile extends BaseFileTest {
}
}
}
+
+ // Read from stream.
+ try (
+ BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+ ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+ ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator);
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", vectorAllocator, null)
+ ) {
+ arrowReader.init();
+ Schema schema = arrowReader.getSchema();
+ LOGGER.debug("reading schema: " + schema);
+
+ try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
+ VectorLoader vectorLoader = new VectorLoader(root);
+ while (true) {
+ try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) {
+ if (recordBatch == null) break;
+ List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
+ for (ArrowBuffer arrowBuffer : buffersLayout) {
+ Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+ }
+ vectorLoader.load(recordBatch);
+ }
+ }
+ validateContent(count, root);
+ }
+ }
}
@Test
public void testWriteReadComplex() throws IOException {
File file = new File("target/mytest_complex.arrow");
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
int count = COUNT;
// write
@@ -128,7 +164,7 @@ public class TestArrowFile extends BaseFileTest {
BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
MapVector parent = new MapVector("parent", originalVectorAllocator, null)) {
writeComplexData(count, parent);
- write(parent.getChild("root"), file);
+ write(parent.getChild("root"), file, stream);
}
// read
@@ -156,11 +192,36 @@ public class TestArrowFile extends BaseFileTest {
}
}
}
+
+ // Read from stream.
+ try (
+ BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+ ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+ ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator);
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", vectorAllocator, null)
+ ) {
+ arrowReader.init();
+ Schema schema = arrowReader.getSchema();
+ LOGGER.debug("reading schema: " + schema);
+
+ try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
+ VectorLoader vectorLoader = new VectorLoader(root);
+ while (true) {
+ try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) {
+ if (recordBatch == null) break;
+ vectorLoader.load(recordBatch);
+ }
+ }
+ validateComplexContent(count, root);
+ }
+ }
}
@Test
public void testWriteReadMultipleRBs() throws IOException {
File file = new File("target/mytest_multiple.arrow");
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
int[] counts = { 10, 5 };
// write
@@ -172,10 +233,12 @@ public class TestArrowFile extends BaseFileTest {
VectorUnloader vectorUnloader0 = newVectorUnloader(parent.getChild("root"));
Schema schema = vectorUnloader0.getSchema();
Assert.assertEquals(2, schema.getFields().size());
- try (ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);) {
+ try (ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
+ ArrowStreamWriter streamWriter = new ArrowStreamWriter(stream, schema, 2)) {
try (ArrowRecordBatch recordBatch = vectorUnloader0.getRecordBatch()) {
Assert.assertEquals("RB #0", counts[0], recordBatch.getLength());
arrowWriter.writeRecordBatch(recordBatch);
+ streamWriter.writeRecordBatch(recordBatch);
}
parent.allocateNew();
writeData(counts[1], parent); // if we write the same data we don't catch that the metadata is stored in the wrong order.
@@ -183,6 +246,7 @@ public class TestArrowFile extends BaseFileTest {
try (ArrowRecordBatch recordBatch = vectorUnloader1.getRecordBatch()) {
Assert.assertEquals("RB #1", counts[1], recordBatch.getLength());
arrowWriter.writeRecordBatch(recordBatch);
+ streamWriter.writeRecordBatch(recordBatch);
}
}
}
@@ -222,11 +286,42 @@ public class TestArrowFile extends BaseFileTest {
}
}
}
+
+ // read stream
+ try (
+ BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+ ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+ ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator);
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", vectorAllocator, null)
+ ) {
+ arrowReader.init();
+ Schema schema = arrowReader.getSchema();
+ LOGGER.debug("reading schema: " + schema);
+ int i = 0;
+ try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) {
+ VectorLoader vectorLoader = new VectorLoader(root);
+ for (int n = 0; n < 2; n++) {
+ try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) {
+ assertTrue(recordBatch != null);
+ Assert.assertEquals("RB #" + i, counts[i], recordBatch.getLength());
+ List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
+ for (ArrowBuffer arrowBuffer : buffersLayout) {
+ Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
+ }
+ vectorLoader.load(recordBatch);
+ validateContent(counts[i], root);
+ }
+ ++i;
+ }
+ }
+ }
}
@Test
public void testWriteReadUnion() throws IOException {
File file = new File("target/mytest_write_union.arrow");
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
int count = COUNT;
try (
BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
@@ -238,9 +333,9 @@ public class TestArrowFile extends BaseFileTest {
validateUnionData(count, new VectorSchemaRoot(parent.getChild("root")));
- write(parent.getChild("root"), file);
+ write(parent.getChild("root"), file, stream);
}
- // read
+ // read
try (
BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
FileInputStream fileInputStream = new FileInputStream(file);
@@ -263,9 +358,37 @@ public class TestArrowFile extends BaseFileTest {
}
}
}
+
+ // Read from stream.
+ try (
+ BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
+ ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
+ ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator);
+ BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
+ MapVector parent = new MapVector("parent", vectorAllocator, null)
+ ) {
+ arrowReader.init();
+ Schema schema = arrowReader.getSchema();
+ LOGGER.debug("reading schema: " + schema);
+
+ try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator)) {
+ VectorLoader vectorLoader = new VectorLoader(root);
+ while (true) {
+ try (ArrowRecordBatch recordBatch = arrowReader.nextRecordBatch()) {
+ if (recordBatch == null) break;
+ vectorLoader.load(recordBatch);
+ }
+ }
+ validateUnionData(count, root);
+ }
+ }
}
- private void write(FieldVector parent, File file) throws FileNotFoundException, IOException {
+ /**
+ * Writes the contents of parent to file. If outStream is non-null, also writes them
+ * to outStream in the streaming serialized format.
+ */
+ private void write(FieldVector parent, File file, OutputStream outStream) throws FileNotFoundException, IOException {
VectorUnloader vectorUnloader = newVectorUnloader(parent);
Schema schema = vectorUnloader.getSchema();
LOGGER.debug("writing schema: " + schema);
@@ -276,5 +399,15 @@ public class TestArrowFile extends BaseFileTest {
) {
arrowWriter.writeRecordBatch(recordBatch);
}
+
+ // Also try serializing to the stream writer.
+ if (outStream != null) {
+ try (
+ ArrowStreamWriter arrowWriter = new ArrowStreamWriter(outStream, schema, -1);
+ ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
+ ) {
+ arrowWriter.writeRecordBatch(recordBatch);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6811d3fc/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
new file mode 100644
index 0000000..7b4de80
--- /dev/null
+++ b/java/vector/src/test/java/org/apache/arrow/vector/stream/MessageSerializerTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.stream;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.file.ReadChannel;
+import org.apache.arrow.vector.file.WriteChannel;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+public class MessageSerializerTest {
+
+ /**
+ * Allocates a buffer of bytes.length from alloc and writes bytes into it.
+ */
+ public static ArrowBuf buf(BufferAllocator alloc, byte[] bytes) {
+ ArrowBuf buffer = alloc.buffer(bytes.length);
+ buffer.writeBytes(bytes);
+ return buffer;
+ }
+
+ /**
+ * Copies the readable bytes of buf into a new array without moving its reader
+ * index, so the same buffer can be inspected more than once.
+ */
+ public static byte[] array(ArrowBuf buf) {
+ byte[] bytes = new byte[buf.readableBytes()];
+ buf.getBytes(buf.readerIndex(), bytes);
+ return bytes;
+ }
+
+ @Test
+ public void testSchemaMessageSerialization() throws IOException {
+ Schema schema = testSchema();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ long size = MessageSerializer.serialize(
+ new WriteChannel(Channels.newChannel(out)), schema);
+ assertEquals(size, out.toByteArray().length);
+
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ Schema deserialized = MessageSerializer.deserializeSchema(
+ new ReadChannel(Channels.newChannel(in)));
+ assertEquals(schema, deserialized);
+ assertEquals(1, deserialized.getFields().size());
+ }
+
+ @Test
+ public void testSerializeRecordBatch() throws IOException {
+ byte[] validity = new byte[] { (byte)255, 0};
+ // second half is "undefined"
+ byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
+
+ BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+ ArrowBuf validityb = buf(alloc, validity);
+ ArrowBuf valuesb = buf(alloc, values);
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ // Close the source batch once it has been serialized.
+ try (ArrowRecordBatch batch = new ArrowRecordBatch(
+ 16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb))) {
+ MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch);
+ }
+
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ // A null result skips close(); verifyBatch then reports it as a failure.
+ try (ArrowRecordBatch deserialized = MessageSerializer.deserializeRecordBatch(
+ new ReadChannel(Channels.newChannel(in)), alloc)) {
+ verifyBatch(deserialized, validity, values);
+ }
+ }
+
+ /**
+ * Schema with a single nullable signed 8-bit int field named "testField".
+ */
+ public static Schema testSchema() {
+ return new Schema(asList(new Field(
+ "testField", true, new ArrowType.Int(8, true), Collections.<Field>emptyList())));
+ }
+
+ // Verifies batch contents matching test schema: one node of length 16 with 8 nulls,
+ // and exactly two buffers whose readable bytes equal validity and values.
+ public static void verifyBatch(ArrowRecordBatch batch, byte[] validity, byte[] values) {
+ assertTrue(batch != null);
+ List<ArrowFieldNode> nodes = batch.getNodes();
+ assertEquals(1, nodes.size());
+ ArrowFieldNode node = nodes.get(0);
+ assertEquals(16, node.getLength());
+ assertEquals(8, node.getNullCount());
+ List<ArrowBuf> buffers = batch.getBuffers();
+ assertEquals(2, buffers.size());
+ assertArrayEquals(validity, MessageSerializerTest.array(buffers.get(0)));
+ assertArrayEquals(values, MessageSerializerTest.array(buffers.get(1)));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6811d3fc/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
new file mode 100644
index 0000000..ba1cdae
--- /dev/null
+++ b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStream.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.stream;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.file.BaseFileTest;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+public class TestArrowStream extends BaseFileTest {
+ /**
+ * A stream with a schema but no batches round-trips the schema, and the reader
+ * keeps returning null for further batches.
+ */
+ @Test
+ public void testEmptyStream() throws IOException {
+ Schema schema = MessageSerializerTest.testSchema();
+
+ // Write the stream.
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try (ArrowStreamWriter writer = new ArrowStreamWriter(out, schema, -1)) {
+ }
+
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ try (ArrowStreamReader reader = new ArrowStreamReader(in, allocator)) {
+ reader.init();
+ assertEquals(schema, reader.getSchema());
+ // Empty should return null. Can be called repeatedly.
+ assertTrue(reader.nextRecordBatch() == null);
+ assertTrue(reader.nextRecordBatch() == null);
+ }
+ }
+
+ /**
+ * Writes numBatches identical batches and checks that each reads back intact, that
+ * the stream then ends, and that reader and writer agree on the byte count.
+ */
+ @Test
+ public void testReadWrite() throws IOException {
+ Schema schema = MessageSerializerTest.testSchema();
+ byte[] validity = new byte[] { (byte)255, 0};
+ // second half is "undefined"
+ byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
+
+ int numBatches = 5;
+ BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ long bytesWritten = 0;
+ try (ArrowStreamWriter writer = new ArrowStreamWriter(out, schema, numBatches)) {
+ ArrowBuf validityb = MessageSerializerTest.buf(alloc, validity);
+ ArrowBuf valuesb = MessageSerializerTest.buf(alloc, values);
+ for (int i = 0; i < numBatches; i++) {
+ // Close each batch once it has been written.
+ try (ArrowRecordBatch batch = new ArrowRecordBatch(
+ 16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb))) {
+ writer.writeRecordBatch(batch);
+ }
+ }
+ bytesWritten = writer.bytesWritten();
+ }
+
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ try (ArrowStreamReader reader = new ArrowStreamReader(in, alloc)) {
+ reader.init();
+ // The schema does not change between batches, so check it once up front.
+ Schema readSchema = reader.getSchema();
+ assertEquals(schema, readSchema);
+ assertTrue(
+ readSchema.getFields().get(0).getTypeLayout().getVectorTypes().toString(),
+ readSchema.getFields().get(0).getTypeLayout().getVectors().size() > 0);
+ for (int i = 0; i < numBatches; i++) {
+ // verifyBatch also asserts that the batch is non-null.
+ try (ArrowRecordBatch recordBatch = reader.nextRecordBatch()) {
+ MessageSerializerTest.verifyBatch(recordBatch, validity, values);
+ }
+ }
+ assertTrue(reader.nextRecordBatch() == null);
+ assertEquals(bytesWritten, reader.bytesRead());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6811d3fc/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
----------------------------------------------------------------------
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
new file mode 100644
index 0000000..e187fa5
--- /dev/null
+++ b/java/vector/src/test/java/org/apache/arrow/vector/stream/TestArrowStreamPipe.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.vector.stream;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.channels.Pipe;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.schema.ArrowFieldNode;
+import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+public class TestArrowStreamPipe {
+ Schema schema = MessageSerializerTest.testSchema();
+ // second half is "undefined"
+ byte[] values = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
+
+ /** Producer: writes numBatches batches, each tagged with its index, then closes the stream. */
+ private final class WriterThread extends Thread {
+ private final int numBatches;
+ private final ArrowStreamWriter writer;
+ // Failure seen by run(); the test thread checks it after join().
+ private volatile Throwable error;
+
+ public WriterThread(int numBatches, WritableByteChannel sinkChannel)
+ throws IOException {
+ this.numBatches = numBatches;
+ writer = new ArrowStreamWriter(sinkChannel, schema, -1);
+ }
+
+ @Override
+ public void run() {
+ BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+ // Close the writer on every path, not only on success: the reader only sees the
+ // end of the stream once the writer is closed, and would otherwise block forever.
+ try (ArrowStreamWriter w = writer) {
+ ArrowBuf valuesb = MessageSerializerTest.buf(alloc, values);
+ for (int i = 0; i < numBatches; i++) {
+ // Send a changing byte id first.
+ byte[] validity = new byte[] { (byte)i, 0};
+ ArrowBuf validityb = MessageSerializerTest.buf(alloc, validity);
+ try (ArrowRecordBatch batch = new ArrowRecordBatch(
+ 16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb))) {
+ w.writeRecordBatch(batch);
+ }
+ }
+ } catch (Throwable t) {
+ // An exception escaping run() would only kill this thread, not fail the test.
+ error = t;
+ }
+ }
+
+ public long bytesWritten() { return writer.bytesWritten(); }
+ }
+
+ /** Consumer: reads and verifies batches until the stream ends. */
+ private final class ReaderThread extends Thread {
+ private int batchesRead = 0;
+ private final ArrowStreamReader reader;
+ private final BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
+ // Failure seen by run(); the test thread checks it after join().
+ private volatile Throwable error;
+
+ public ReaderThread(ReadableByteChannel sourceChannel)
+ throws IOException {
+ reader = new ArrowStreamReader(sourceChannel, alloc);
+ }
+
+ @Override
+ public void run() {
+ // Close the reader on every path. NOTE(review): presumably this also closes the pipe
+ // source, so a writer blocked on a full pipe fails instead of hanging -- confirm.
+ try (ArrowStreamReader r = reader) {
+ r.init();
+ assertEquals(schema, r.getSchema());
+ assertTrue(
+ r.getSchema().getFields().get(0).getTypeLayout().getVectorTypes().toString(),
+ r.getSchema().getFields().get(0).getTypeLayout().getVectors().size() > 0);
+
+ // Read all the batches. Each batch contains an incrementing id and then some
+ // constant data. Verify both.
+ while (true) {
+ try (ArrowRecordBatch batch = r.nextRecordBatch()) {
+ if (batch == null) break;
+ byte[] validity = new byte[] { (byte)batchesRead, 0};
+ MessageSerializerTest.verifyBatch(batch, validity, values);
+ batchesRead++;
+ }
+ }
+ } catch (Throwable t) {
+ // Includes AssertionErrors from verifyBatch; surfaced by pipeTest after join().
+ error = t;
+ }
+ }
+
+ public int getBatchesRead() { return batchesRead; }
+ public long bytesRead() { return reader.bytesRead(); }
+ }
+
+ // Starts up a producer and consumer thread to read/write batches.
+ @Test
+ public void pipeTest() throws IOException, InterruptedException {
+ final int numBatches = 1000;
+ Pipe pipe = Pipe.open();
+ WriterThread writer = new WriterThread(numBatches, pipe.sink());
+ ReaderThread reader = new ReaderThread(pipe.source());
+
+ writer.start();
+ reader.start();
+ reader.join();
+ writer.join();
+
+ // JUnit only observes this thread, so rethrow worker failures here. The reader is
+ // checked first because a reader failure can cause a follow-on writer failure.
+ if (reader.error != null) {
+ throw new AssertionError("reader thread failed", reader.error);
+ }
+ if (writer.error != null) {
+ throw new AssertionError("writer thread failed", writer.error);
+ }
+ assertEquals(numBatches, reader.getBatchesRead());
+ assertEquals(writer.bytesWritten(), reader.bytesRead());
+ }
+}