You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink that did not survive conversion to plain text.)
Posted to commits@arrow.apache.org by em...@apache.org on 2021/02/07 23:29:36 UTC
[arrow] branch master updated: ARROW-11081: [Java] Make IPC option immutable
This is an automated email from the ASF dual-hosted git repository.
emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 6609270 ARROW-11081: [Java] Make IPC option immutable
6609270 is described below
commit 66092708abadf616ff8d8edf099b07e1228cb96e
Author: liyafan82 <fa...@foxmail.com>
AuthorDate: Sun Feb 7 15:28:43 2021 -0800
ARROW-11081: [Java] Make IPC option immutable
Making it immutable brings the following benefits:
1. It makes the code easier to reason about.
2. It allows the JIT compiler to perform more optimizations.
3. Immutable objects can be shared, so many object allocations can be avoided.
Closes #9053 from liyafan82/fly_1231_opt
Authored-by: liyafan82 <fa...@foxmail.com>
Signed-off-by: Micah Kornfield <em...@gmail.com>
---
.../main/java/org/apache/arrow/flight/ArrowMessage.java | 14 +++++++-------
.../src/main/java/org/apache/arrow/flight/FlightInfo.java | 4 ++--
.../org/apache/arrow/flight/OutboundStreamListener.java | 4 ++--
.../main/java/org/apache/arrow/flight/SchemaResult.java | 2 +-
.../java/org/apache/arrow/flight/TestBasicOperation.java | 4 ++--
.../java/org/apache/arrow/flight/TestMetadataVersion.java | 7 ++++---
.../org/apache/arrow/vector/ipc/ArrowStreamWriter.java | 2 +-
.../java/org/apache/arrow/vector/ipc/ArrowWriter.java | 2 +-
.../org/apache/arrow/vector/ipc/message/IpcOption.java | 15 +++++++++++++--
.../arrow/vector/ipc/message/MessageSerializer.java | 14 +++++++-------
.../apache/arrow/vector/ipc/MessageSerializerTest.java | 8 ++++----
.../apache/arrow/vector/ipc/TestArrowReaderWriter.java | 6 +++---
.../java/org/apache/arrow/vector/ipc/TestRoundTrip.java | 9 +++------
13 files changed, 50 insertions(+), 41 deletions(-)
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
index 06d3bd3..9681fa8 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
@@ -155,7 +155,7 @@ class ArrowMessage implements AutoCloseable {
}
public ArrowMessage(ArrowDictionaryBatch batch, IpcOption option) {
- this.writeOption = new IpcOption();
+ this.writeOption = option;
ByteBuffer serializedMessage = MessageSerializer.serializeMetadata(batch, writeOption);
serializedMessage = serializedMessage.slice();
this.message = MessageMetadataResult.create(serializedMessage, serializedMessage.remaining());
@@ -173,7 +173,7 @@ class ArrowMessage implements AutoCloseable {
*/
public ArrowMessage(ArrowBuf appMetadata) {
// No need to take IpcOption as it's not used to serialize this kind of message.
- this.writeOption = new IpcOption();
+ this.writeOption = IpcOption.DEFAULT;
this.message = null;
this.bufs = ImmutableList.of();
this.descriptor = null;
@@ -183,7 +183,7 @@ class ArrowMessage implements AutoCloseable {
public ArrowMessage(FlightDescriptor descriptor) {
// No need to take IpcOption as it's not used to serialize this kind of message.
- this.writeOption = new IpcOption();
+ this.writeOption = IpcOption.DEFAULT;
this.message = null;
this.bufs = ImmutableList.of();
this.descriptor = descriptor;
@@ -194,10 +194,10 @@ class ArrowMessage implements AutoCloseable {
private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, ArrowBuf appMetadata,
ArrowBuf buf) {
// No need to take IpcOption as this is used for deserialized ArrowMessage coming from the wire.
- this.writeOption = new IpcOption();
- if (message != null) {
- this.writeOption.metadataVersion = MetadataVersion.fromFlatbufID(message.getMessage().version());
- }
+ this.writeOption = message != null ?
+ // avoid writing legacy ipc format by default
+ new IpcOption(false, MetadataVersion.fromFlatbufID(message.getMessage().version())) :
+ IpcOption.DEFAULT;
this.message = message;
this.descriptor = descriptor;
this.appMetadata = appMetadata;
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
index 8eb456b..e57b311 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
@@ -61,7 +61,7 @@ public class FlightInfo {
*/
public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes,
long records) {
- this(schema, descriptor, endpoints, bytes, records, new IpcOption());
+ this(schema, descriptor, endpoints, bytes, records, IpcOption.DEFAULT);
}
/**
@@ -108,7 +108,7 @@ public class FlightInfo {
}
bytes = pbFlightInfo.getTotalBytes();
records = pbFlightInfo.getTotalRecords();
- option = new IpcOption();
+ option = IpcOption.DEFAULT;
}
public Schema getSchema() {
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListener.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListener.java
index f578d67..8417874 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListener.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListener.java
@@ -55,7 +55,7 @@ public interface OutboundStreamListener {
* <p>This method must be called before all others, except {@link #putMetadata(ArrowBuf)}.
*/
default void start(VectorSchemaRoot root) {
- start(root, null, new IpcOption());
+ start(root, null, IpcOption.DEFAULT);
}
/**
@@ -64,7 +64,7 @@ public interface OutboundStreamListener {
* <p>This method must be called before all others, except {@link #putMetadata(ArrowBuf)}.
*/
default void start(VectorSchemaRoot root, DictionaryProvider dictionaries) {
- start(root, dictionaries, new IpcOption());
+ start(root, dictionaries, IpcOption.DEFAULT);
}
/**
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/SchemaResult.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/SchemaResult.java
index 035ea7a..8a5e7d9 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/SchemaResult.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/SchemaResult.java
@@ -45,7 +45,7 @@ public class SchemaResult {
private final IpcOption option;
public SchemaResult(Schema schema) {
- this(schema, new IpcOption());
+ this(schema, IpcOption.DEFAULT);
}
/**
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
index daf911d..0d8ff33 100644
--- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
@@ -354,7 +354,7 @@ public class TestBasicOperation {
final VectorUnloader unloader = new VectorUnloader(root);
root.setRowCount(0);
final MethodDescriptor.Marshaller<ArrowMessage> marshaller = ArrowMessage.createMarshaller(allocator);
- try (final ArrowMessage message = new ArrowMessage(unloader.getRecordBatch(), null, new IpcOption())) {
+ try (final ArrowMessage message = new ArrowMessage(unloader.getRecordBatch(), null, IpcOption.DEFAULT)) {
Assert.assertEquals(ArrowMessage.HeaderType.RECORD_BATCH, message.getMessageType());
// Should have at least one empty body buffer (there may be multiple for e.g. data and validity)
Iterator<ArrowBuf> iterator = message.getBufs().iterator();
@@ -388,7 +388,7 @@ public class TestBasicOperation {
try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) {
final MethodDescriptor.Marshaller<ArrowMessage> marshaller = ArrowMessage.createMarshaller(allocator);
Flight.FlightDescriptor descriptor = FlightDescriptor.command(new byte[0]).toProtocol();
- try (final ArrowMessage message = new ArrowMessage(descriptor, schema, new IpcOption())) {
+ try (final ArrowMessage message = new ArrowMessage(descriptor, schema, IpcOption.DEFAULT)) {
Assert.assertEquals(ArrowMessage.HeaderType.SCHEMA, message.getMessageType());
// Should have no body buffers
Assert.assertFalse(message.getBufs().iterator().hasNext());
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestMetadataVersion.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestMetadataVersion.java
index 148b687..83a694b 100644
--- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestMetadataVersion.java
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestMetadataVersion.java
@@ -56,9 +56,10 @@ public class TestMetadataVersion {
schema = new Schema(Collections.singletonList(Field.nullable("foo", new ArrowType.Int(32, true))));
unionSchema = new Schema(
Collections.singletonList(Field.nullable("union", new ArrowType.Union(UnionMode.Dense, new int[]{0}))));
- optionV4 = new IpcOption();
- optionV4.metadataVersion = MetadataVersion.V4;
- optionV5 = new IpcOption();
+
+ // avoid writing legacy ipc format by default
+ optionV4 = new IpcOption(false, MetadataVersion.V4);
+ optionV5 = IpcOption.DEFAULT;
}
@AfterClass
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
index ff0dfd3..deb9858 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
@@ -48,7 +48,7 @@ public class ArrowStreamWriter extends ArrowWriter {
* Construct an ArrowStreamWriter with an optional DictionaryProvider for the WritableByteChannel.
*/
public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
- this(root, provider, out, new IpcOption());
+ this(root, provider, out, IpcOption.DEFAULT);
}
/**
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
index 3f928d7..7bc9a30 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
@@ -65,7 +65,7 @@ public abstract class ArrowWriter implements AutoCloseable {
protected IpcOption option;
protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
- this (root, provider, out, new IpcOption());
+ this (root, provider, out, IpcOption.DEFAULT);
}
/**
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java
index b93c3b3..5120758 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java
@@ -26,8 +26,19 @@ public class IpcOption {
// Write the pre-0.15.0 encapsulated IPC message format
// consisting of a 4-byte prefix instead of 8 byte
- public boolean write_legacy_ipc_format = false;
+ public final boolean write_legacy_ipc_format;
// The metadata version. Defaults to V5.
- public MetadataVersion metadataVersion = MetadataVersion.DEFAULT;
+ public final MetadataVersion metadataVersion;
+
+ public IpcOption() {
+ this(false, MetadataVersion.DEFAULT);
+ }
+
+ public IpcOption(boolean writeLegacyIpcFormat, MetadataVersion metadataVersion) {
+ this.write_legacy_ipc_format = writeLegacyIpcFormat;
+ this.metadataVersion = metadataVersion;
+ }
+
+ public static final IpcOption DEFAULT = new IpcOption();
}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
index b2d1425..5d332eb 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
@@ -106,7 +106,7 @@ public class MessageSerializer {
public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer)
throws IOException {
- return writeMessageBuffer(out, messageLength, messageBuffer, new IpcOption());
+ return writeMessageBuffer(out, messageLength, messageBuffer, IpcOption.DEFAULT);
}
/**
@@ -147,7 +147,7 @@ public class MessageSerializer {
* Serialize a schema object.
*/
public static long serialize(WriteChannel out, Schema schema) throws IOException {
- return serialize(out, schema, new IpcOption());
+ return serialize(out, schema, IpcOption.DEFAULT);
}
/**
@@ -176,7 +176,7 @@ public class MessageSerializer {
*/
@Deprecated
public static ByteBuffer serializeMetadata(Schema schema) {
- return serializeMetadata(schema, new IpcOption());
+ return serializeMetadata(schema, IpcOption.DEFAULT);
}
/**
@@ -234,7 +234,7 @@ public class MessageSerializer {
* Serializes an ArrowRecordBatch. Returns the offset and length of the written batch.
*/
public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) throws IOException {
- return serialize(out, batch, new IpcOption());
+ return serialize(out, batch, IpcOption.DEFAULT);
}
/**
@@ -315,7 +315,7 @@ public class MessageSerializer {
*/
@Deprecated
public static ByteBuffer serializeMetadata(ArrowMessage message) {
- return serializeMetadata(message, new IpcOption());
+ return serializeMetadata(message, IpcOption.DEFAULT);
}
/**
@@ -450,7 +450,7 @@ public class MessageSerializer {
}
public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch) throws IOException {
- return serialize(out, batch, new IpcOption());
+ return serialize(out, batch, IpcOption.DEFAULT);
}
/**
@@ -638,7 +638,7 @@ public class MessageSerializer {
byte headerType,
int headerOffset,
long bodyLength) {
- return serializeMessage(builder, headerType, headerOffset, bodyLength, new IpcOption());
+ return serializeMessage(builder, headerType, headerOffset, bodyLength, IpcOption.DEFAULT);
}
/**
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
index d4e97c2..ae18fab 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
@@ -168,8 +168,8 @@ public class MessageSerializerTest {
ArrowRecordBatch batch = new ArrowRecordBatch(
16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb));
- IpcOption option = new IpcOption();
- option.metadataVersion = MetadataVersion.V4;
+ // avoid writing legacy ipc format by default
+ IpcOption option = new IpcOption(false, MetadataVersion.V4);
ByteArrayOutputStream out = new ByteArrayOutputStream();
MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch, option);
@@ -193,8 +193,8 @@ public class MessageSerializerTest {
ArrowRecordBatch batch = new ArrowRecordBatch(
16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb));
- IpcOption option = new IpcOption();
- option.metadataVersion = MetadataVersion.V5;
+ // avoid writing legacy ipc format by default
+ IpcOption option = new IpcOption(false, MetadataVersion.V5);
ByteArrayOutputStream out = new ByteArrayOutputStream();
MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch, option);
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
index 84e26a3..47dd844 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
@@ -77,6 +77,7 @@ import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.MetadataVersion;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -780,8 +781,7 @@ public class TestArrowReaderWriter {
WriteChannel out = new WriteChannel(newChannel(outStream));
// write legacy ipc format
- IpcOption option = new IpcOption();
- option.write_legacy_ipc_format = true;
+ IpcOption option = new IpcOption(true, MetadataVersion.DEFAULT);
MessageSerializer.serialize(out, schema, option);
MessageSerializer.serialize(out, batch);
@@ -794,7 +794,7 @@ public class TestArrowReaderWriter {
readBatch.close();
// write ipc format with continuation
- option.write_legacy_ipc_format = false;
+ option = IpcOption.DEFAULT;
MessageSerializer.serialize(out, schema, option);
MessageSerializer.serialize(out, batch);
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestRoundTrip.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestRoundTrip.java
index 971008e..5f57e90 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestRoundTrip.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestRoundTrip.java
@@ -91,15 +91,12 @@ public class TestRoundTrip extends BaseFileTest {
@Parameterized.Parameters(name = "options = {0}")
public static Collection<Object[]> getWriteOption() {
- final IpcOption legacy = new IpcOption();
- legacy.metadataVersion = MetadataVersion.V4;
- legacy.write_legacy_ipc_format = true;
- final IpcOption version4 = new IpcOption();
- version4.metadataVersion = MetadataVersion.V4;
+ final IpcOption legacy = new IpcOption(true, MetadataVersion.V4);
+ final IpcOption version4 = new IpcOption(false, MetadataVersion.V4);
return Arrays.asList(
new Object[] {"V4Legacy", legacy},
new Object[] {"V4", version4},
- new Object[] {"V5", new IpcOption()}
+ new Object[] {"V5", IpcOption.DEFAULT}
);
}