You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2021/02/07 23:29:36 UTC

[arrow] branch master updated: ARROW-11081: [Java] Make IPC option immutable

This is an automated email from the ASF dual-hosted git repository.

emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6609270  ARROW-11081: [Java] Make IPC option immutable
6609270 is described below

commit 66092708abadf616ff8d8edf099b07e1228cb96e
Author: liyafan82 <fa...@foxmail.com>
AuthorDate: Sun Feb 7 15:28:43 2021 -0800

    ARROW-11081: [Java] Make IPC option immutable
    
    Making IpcOption immutable brings the following benefits:
    
    1. It makes the code easier to reason about.
    2. It gives the JIT compiler more room to optimize.
    3. Immutable instances can be shared safely (e.g., a single IpcOption.DEFAULT), so many redundant object allocations are avoided.
    
    Closes #9053 from liyafan82/fly_1231_opt
    
    Authored-by: liyafan82 <fa...@foxmail.com>
    Signed-off-by: Micah Kornfield <em...@gmail.com>
---
 .../main/java/org/apache/arrow/flight/ArrowMessage.java   | 14 +++++++-------
 .../src/main/java/org/apache/arrow/flight/FlightInfo.java |  4 ++--
 .../org/apache/arrow/flight/OutboundStreamListener.java   |  4 ++--
 .../main/java/org/apache/arrow/flight/SchemaResult.java   |  2 +-
 .../java/org/apache/arrow/flight/TestBasicOperation.java  |  4 ++--
 .../java/org/apache/arrow/flight/TestMetadataVersion.java |  7 ++++---
 .../org/apache/arrow/vector/ipc/ArrowStreamWriter.java    |  2 +-
 .../java/org/apache/arrow/vector/ipc/ArrowWriter.java     |  2 +-
 .../org/apache/arrow/vector/ipc/message/IpcOption.java    | 15 +++++++++++++--
 .../arrow/vector/ipc/message/MessageSerializer.java       | 14 +++++++-------
 .../apache/arrow/vector/ipc/MessageSerializerTest.java    |  8 ++++----
 .../apache/arrow/vector/ipc/TestArrowReaderWriter.java    |  6 +++---
 .../java/org/apache/arrow/vector/ipc/TestRoundTrip.java   |  9 +++------
 13 files changed, 50 insertions(+), 41 deletions(-)

diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
index 06d3bd3..9681fa8 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
@@ -155,7 +155,7 @@ class ArrowMessage implements AutoCloseable {
   }
 
   public ArrowMessage(ArrowDictionaryBatch batch, IpcOption option) {
-    this.writeOption = new IpcOption();
+    this.writeOption = option;
     ByteBuffer serializedMessage = MessageSerializer.serializeMetadata(batch, writeOption);
     serializedMessage = serializedMessage.slice();
     this.message = MessageMetadataResult.create(serializedMessage, serializedMessage.remaining());
@@ -173,7 +173,7 @@ class ArrowMessage implements AutoCloseable {
    */
   public ArrowMessage(ArrowBuf appMetadata) {
     // No need to take IpcOption as it's not used to serialize this kind of message.
-    this.writeOption = new IpcOption();
+    this.writeOption = IpcOption.DEFAULT;
     this.message = null;
     this.bufs = ImmutableList.of();
     this.descriptor = null;
@@ -183,7 +183,7 @@ class ArrowMessage implements AutoCloseable {
 
   public ArrowMessage(FlightDescriptor descriptor) {
     // No need to take IpcOption as it's not used to serialize this kind of message.
-    this.writeOption = new IpcOption();
+    this.writeOption = IpcOption.DEFAULT;
     this.message = null;
     this.bufs = ImmutableList.of();
     this.descriptor = descriptor;
@@ -194,10 +194,10 @@ class ArrowMessage implements AutoCloseable {
   private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, ArrowBuf appMetadata,
                        ArrowBuf buf) {
     // No need to take IpcOption as this is used for deserialized ArrowMessage coming from the wire.
-    this.writeOption = new IpcOption();
-    if (message != null) {
-      this.writeOption.metadataVersion = MetadataVersion.fromFlatbufID(message.getMessage().version());
-    }
+    this.writeOption = message != null ?
+        // avoid writing legacy ipc format by default
+        new IpcOption(false, MetadataVersion.fromFlatbufID(message.getMessage().version())) :
+        IpcOption.DEFAULT;
     this.message = message;
     this.descriptor = descriptor;
     this.appMetadata = appMetadata;
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
index 8eb456b..e57b311 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
@@ -61,7 +61,7 @@ public class FlightInfo {
    */
   public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes,
       long records) {
-    this(schema, descriptor, endpoints, bytes, records, new IpcOption());
+    this(schema, descriptor, endpoints, bytes, records, IpcOption.DEFAULT);
   }
 
   /**
@@ -108,7 +108,7 @@ public class FlightInfo {
     }
     bytes = pbFlightInfo.getTotalBytes();
     records = pbFlightInfo.getTotalRecords();
-    option = new IpcOption();
+    option = IpcOption.DEFAULT;
   }
 
   public Schema getSchema() {
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListener.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListener.java
index f578d67..8417874 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListener.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListener.java
@@ -55,7 +55,7 @@ public interface OutboundStreamListener {
    * <p>This method must be called before all others, except {@link #putMetadata(ArrowBuf)}.
    */
   default void start(VectorSchemaRoot root) {
-    start(root, null, new IpcOption());
+    start(root, null, IpcOption.DEFAULT);
   }
 
   /**
@@ -64,7 +64,7 @@ public interface OutboundStreamListener {
    * <p>This method must be called before all others, except {@link #putMetadata(ArrowBuf)}.
    */
   default void start(VectorSchemaRoot root, DictionaryProvider dictionaries) {
-    start(root, dictionaries, new IpcOption());
+    start(root, dictionaries, IpcOption.DEFAULT);
   }
 
   /**
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/SchemaResult.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/SchemaResult.java
index 035ea7a..8a5e7d9 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/SchemaResult.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/SchemaResult.java
@@ -45,7 +45,7 @@ public class SchemaResult {
   private final IpcOption option;
 
   public SchemaResult(Schema schema) {
-    this(schema, new IpcOption());
+    this(schema, IpcOption.DEFAULT);
   }
 
   /**
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
index daf911d..0d8ff33 100644
--- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
@@ -354,7 +354,7 @@ public class TestBasicOperation {
       final VectorUnloader unloader = new VectorUnloader(root);
       root.setRowCount(0);
       final MethodDescriptor.Marshaller<ArrowMessage> marshaller = ArrowMessage.createMarshaller(allocator);
-      try (final ArrowMessage message = new ArrowMessage(unloader.getRecordBatch(), null, new IpcOption())) {
+      try (final ArrowMessage message = new ArrowMessage(unloader.getRecordBatch(), null, IpcOption.DEFAULT)) {
         Assert.assertEquals(ArrowMessage.HeaderType.RECORD_BATCH, message.getMessageType());
         // Should have at least one empty body buffer (there may be multiple for e.g. data and validity)
         Iterator<ArrowBuf> iterator = message.getBufs().iterator();
@@ -388,7 +388,7 @@ public class TestBasicOperation {
     try (final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) {
       final MethodDescriptor.Marshaller<ArrowMessage> marshaller = ArrowMessage.createMarshaller(allocator);
       Flight.FlightDescriptor descriptor = FlightDescriptor.command(new byte[0]).toProtocol();
-      try (final ArrowMessage message = new ArrowMessage(descriptor, schema, new IpcOption())) {
+      try (final ArrowMessage message = new ArrowMessage(descriptor, schema, IpcOption.DEFAULT)) {
         Assert.assertEquals(ArrowMessage.HeaderType.SCHEMA, message.getMessageType());
         // Should have no body buffers
         Assert.assertFalse(message.getBufs().iterator().hasNext());
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestMetadataVersion.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestMetadataVersion.java
index 148b687..83a694b 100644
--- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestMetadataVersion.java
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestMetadataVersion.java
@@ -56,9 +56,10 @@ public class TestMetadataVersion {
     schema = new Schema(Collections.singletonList(Field.nullable("foo", new ArrowType.Int(32, true))));
     unionSchema = new Schema(
         Collections.singletonList(Field.nullable("union", new ArrowType.Union(UnionMode.Dense, new int[]{0}))));
-    optionV4 = new IpcOption();
-    optionV4.metadataVersion = MetadataVersion.V4;
-    optionV5 = new IpcOption();
+
+    // avoid writing legacy ipc format by default
+    optionV4 = new IpcOption(false, MetadataVersion.V4);
+    optionV5 = IpcOption.DEFAULT;
   }
 
   @AfterClass
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
index ff0dfd3..deb9858 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
@@ -48,7 +48,7 @@ public class ArrowStreamWriter extends ArrowWriter {
    * Construct an ArrowStreamWriter with an optional DictionaryProvider for the WritableByteChannel.
    */
   public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
-    this(root, provider, out, new IpcOption());
+    this(root, provider, out, IpcOption.DEFAULT);
   }
 
   /**
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
index 3f928d7..7bc9a30 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
@@ -65,7 +65,7 @@ public abstract class ArrowWriter implements AutoCloseable {
   protected IpcOption option;
 
   protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
-    this (root, provider, out, new IpcOption());
+    this (root, provider, out, IpcOption.DEFAULT);
   }
 
   /**
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java
index b93c3b3..5120758 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/IpcOption.java
@@ -26,8 +26,19 @@ public class IpcOption {
 
   // Write the pre-0.15.0 encapsulated IPC message format
   // consisting of a 4-byte prefix instead of 8 byte
-  public boolean write_legacy_ipc_format = false;
+  public final boolean write_legacy_ipc_format;
 
   // The metadata version. Defaults to V5.
-  public MetadataVersion metadataVersion = MetadataVersion.DEFAULT;
+  public final MetadataVersion metadataVersion;
+
+  public IpcOption() {
+    this(false, MetadataVersion.DEFAULT);
+  }
+
+  public IpcOption(boolean writeLegacyIpcFormat, MetadataVersion metadataVersion) {
+    this.write_legacy_ipc_format = writeLegacyIpcFormat;
+    this.metadataVersion = metadataVersion;
+  }
+
+  public static final IpcOption DEFAULT = new IpcOption();
 }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
index b2d1425..5d332eb 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageSerializer.java
@@ -106,7 +106,7 @@ public class MessageSerializer {
 
   public static int writeMessageBuffer(WriteChannel out, int messageLength, ByteBuffer messageBuffer)
       throws IOException {
-    return writeMessageBuffer(out, messageLength, messageBuffer, new IpcOption());
+    return writeMessageBuffer(out, messageLength, messageBuffer, IpcOption.DEFAULT);
   }
 
   /**
@@ -147,7 +147,7 @@ public class MessageSerializer {
    * Serialize a schema object.
    */
   public static long serialize(WriteChannel out, Schema schema) throws IOException {
-    return serialize(out, schema, new IpcOption());
+    return serialize(out, schema, IpcOption.DEFAULT);
   }
 
   /**
@@ -176,7 +176,7 @@ public class MessageSerializer {
    */
   @Deprecated
   public static ByteBuffer serializeMetadata(Schema schema) {
-    return serializeMetadata(schema, new IpcOption());
+    return serializeMetadata(schema, IpcOption.DEFAULT);
   }
 
   /**
@@ -234,7 +234,7 @@ public class MessageSerializer {
    * Serializes an ArrowRecordBatch. Returns the offset and length of the written batch.
    */
   public static ArrowBlock serialize(WriteChannel out, ArrowRecordBatch batch) throws IOException {
-    return serialize(out, batch, new IpcOption());
+    return serialize(out, batch, IpcOption.DEFAULT);
   }
 
   /**
@@ -315,7 +315,7 @@ public class MessageSerializer {
    */
   @Deprecated
   public static ByteBuffer serializeMetadata(ArrowMessage message) {
-    return serializeMetadata(message, new IpcOption());
+    return serializeMetadata(message, IpcOption.DEFAULT);
   }
 
   /**
@@ -450,7 +450,7 @@ public class MessageSerializer {
   }
 
   public static ArrowBlock serialize(WriteChannel out, ArrowDictionaryBatch batch) throws IOException {
-    return serialize(out, batch, new IpcOption());
+    return serialize(out, batch, IpcOption.DEFAULT);
   }
 
   /**
@@ -638,7 +638,7 @@ public class MessageSerializer {
       byte headerType,
       int headerOffset,
       long bodyLength) {
-    return serializeMessage(builder, headerType, headerOffset, bodyLength, new IpcOption());
+    return serializeMessage(builder, headerType, headerOffset, bodyLength, IpcOption.DEFAULT);
   }
 
   /**
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
index d4e97c2..ae18fab 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/MessageSerializerTest.java
@@ -168,8 +168,8 @@ public class MessageSerializerTest {
     ArrowRecordBatch batch = new ArrowRecordBatch(
         16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb));
 
-    IpcOption option = new IpcOption();
-    option.metadataVersion = MetadataVersion.V4;
+    // avoid writing legacy ipc format by default
+    IpcOption option = new IpcOption(false, MetadataVersion.V4);
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch, option);
 
@@ -193,8 +193,8 @@ public class MessageSerializerTest {
     ArrowRecordBatch batch = new ArrowRecordBatch(
         16, asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb));
 
-    IpcOption option = new IpcOption();
-    option.metadataVersion = MetadataVersion.V5;
+    // avoid writing legacy ipc format by default
+    IpcOption option = new IpcOption(false, MetadataVersion.V5);
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), batch, option);
 
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
index 84e26a3..47dd844 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestArrowReaderWriter.java
@@ -77,6 +77,7 @@ import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.ipc.message.IpcOption;
 import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.MetadataVersion;
 import org.apache.arrow.vector.types.Types.MinorType;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
@@ -780,8 +781,7 @@ public class TestArrowReaderWriter {
     WriteChannel out = new WriteChannel(newChannel(outStream));
 
     // write legacy ipc format
-    IpcOption option = new IpcOption();
-    option.write_legacy_ipc_format = true;
+    IpcOption option = new IpcOption(true, MetadataVersion.DEFAULT);
     MessageSerializer.serialize(out, schema, option);
     MessageSerializer.serialize(out, batch);
 
@@ -794,7 +794,7 @@ public class TestArrowReaderWriter {
     readBatch.close();
 
     // write ipc format with continuation
-    option.write_legacy_ipc_format = false;
+    option = IpcOption.DEFAULT;
     MessageSerializer.serialize(out, schema, option);
     MessageSerializer.serialize(out, batch);
 
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestRoundTrip.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestRoundTrip.java
index 971008e..5f57e90 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestRoundTrip.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/TestRoundTrip.java
@@ -91,15 +91,12 @@ public class TestRoundTrip extends BaseFileTest {
 
   @Parameterized.Parameters(name = "options = {0}")
   public static Collection<Object[]> getWriteOption() {
-    final IpcOption legacy = new IpcOption();
-    legacy.metadataVersion = MetadataVersion.V4;
-    legacy.write_legacy_ipc_format = true;
-    final IpcOption version4 = new IpcOption();
-    version4.metadataVersion = MetadataVersion.V4;
+    final IpcOption legacy = new IpcOption(true, MetadataVersion.V4);
+    final IpcOption version4 = new IpcOption(false, MetadataVersion.V4);
     return Arrays.asList(
         new Object[] {"V4Legacy", legacy},
         new Object[] {"V4", version4},
-        new Object[] {"V5", new IpcOption()}
+        new Object[] {"V5", IpcOption.DEFAULT}
     );
   }