Posted to commits@arrow.apache.org by cu...@apache.org on 2019/08/08 17:36:03 UTC
[arrow] branch master updated: ARROW-5579: [Java] Shade flatbuffers
This is an automated email from the ASF dual-hosted git repository.
cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new d549b7c ARROW-5579: [Java] Shade flatbuffers
d549b7c is described below
commit d549b7c6223a16b462c2ec1cbcc1698827b0c87f
Author: Micah Kornfield <em...@gmail.com>
AuthorDate: Thu Aug 8 10:35:38 2019 -0700
ARROW-5579: [Java] Shade flatbuffers
- Merge the format package into the vector package
- shade flatbuffers in vector
- create missing wrappers to avoid direct flatbuffer dependencies in flight
[] Possibly add unit tests to cover new functions/classes.
@pravindra @tianchen92 please take a look. I think this is the only reasonable way to shade flatbuffers. I don't know if there was any particular reason that format and vector were separate, but I think it would be very difficult to shade them if we don't take this approach.
Closes #4701 from emkornfield/shade2 and squashes the following commits:
4269ce8 <Micah Kornfield> add comment
d196772 <Micah Kornfield> rename getType
45fad2a <Micah Kornfield> address CR feedback
f5a3a70 <Micah Kornfield> centralize serialization code in the serializer
9b54c6e <Micah Kornfield> ArrowMessageHeader->ArrowMessageMetadata
8daeca4 <Micah Kornfield> remove extra whitespace
90eebb3 <Micah Kornfield> address feedback part 1
38fabe8 <Micah Kornfield> Add shading, cleanup dependencies and CI for shaded artifacts
11f9303 <Micah Kornfield> Refactor java files so all flatbuffers are wrapped in vector
Authored-by: Micah Kornfield <em...@gmail.com>
Signed-off-by: Bryan Cutler <cu...@gmail.com>
---
ci/docker_build_java.sh | 6 +-
ci/docker_java_test_all.sh | 1 +
java/README.md | 9 +++
java/adapter/jdbc/pom.xml | 1 +
java/adapter/orc/pom.xml | 9 +--
.../apache/arrow/adapter/orc/OrcStripeReader.java | 5 --
java/algorithm/pom.xml | 1 +
java/flight/pom.xml | 10 +--
.../java/org/apache/arrow/flight/ArrowMessage.java | 62 ++++++----------
.../org/apache/arrow/flight/perf/TestPerf.java | 7 +-
java/gandiva/pom.xml | 1 +
java/performance/pom.xml | 1 +
java/pom.xml | 17 ++++-
java/tools/pom.xml | 1 +
java/vector/pom.xml | 32 ++++++++
.../vector/ipc/message/ArrowDictionaryBatch.java | 5 ++
.../arrow/vector/ipc/message/ArrowMessage.java | 3 +
.../arrow/vector/ipc/message/ArrowRecordBatch.java | 4 +
.../vector/ipc/message/MessageMetadataResult.java | 22 +++++-
.../vector/ipc/message/MessageSerializer.java | 86 +++++++++++++++++-----
.../ipc/message/TestMessageMetadataResult.java} | 25 +++----
21 files changed, 205 insertions(+), 103 deletions(-)
diff --git a/ci/docker_build_java.sh b/ci/docker_build_java.sh
index e6516b7..f1e5276 100755
--- a/ci/docker_build_java.sh
+++ b/ci/docker_build_java.sh
@@ -37,8 +37,12 @@ if [ "$ARROW_JAVA_RUN_TESTS" != "1" ]; then
JAVA_ARGS=-DskipTests
fi
+if [ "$ARROW_JAVA_SHADE_FLATBUFS" == "1" ]; then
+  export SHADE_FLATBUFFERS="-Pshade-flatbuffers"
+fi
+
pushd $arrow_src/java
-mvn -B $JAVA_ARGS -Drat.skip=true install
+mvn -B $JAVA_ARGS -Drat.skip=true install $SHADE_FLATBUFFERS
if [ "$ARROW_JAVADOC" == "1" ]; then
export MAVEN_OPTS="$MAVEN_OPTS -Dorg.slf4j.simpleLogger.defaultLogLevel=warn"
diff --git a/ci/docker_java_test_all.sh b/ci/docker_java_test_all.sh
index 1466907..ca48894 100755
--- a/ci/docker_java_test_all.sh
+++ b/ci/docker_java_test_all.sh
@@ -28,6 +28,7 @@ export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export ARROW_JAVADOC=1
bash $SOURCE_DIR/docker_build_java.sh
+export ARROW_JAVA_SHADE_FLATBUFS=1
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export ARROW_JAVADOC=0
bash $SOURCE_DIR/docker_build_java.sh
diff --git a/java/README.md b/java/README.md
index 23575bf..34ff51a 100644
--- a/java/README.md
+++ b/java/README.md
@@ -45,6 +45,15 @@ mvn install -P arrow-jni -am -Darrow.cpp.build.dir=../../release
The gandiva library is still in Alpha stages, and subject to API changes without
deprecation warnings.
+## Flatbuffers dependency
+
+Arrow uses Google's Flatbuffers to transport metadata. The Java version of the library
+requires that the generated flatbuffer classes be used only with the same flatbuffers
+version that generated them. Arrow therefore also packages a version of the arrow-vector
+module that shades flatbuffers and bundles arrow-format into a single JAR. Using the
+classifier "shade-format-flatbuffers" in your pom.xml selects this JAR; you can then
+exclude the original flatbuffers dependency or resolve it to a version of your choosing.
+
## Java Code Style Guide
Arrow Java follows the Google style guide [here][3] with the following
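The README section above explains that the shaded JAR relocates flatbuffers so an application can pick its own flatbuffers version. As a minimal sketch (a hypothetical helper, not part of Arrow), an application could check at runtime which copies of the flatbuffers runtime are visible. The relocated name assumes the `arrow.vector.com.google.flatbuffers` shadedPattern configured in java/vector/pom.xml by this commit.

```java
/**
 * Hypothetical helper (not part of Arrow): reports which copies of the
 * flatbuffers runtime are visible on the current classpath.
 */
public class FlatbuffersProbe {

  // Class name from the original, unshaded flatbuffers-java artifact.
  static final String ORIGINAL = "com.google.flatbuffers.FlatBufferBuilder";
  // Relocated name, assuming the shadedPattern used by the shade plugin in this commit.
  static final String RELOCATED = "arrow.vector.com.google.flatbuffers.FlatBufferBuilder";

  /** Returns true if the named class can be loaded (without running its static initializers). */
  static boolean isPresent(String className) {
    try {
      Class.forName(className, false, FlatbuffersProbe.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("unshaded flatbuffers: " + isPresent(ORIGINAL));
    System.out.println("shaded flatbuffers:   " + isPresent(RELOCATED));
  }
}
```

With the "shade-format-flatbuffers" classifier, both classes may be present at once without conflict, because they live in different packages.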
diff --git a/java/adapter/jdbc/pom.xml b/java/adapter/jdbc/pom.xml
index 767f3d1..d2a8387 100644
--- a/java/adapter/jdbc/pom.xml
+++ b/java/adapter/jdbc/pom.xml
@@ -38,6 +38,7 @@
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${project.version}</version>
+ <classifier>${arrow.vector.classifier}</classifier>
</dependency>
<!-- https://mvnrepository.com/artifact/com.h2database/h2 -->
diff --git a/java/adapter/orc/pom.xml b/java/adapter/orc/pom.xml
index 0ee1cdd..10fb4e1 100644
--- a/java/adapter/orc/pom.xml
+++ b/java/adapter/orc/pom.xml
@@ -24,14 +24,9 @@
<artifactId>arrow-vector</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
+ <classifier>${arrow.vector.classifier}</classifier>
</dependency>
- <dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-format</artifactId>
- <version>${project.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>1.5.5</version>
diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java
index c69e74a..9798eca 100644
--- a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java
+++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.stream.Collectors;
-import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.ReadChannel;
@@ -107,10 +106,6 @@ public class OrcStripeReader extends ArrowReader {
throw new IOException("Unexpected end of input. Missing schema.");
}
- if (result.getMessage().headerType() != MessageHeader.Schema) {
- throw new IOException("Expected schema but header was " + result.getMessage().headerType());
- }
-
return MessageSerializer.deserializeSchema(result.getMessage());
}
}
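With the explicit check removed from OrcStripeReader, header validation now happens inside MessageSerializer.deserializeSchema(Message), which calls Preconditions.checkArgument. A mismatched header therefore surfaces as an IllegalArgumentException instead of the IOException the reader threw before. The sketch below imitates that guard with hypothetical stand-ins; the header constants are assumed to mirror the generated MessageHeader values (Schema = 1, RecordBatch = 3).

```java
/** Hypothetical stand-in for the header check now done in MessageSerializer.deserializeSchema. */
public class SchemaHeaderCheck {

  // Assumed to mirror org.apache.arrow.flatbuf.MessageHeader constants.
  static final byte SCHEMA = 1;
  static final byte RECORD_BATCH = 3;

  /** Guava-style checkArgument: throws IllegalArgumentException with a formatted message. */
  static void checkArgument(boolean condition, String template, Object... args) {
    if (!condition) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  /** Validates that a message header denotes a schema before it is decoded. */
  static void requireSchema(byte headerType) {
    checkArgument(headerType == SCHEMA, "Expected schema but result was: %s", headerType);
  }

  public static void main(String[] args) {
    requireSchema(SCHEMA); // passes silently
    try {
      requireSchema(RECORD_BATCH);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // Expected schema but result was: 3
    }
  }
}
```

Callers that previously caught only IOException around schema reading may want to account for the unchecked exception as well.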
diff --git a/java/algorithm/pom.xml b/java/algorithm/pom.xml
index 90ea24a..779c2ca 100644
--- a/java/algorithm/pom.xml
+++ b/java/algorithm/pom.xml
@@ -24,6 +24,7 @@
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${project.version}</version>
+ <classifier>${arrow.vector.classifier}</classifier>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
diff --git a/java/flight/pom.xml b/java/flight/pom.xml
index 6d8cb5c..4fd3261 100644
--- a/java/flight/pom.xml
+++ b/java/flight/pom.xml
@@ -26,13 +26,10 @@
<dependencies>
<dependency>
- <groupId>com.google.flatbuffers</groupId>
- <artifactId>flatbuffers-java</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${project.version}</version>
+ <classifier>${arrow.vector.classifier}</classifier>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
@@ -40,11 +37,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.arrow</groupId>
- <artifactId>arrow-format</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${dep.grpc.version}</version>
diff --git a/java/flight/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/src/main/java/org/apache/arrow/flight/ArrowMessage.java
index 6611b3e..f837ec8 100644
--- a/java/flight/src/main/java/org/apache/arrow/flight/ArrowMessage.java
+++ b/java/flight/src/main/java/org/apache/arrow/flight/ArrowMessage.java
@@ -26,9 +26,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.arrow.flatbuf.Message;
-import org.apache.arrow.flatbuf.MessageHeader;
-import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.flight.grpc.AddWritableBuffer;
import org.apache.arrow.flight.grpc.GetReadableBuffer;
import org.apache.arrow.flight.impl.Flight.FlightData;
@@ -38,13 +35,13 @@ import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageMetadataResult;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteStreams;
-import com.google.flatbuffers.FlatBufferBuilder;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
@@ -54,7 +51,6 @@ import io.grpc.Drainable;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.protobuf.ProtoUtils;
-
import io.netty.buffer.ArrowBuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
@@ -120,16 +116,15 @@ class ArrowMessage implements AutoCloseable {
);
private final FlightDescriptor descriptor;
- private final Message message;
+ private final MessageMetadataResult message;
private final ArrowBuf appMetadata;
private final List<ArrowBuf> bufs;
+
public ArrowMessage(FlightDescriptor descriptor, Schema schema) {
- FlatBufferBuilder builder = new FlatBufferBuilder();
- int schemaOffset = schema.getSchema(builder);
- ByteBuffer serializedMessage = MessageSerializer.serializeMessage(builder, MessageHeader.Schema, schemaOffset, 0);
- serializedMessage = serializedMessage.slice();
- message = Message.getRootAsMessage(serializedMessage);
+ ByteBuffer serializedMessage = MessageSerializer.serializeMetadata(schema);
+ this.message = MessageMetadataResult.create(serializedMessage.slice(),
+ serializedMessage.remaining());
bufs = ImmutableList.of();
this.descriptor = descriptor;
this.appMetadata = null;
@@ -141,25 +136,17 @@ class ArrowMessage implements AutoCloseable {
* @param appMetadata The app metadata. May be null. Takes ownership of the buffer otherwise.
*/
public ArrowMessage(ArrowRecordBatch batch, ArrowBuf appMetadata) {
- FlatBufferBuilder builder = new FlatBufferBuilder();
- int batchOffset = batch.writeTo(builder);
- ByteBuffer serializedMessage = MessageSerializer.serializeMessage(builder, MessageHeader.RecordBatch, batchOffset,
- batch.computeBodyLength());
- serializedMessage = serializedMessage.slice();
- this.message = Message.getRootAsMessage(serializedMessage);
+ ByteBuffer serializedMessage = MessageSerializer.serializeMetadata(batch);
+ this.message = MessageMetadataResult.create(serializedMessage.slice(), serializedMessage.remaining());
this.bufs = ImmutableList.copyOf(batch.getBuffers());
this.descriptor = null;
this.appMetadata = appMetadata;
}
public ArrowMessage(ArrowDictionaryBatch batch) {
- FlatBufferBuilder builder = new FlatBufferBuilder();
- int batchOffset = batch.writeTo(builder);
- ByteBuffer serializedMessage = MessageSerializer
- .serializeMessage(builder, MessageHeader.DictionaryBatch, batchOffset,
- batch.computeBodyLength());
+ ByteBuffer serializedMessage = MessageSerializer.serializeMetadata(batch);
serializedMessage = serializedMessage.slice();
- this.message = Message.getRootAsMessage(serializedMessage);
+ this.message = MessageMetadataResult.create(serializedMessage, serializedMessage.remaining());
// asInputStream will free the buffers implicitly, so increment the reference count
batch.getDictionary().getBuffers().forEach(buf -> buf.getReferenceManager().retain());
this.bufs = ImmutableList.copyOf(batch.getDictionary().getBuffers());
@@ -167,13 +154,18 @@ class ArrowMessage implements AutoCloseable {
this.appMetadata = null;
}
- private ArrowMessage(FlightDescriptor descriptor, Message message, ArrowBuf appMetadata, ArrowBuf buf) {
+ private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, ArrowBuf appMetadata,
+ ArrowBuf buf) {
this.message = message;
this.descriptor = descriptor;
this.appMetadata = appMetadata;
this.bufs = buf == null ? ImmutableList.of() : ImmutableList.of(buf);
}
+ public MessageMetadataResult asSchemaMessage() {
+ return message;
+ }
+
public FlightDescriptor getDescriptor() {
return descriptor;
}
@@ -182,27 +174,20 @@ class ArrowMessage implements AutoCloseable {
return HeaderType.getHeader(message.headerType());
}
- public Message asSchemaMessage() {
- return message;
- }
-
public Schema asSchema() {
Preconditions.checkArgument(bufs.size() == 0);
Preconditions.checkArgument(getMessageType() == HeaderType.SCHEMA);
- org.apache.arrow.flatbuf.Schema schema = new org.apache.arrow.flatbuf.Schema();
- message.header(schema);
- return Schema.convertSchema(schema);
+ return MessageSerializer.deserializeSchema(message);
}
public ArrowRecordBatch asRecordBatch() throws IOException {
Preconditions.checkArgument(bufs.size() == 1, "A batch can only be consumed if it contains a single ArrowBuf.");
Preconditions.checkArgument(getMessageType() == HeaderType.RECORD_BATCH);
- RecordBatch recordBatch = new RecordBatch();
- message.header(recordBatch);
+
ArrowBuf underlying = bufs.get(0);
+
underlying.getReferenceManager().retain();
- ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(recordBatch, underlying);
- return batch;
+ return MessageSerializer.deserializeRecordBatch(message, underlying);
}
public ArrowDictionaryBatch asDictionaryBatch() throws IOException {
@@ -220,7 +205,7 @@ class ArrowMessage implements AutoCloseable {
try {
FlightDescriptor descriptor = null;
- Message header = null;
+ MessageMetadataResult header = null;
ArrowBuf body = null;
ArrowBuf appMetadata = null;
while (stream.available() > 0) {
@@ -238,7 +223,7 @@ class ArrowMessage implements AutoCloseable {
int size = readRawVarint32(stream);
byte[] bytes = new byte[size];
ByteStreams.readFully(stream, bytes);
- header = Message.getRootAsMessage(ByteBuffer.wrap(bytes));
+ header = MessageMetadataResult.create(ByteBuffer.wrap(bytes), size);
break;
}
case APP_METADATA_TAG: {
@@ -282,7 +267,8 @@ class ArrowMessage implements AutoCloseable {
private InputStream asInputStream(BufferAllocator allocator) {
try {
- final ByteString bytes = ByteString.copyFrom(message.getByteBuffer(), message.getByteBuffer().remaining());
+ final ByteString bytes = ByteString.copyFrom(message.getMessageBuffer(),
+ message.bytesAfterMessage());
if (getMessageType() == HeaderType.SCHEMA) {
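When Flight parses an incoming message, it reads each length-delimited field by first decoding its size with readRawVarint32 and then wrapping the bytes with MessageMetadataResult.create(ByteBuffer.wrap(bytes), size). For readers unfamiliar with the protobuf wire format, here is a minimal sketch of base-128 varint decoding for a non-negative 32-bit length. The class name is hypothetical, and Arrow's own helper may be implemented differently (for example by delegating to protobuf's CodedInputStream).

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical illustration of protobuf base-128 varint decoding (not Arrow's helper). */
public class Varint32 {

  /** Reads an unsigned varint of at most 5 bytes, enough for a 32-bit length prefix. */
  static int readVarint32(InputStream in) throws IOException {
    int result = 0;
    for (int shift = 0; shift < 35; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new EOFException("Truncated varint");
      }
      // The low 7 bits carry payload, least-significant group first.
      result |= (b & 0x7F) << shift;
      // A clear high bit marks the final byte.
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IOException("Varint32 longer than 5 bytes");
  }

  public static void main(String[] args) throws IOException {
    byte[] encoded = {(byte) 0xAC, 0x02}; // 300 on the wire
    System.out.println(readVarint32(new ByteArrayInputStream(encoded))); // 300
  }
}
```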
diff --git a/java/flight/src/test/java/org/apache/arrow/flight/perf/TestPerf.java b/java/flight/src/test/java/org/apache/arrow/flight/perf/TestPerf.java
index c23c793..eed428c 100644
--- a/java/flight/src/test/java/org/apache/arrow/flight/perf/TestPerf.java
+++ b/java/flight/src/test/java/org/apache/arrow/flight/perf/TestPerf.java
@@ -34,6 +34,7 @@ import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -46,7 +47,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.flatbuffers.FlatBufferBuilder;
import com.google.protobuf.ByteString;
@org.junit.Ignore
@@ -62,13 +62,12 @@ public class TestPerf {
Field.nullable("d", MinorType.BIGINT.getType())
));
- FlatBufferBuilder builder = new FlatBufferBuilder();
- pojoSchema.getSchema(builder);
+ ByteString serializedSchema = ByteString.copyFrom(MessageSerializer.serializeMetadata(pojoSchema));
return FlightDescriptor.command(Perf.newBuilder()
.setRecordsPerStream(recordCount)
.setRecordsPerBatch(recordsPerBatch)
- .setSchema(ByteString.copyFrom(pojoSchema.toByteArray()))
+ .setSchema(serializedSchema)
.setStreamCount(streamCount)
.build()
.toByteArray());
diff --git a/java/gandiva/pom.xml b/java/gandiva/pom.xml
index 02734d6..74feb68 100644
--- a/java/gandiva/pom.xml
+++ b/java/gandiva/pom.xml
@@ -39,6 +39,7 @@
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${project.version}</version>
+ <classifier>${arrow.vector.classifier}</classifier>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
diff --git a/java/performance/pom.xml b/java/performance/pom.xml
index 0b42d07..dcc20a6 100644
--- a/java/performance/pom.xml
+++ b/java/performance/pom.xml
@@ -48,6 +48,7 @@
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${project.version}</version>
+ <classifier>${arrow.vector.classifier}</classifier>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
diff --git a/java/pom.xml b/java/pom.xml
index 29d602d..617809b 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -38,6 +38,7 @@
<dep.hadoop.version>2.7.1</dep.hadoop.version>
<dep.fbs.version>1.9.0</dep.fbs.version>
<dep.flatc.version>1.9.0</dep.flatc.version>
+ <arrow.vector.classifier></arrow.vector.classifier>
<forkCount>2</forkCount>
<checkstyle.failOnViolation>true</checkstyle.failOnViolation>
</properties>
@@ -486,10 +487,10 @@
<dependencyManagement>
<dependencies>
- <dependency>
- <groupId>com.google.flatbuffers</groupId>
- <artifactId>flatbuffers-java</artifactId>
- <version>${dep.fbs.version}</version>
+ <dependency>
+ <groupId>com.google.flatbuffers</groupId>
+ <artifactId>flatbuffers-java</artifactId>
+ <version>${dep.fbs.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
@@ -679,6 +680,14 @@
<module>gandiva</module>
</modules>
</profile>
+
+ <profile>
+ <!-- Use the version of arrow-vector that shades flatbuffers and packages format -->
+ <id>shade-flatbuffers</id>
+ <properties>
+ <arrow.vector.classifier>shade-format-flatbuffers</arrow.vector.classifier>
+ </properties>
+ </profile>
</profiles>
</project>
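The root POM defaults `arrow.vector.classifier` to an empty value, so modules depend on the main arrow-vector JAR; activating the `shade-flatbuffers` profile (the CI script passes `-Pshade-flatbuffers`) switches every module to the `shade-format-flatbuffers` artifact. The sketch below shows how a classifier changes the file Maven resolves, following the standard `artifactId-version[-classifier].jar` naming convention. The helper and the version string are hypothetical, and treating an empty classifier like an absent one is the behavior this POM setup relies on.

```java
/** Hypothetical helper: builds a Maven repository file name from artifact coordinates. */
public class MavenArtifactName {

  /** An empty or null classifier selects the main artifact, as with the default property value. */
  static String jarName(String artifactId, String version, String classifier) {
    StringBuilder name = new StringBuilder(artifactId).append('-').append(version);
    if (classifier != null && !classifier.isEmpty()) {
      name.append('-').append(classifier);
    }
    return name.append(".jar").toString();
  }

  public static void main(String[] args) {
    String version = "0.0.0-SNAPSHOT"; // hypothetical placeholder version
    // Default build: <arrow.vector.classifier></arrow.vector.classifier>
    System.out.println(jarName("arrow-vector", version, ""));
    // arrow-vector-0.0.0-SNAPSHOT.jar
    // Build with -Pshade-flatbuffers
    System.out.println(jarName("arrow-vector", version, "shade-format-flatbuffers"));
    // arrow-vector-0.0.0-SNAPSHOT-shade-format-flatbuffers.jar
  }
}
```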
diff --git a/java/tools/pom.xml b/java/tools/pom.xml
index bcdeb11..caa02c8 100644
--- a/java/tools/pom.xml
+++ b/java/tools/pom.xml
@@ -29,6 +29,7 @@
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${project.version}</version>
+ <classifier>${arrow.vector.classifier}</classifier>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
diff --git a/java/vector/pom.xml b/java/vector/pom.xml
index 978411e..74ecf38 100644
--- a/java/vector/pom.xml
+++ b/java/vector/pom.xml
@@ -55,6 +55,7 @@
<dependency>
<groupId>com.google.flatbuffers</groupId>
<artifactId>flatbuffers-java</artifactId>
+ <version>${dep.fbs.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -128,6 +129,37 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.1.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>org.apache.arrow:arrow-format</include>
+ <include>com.google.flatbuffers:*</include>
+ </includes>
+ </artifactSet>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>shade-format-flatbuffers</shadedClassifierName>
+ <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
+ <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+ <relocations>
+ <relocation>
+ <pattern>com.google.flatbuffers</pattern>
+ <shadedPattern>arrow.vector.com.google.flatbuffers</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
<pluginManagement>
<plugins>
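The `<relocation>` entry rewrites references to classes under `com.google.flatbuffers` inside the attached `shade-format-flatbuffers` JAR so that they point at `arrow.vector.com.google.flatbuffers`. The following sketch applies the same prefix rule to fully qualified class names. It is a simplified model of what maven-shade-plugin does to bytecode references, not the plugin's actual implementation.

```java
/** Simplified model of a maven-shade-plugin package relocation (pattern -> shadedPattern). */
public class Relocation {

  private final String pattern;
  private final String shadedPattern;

  Relocation(String pattern, String shadedPattern) {
    this.pattern = pattern;
    this.shadedPattern = shadedPattern;
  }

  /** Rewrites a class name if it lives in the relocated package or one of its subpackages. */
  String relocate(String className) {
    if (className.startsWith(pattern + ".")) {
      return shadedPattern + className.substring(pattern.length());
    }
    return className; // classes outside the pattern are left untouched
  }

  public static void main(String[] args) {
    Relocation r = new Relocation("com.google.flatbuffers", "arrow.vector.com.google.flatbuffers");
    System.out.println(r.relocate("com.google.flatbuffers.FlatBufferBuilder"));
    // arrow.vector.com.google.flatbuffers.FlatBufferBuilder
    System.out.println(r.relocate("org.apache.arrow.flatbuf.Message"));
    // org.apache.arrow.flatbuf.Message
  }
}
```

Note that the configuration relocates only flatbuffers: the generated arrow-format classes (`org.apache.arrow.flatbuf.*`) keep their package name and are simply bundled into the same JAR by the `<artifactSet>` includes.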
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java
index a466db8..93efb85 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowDictionaryBatch.java
@@ -18,6 +18,7 @@
package org.apache.arrow.vector.ipc.message;
import org.apache.arrow.flatbuf.DictionaryBatch;
+import org.apache.arrow.flatbuf.MessageHeader;
import com.google.flatbuffers.FlatBufferBuilder;
@@ -35,6 +36,10 @@ public class ArrowDictionaryBatch implements ArrowMessage {
this.dictionary = dictionary;
}
+ public byte getMessageType() {
+ return MessageHeader.DictionaryBatch;
+ }
+
public long getDictionaryId() {
return dictionaryId;
}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
index 441f6ec..c928d03 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
@@ -26,6 +26,9 @@ public interface ArrowMessage extends FBSerializable, AutoCloseable {
<T> T accepts(ArrowMessageVisitor<T> visitor);
+ /** Returns the flatbuffer enum value indicating the type of the message. */
+ byte getMessageType();
+
/**
* Visitor interface for implementations of {@link ArrowMessage}.
*
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
index 185b44e..7631a87 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
@@ -106,6 +106,10 @@ public class ArrowRecordBatch implements ArrowMessage {
this.buffersLayout = Collections.unmodifiableList(arrowBuffers);
}
+ public byte getMessageType() {
+ return org.apache.arrow.flatbuf.MessageHeader.RecordBatch;
+ }
+
public int getLength() {
return length;
}
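Adding `getMessageType()` to the `ArrowMessage` interface is what lets the new `MessageSerializer.serializeMetadata(ArrowMessage)` replace the per-type `serializeMessage(builder, MessageHeader.X, ...)` calls: each message reports its own header tag and body length. The sketch below models that dispatch with hypothetical, flatbuffer-free types. The tag values are assumed to mirror `MessageHeader.DictionaryBatch` (2) and `MessageHeader.RecordBatch` (3), the body lengths are made up, and a plain string stands in for the serialized flatbuffer.

```java
import java.util.Arrays;
import java.util.List;

public class HeaderDispatch {

  /** Hypothetical analogue of ArrowMessage: each message knows its own header tag. */
  interface Message {
    byte getMessageType();

    int computeBodyLength();
  }

  static final byte DICTIONARY_BATCH = 2; // assumed MessageHeader.DictionaryBatch
  static final byte RECORD_BATCH = 3;     // assumed MessageHeader.RecordBatch

  static final class RecordBatch implements Message {
    public byte getMessageType() { return RECORD_BATCH; }

    public int computeBodyLength() { return 64; } // made-up body size
  }

  static final class DictionaryBatch implements Message {
    public byte getMessageType() { return DICTIONARY_BATCH; }

    public int computeBodyLength() { return 16; } // made-up body size
  }

  /** One generic entry point instead of a serializer per message type. */
  static String serializeMetadata(Message m) {
    return "header=" + m.getMessageType() + ", bodyLength=" + m.computeBodyLength();
  }

  public static void main(String[] args) {
    List<Message> messages = Arrays.asList(new RecordBatch(), new DictionaryBatch());
    for (Message m : messages) {
      System.out.println(serializeMetadata(m));
    }
    // header=3, bodyLength=64
    // header=2, bodyLength=16
  }
}
```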
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageMetadataResult.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageMetadataResult.java
index 212f8b6..e472882 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageMetadataResult.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageMetadataResult.java
@@ -42,6 +42,15 @@ public class MessageMetadataResult {
this.message = message;
}
+ /**
+ * Creates a new {@link MessageMetadataResult} by parsing it from the beginning of the buffer.
+ *
+ * @param messageLength The length of the serialized flatbuffer message in bytes (might not be equal to the buffer
+ * size).
+ */
+ public static MessageMetadataResult create(ByteBuffer buffer, int messageLength) {
+ return new MessageMetadataResult(messageLength, buffer, Message.getRootAsMessage(buffer));
+ }
/**
* Get the length of the message metadata in bytes, not including the body length.
@@ -55,13 +64,24 @@ public class MessageMetadataResult {
/**
* Get the buffer containing the raw message metadata bytes, not including the message body data.
*
- * @return buffer containing the message metadata
+ * @return buffer containing the message metadata.
*/
public ByteBuffer getMessageBuffer() {
return messageBuffer;
}
/**
+ * Returns the bytes remaining in the buffer after parsing the message from it.
+ */
+ public int bytesAfterMessage() {
+ return message.getByteBuffer().remaining();
+ }
+
+ public byte headerType() {
+ return message.headerType();
+ }
+
+ /**
* Check if the message is followed by a body. This will be true if the message has a body
* length > 0, which indicates that a message body needs to be read from the input source.
*
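The Flight constructors pass `serializedMessage.slice()` together with `serializedMessage.remaining()` to `MessageMetadataResult.create`. This relies on plain `java.nio.ByteBuffer` semantics: `slice()` returns a view that starts at the current position, has its own position of 0, and does not share position or limit with the original, while `remaining()` gives the number of metadata bytes. A small standalone demonstration follows; the 4-byte prefix is a hypothetical stand-in for unrelated data in front of the metadata.

```java
import java.nio.ByteBuffer;

public class SliceDemo {

  /** Returns a view of the bytes from the buffer's current position to its limit. */
  static ByteBuffer metadataView(ByteBuffer serialized) {
    return serialized.slice();
  }

  public static void main(String[] args) {
    // 10-byte backing buffer whose first 4 bytes precede the (hypothetical) metadata.
    ByteBuffer backing = ByteBuffer.allocate(10);
    backing.position(4);

    ByteBuffer view = metadataView(backing);
    int length = backing.remaining(); // the value passed as messageLength
    System.out.println(view.position() + " " + view.capacity() + " " + length); // 0 6 6

    // Reading through the view does not move the original buffer.
    view.getShort();
    System.out.println(backing.position() + " " + view.remaining()); // 4 4
  }
}
```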
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
index 034a34c..4016802 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
@@ -30,6 +30,7 @@ import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.flatbuf.MetadataVersion;
import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -63,9 +64,9 @@ public class MessageSerializer {
*/
public static int bytesToInt(byte[] bytes) {
return ((bytes[3] & 255) << 24) +
- ((bytes[2] & 255) << 16) +
- ((bytes[1] & 255) << 8) +
- ((bytes[0] & 255) << 0);
+ ((bytes[2] & 255) << 16) +
+ ((bytes[1] & 255) << 8) +
+ ((bytes[0] & 255));
}
/**
@@ -78,7 +79,7 @@ public class MessageSerializer {
bytes[3] = (byte) (value >>> 24);
bytes[2] = (byte) (value >>> 16);
bytes[1] = (byte) (value >>> 8);
- bytes[0] = (byte) (value >>> 0);
+ bytes[0] = (byte) (value);
}
/**
@@ -120,9 +121,7 @@ public class MessageSerializer {
long start = out.getCurrentPosition();
assert start % 8 == 0;
- FlatBufferBuilder builder = new FlatBufferBuilder();
- int schemaOffset = schema.getSchema(builder);
- ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.Schema, schemaOffset, 0);
+ ByteBuffer serializedMessage = serializeMetadata(schema);
int messageLength = serializedMessage.remaining();
@@ -132,14 +131,25 @@ public class MessageSerializer {
}
/**
+ * Returns the serialized flatbuffer bytes of the schema wrapped in a message table.
+ */
+ public static ByteBuffer serializeMetadata(Schema schema) {
+ FlatBufferBuilder builder = new FlatBufferBuilder();
+ int schemaOffset = schema.getSchema(builder);
+ return MessageSerializer.serializeMessage(builder, org.apache.arrow.flatbuf.MessageHeader.Schema, schemaOffset, 0);
+ }
+
+ /**
* Deserializes an Arrow Schema object from a schema message. Format is from serialize().
*
* @param schemaMessage a Message of type MessageHeader.Schema
* @return the deserialized Arrow Schema
*/
public static Schema deserializeSchema(Message schemaMessage) {
+ Preconditions.checkArgument(schemaMessage.headerType() == MessageHeader.Schema,
+ "Expected schema but result was: %s", schemaMessage.headerType());
return Schema.convertSchema((org.apache.arrow.flatbuf.Schema)
- schemaMessage.header(new org.apache.arrow.flatbuf.Schema()));
+ schemaMessage.header(new org.apache.arrow.flatbuf.Schema()));
}
/**
@@ -157,8 +167,17 @@ public class MessageSerializer {
if (result.getMessage().headerType() != MessageHeader.Schema) {
throw new IOException("Expected schema but header was " + result.getMessage().headerType());
}
+ return deserializeSchema(result);
+ }
- return deserializeSchema(result.getMessage());
+ /**
+ * Deserializes an Arrow Schema object from a {@link MessageMetadataResult}. Format is from serialize().
+ *
+   * @param message a MessageMetadataResult whose header is of type MessageHeader.Schema
+ * @return the deserialized Arrow Schema
+ */
+ public static Schema deserializeSchema(MessageMetadataResult message) {
+ return deserializeSchema(message.getMessage());
}
/**
@@ -175,10 +194,7 @@ public class MessageSerializer {
int bodyLength = batch.computeBodyLength();
assert bodyLength % 8 == 0;
- FlatBufferBuilder builder = new FlatBufferBuilder();
- int batchOffset = batch.writeTo(builder);
-
- ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.RecordBatch, batchOffset, bodyLength);
+ ByteBuffer serializedMessage = serializeMetadata(batch);
int metadataLength = serializedMessage.remaining();
@@ -224,7 +240,7 @@ public class MessageSerializer {
out.write(buffer);
if (out.getCurrentPosition() != startPosition + layout.getSize()) {
throw new IllegalStateException("wrong buffer size: " + out.getCurrentPosition() +
- " != " + startPosition + layout.getSize());
+ " != " + startPosition + layout.getSize());
}
}
out.align();
@@ -232,6 +248,16 @@ public class MessageSerializer {
}
/**
+   * Returns the serialized form of an {@link ArrowMessage} (record batch or dictionary batch) wrapped in a
+   * {@link org.apache.arrow.flatbuf.Message}.
+ */
+ public static ByteBuffer serializeMetadata(ArrowMessage message) {
+ FlatBufferBuilder builder = new FlatBufferBuilder();
+ int batchOffset = message.writeTo(builder);
+ return serializeMessage(builder, message.getMessageType(), batchOffset,
+ message.computeBodyLength());
+ }
+
+ /**
* Deserializes an ArrowRecordBatch from a record batch message and data in an ArrowBuf.
*
* @param recordBatchMessage a Message of type MessageHeader.RecordBatch
@@ -321,7 +347,7 @@ public class MessageSerializer {
if ((int) node.length() != node.length() ||
(int) node.nullCount() != node.nullCount()) {
throw new IOException("Cannot currently deserialize record batches with " +
- "node length larger than Int.MAX_VALUE");
+ "node length larger than Int.MAX_VALUE");
}
nodes.add(new ArrowFieldNode((int) node.length(), (int) node.nullCount()));
}
@@ -341,6 +367,15 @@ public class MessageSerializer {
}
/**
+ * Reads a record batch based on the metadata in serializedMessage and the underlying data buffer.
+ */
+  public static ArrowRecordBatch deserializeRecordBatch(MessageMetadataResult serializedMessage,
+                                                        ArrowBuf underlying) throws IOException {
+ return deserializeRecordBatch(serializedMessage.getMessage(), underlying);
+ }
+
+ /**
* Serializes a dictionary ArrowRecordBatch. Returns the offset and length of the written batch.
*
* @param out where to serialize
@@ -353,10 +388,7 @@ public class MessageSerializer {
int bodyLength = batch.computeBodyLength();
assert bodyLength % 8 == 0;
- FlatBufferBuilder builder = new FlatBufferBuilder();
- int batchOffset = batch.writeTo(builder);
-
- ByteBuffer serializedMessage = serializeMessage(builder, MessageHeader.DictionaryBatch, batchOffset, bodyLength);
+ ByteBuffer serializedMessage = serializeMetadata(batch);
int metadataLength = serializedMessage.remaining();
@@ -397,6 +429,20 @@ public class MessageSerializer {
}
/**
+   * Deserializes an ArrowDictionaryBatch from a dictionary batch message and data in an ArrowBuf.
+   *
+   * @param message a MessageMetadataResult whose header is of type MessageHeader.DictionaryBatch
+   * @param bodyBuffer Arrow buffer containing the DictionaryBatch data
+ * @return the deserialized ArrowDictionaryBatch
+ * @throws IOException if something went wrong
+ */
+ public static ArrowDictionaryBatch deserializeDictionaryBatch(MessageMetadataResult message, ArrowBuf bodyBuffer)
+ throws IOException {
+ return deserializeDictionaryBatch(message.getMessage(), bodyBuffer);
+ }
+
+ /**
* Deserializes an ArrowDictionaryBatch read from the input channel. This uses the given allocator
* to create an ArrowBuf for the batch body data.
*
@@ -547,7 +593,7 @@ public class MessageSerializer {
ByteBuffer messageBuffer = ByteBuffer.allocate(messageLength);
if (in.readFully(messageBuffer) != messageLength) {
throw new IOException(
- "Unexpected end of stream trying to read message.");
+ "Unexpected end of stream trying to read message.");
}
messageBuffer.rewind();
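`bytesToInt` and `intToBytes` (whose no-op `<< 0` and `>>> 0` shifts this commit removes) encode a 32-bit value in little-endian order, least-significant byte first. As a standalone sketch, the same logic is re-implemented below for illustration (not imported from Arrow) and cross-checked against `java.nio.ByteBuffer` configured with `ByteOrder.LITTLE_ENDIAN`.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class LittleEndianInts {

  /** Same arithmetic as MessageSerializer.bytesToInt: bytes[0] is the least-significant byte. */
  static int bytesToInt(byte[] bytes) {
    return ((bytes[3] & 255) << 24) +
        ((bytes[2] & 255) << 16) +
        ((bytes[1] & 255) << 8) +
        (bytes[0] & 255);
  }

  /** Same arithmetic as MessageSerializer.intToBytes: writes value into bytes[0..3], low byte first. */
  static void intToBytes(int value, byte[] bytes) {
    bytes[3] = (byte) (value >>> 24);
    bytes[2] = (byte) (value >>> 16);
    bytes[1] = (byte) (value >>> 8);
    bytes[0] = (byte) value;
  }

  public static void main(String[] args) {
    byte[] buf = new byte[4];
    intToBytes(0x01020304, buf);
    System.out.println(buf[0] + " " + buf[3]); // 4 1

    // NIO decodes the same bytes identically when set to little-endian order.
    int viaNio = ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN).getInt();
    System.out.println(viaNio == bytesToInt(buf)); // true
  }
}
```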
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/message/TestMessageMetadataResult.java
similarity index 63%
copy from java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
copy to java/vector/src/test/java/org/apache/arrow/vector/ipc/message/TestMessageMetadataResult.java
index 441f6ec..ee53615 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowMessage.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/message/TestMessageMetadataResult.java
@@ -17,23 +17,20 @@
package org.apache.arrow.vector.ipc.message;
-/**
- * Interface for Arrow IPC messages (https://arrow.apache.org/docs/format/IPC.html).
- */
-public interface ArrowMessage extends FBSerializable, AutoCloseable {
+import static org.junit.Assert.assertEquals;
- int computeBodyLength();
+import java.nio.ByteBuffer;
- <T> T accepts(ArrowMessageVisitor<T> visitor);
+import org.junit.Test;
- /**
- * Visitor interface for implementations of {@link ArrowMessage}.
- *
- * @param <T> The type of value to return after visiting.
- */
- interface ArrowMessageVisitor<T> {
- T visit(ArrowDictionaryBatch message);
+public class TestMessageMetadataResult {
- T visit(ArrowRecordBatch message);
+ @Test
+ public void getMessageLength_returnsConstructValue() {
+    // This API is used by Spark.
+    MessageMetadataResult result = new MessageMetadataResult(1, ByteBuffer.allocate(0),
+        new org.apache.arrow.flatbuf.Message());
+    assertEquals(1, result.getMessageLength());
}
+
}