You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/01/19 13:45:25 UTC

[arrow] branch master updated: GH-15203: [Java] Implement writing compressed files (#15223)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c698fb3c2 GH-15203: [Java] Implement writing compressed files (#15223)
4c698fb3c2 is described below

commit 4c698fb3c2a2b4ee046c6ad6e992e81ed90c7b0e
Author: David Li <li...@gmail.com>
AuthorDate: Thu Jan 19 08:45:16 2023 -0500

    GH-15203: [Java] Implement writing compressed files (#15223)
    
    
    * Closes: #15203
    
    Authored-by: David Li <li...@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 .../arrow/compression/TestCompressionCodec.java    | 209 ++++++++++++++++-----
 .../org/apache/arrow/vector/VectorUnloader.java    |  11 +-
 .../compression/AbstractCompressionCodec.java      |   1 +
 .../apache/arrow/vector/ipc/ArrowFileWriter.java   |   9 +
 .../apache/arrow/vector/ipc/ArrowStreamWriter.java |  19 ++
 .../org/apache/arrow/vector/ipc/ArrowWriter.java   |  25 ++-
 .../arrow/vector/ipc/message/ArrowRecordBatch.java |  22 ++-
 .../apache/arrow/vector/TestVectorUnloadLoad.java  |  11 ++
 8 files changed, 255 insertions(+), 52 deletions(-)

diff --git a/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java
index 1f6d64d476..a1d5000daa 100644
--- a/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java
+++ b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java
@@ -19,11 +19,21 @@ package org.apache.arrow.compression;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
 
 import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
@@ -32,63 +42,62 @@ import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.compression.CompressionCodec;
 import org.apache.arrow.vector.compression.CompressionUtil;
 import org.apache.arrow.vector.compression.NoCompressionCodec;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.apache.arrow.vector.ipc.message.IpcOption;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 /**
  * Test cases for {@link CompressionCodec}s.
  */
-@RunWith(Parameterized.class)
-public class TestCompressionCodec {
-
-  private final CompressionCodec codec;
-
+class TestCompressionCodec {
   private BufferAllocator allocator;
 
-  private final int vectorLength;
-
-  @Before
-  public void init() {
+  @BeforeEach
+  void init() {
     allocator = new RootAllocator(Integer.MAX_VALUE);
   }
 
-  @After
-  public void terminate() {
+  @AfterEach
+  void terminate() {
     allocator.close();
   }
 
-  public TestCompressionCodec(CompressionUtil.CodecType type, int vectorLength, CompressionCodec codec) {
-    this.codec = codec;
-    this.vectorLength = vectorLength;
-  }
-
-  @Parameterized.Parameters(name = "codec = {0}, length = {1}")
-  public static Collection<Object[]> getCodecs() {
-    List<Object[]> params = new ArrayList<>();
+  static Collection<Arguments> codecs() {
+    List<Arguments> params = new ArrayList<>();
 
     int[] lengths = new int[] {10, 100, 1000};
     for (int len : lengths) {
       CompressionCodec dumbCodec = NoCompressionCodec.INSTANCE;
-      params.add(new Object[]{dumbCodec.getCodecType(), len, dumbCodec});
+      params.add(Arguments.arguments(len, dumbCodec));
 
       CompressionCodec lz4Codec = new Lz4CompressionCodec();
-      params.add(new Object[]{lz4Codec.getCodecType(), len, lz4Codec});
+      params.add(Arguments.arguments(len, lz4Codec));
 
       CompressionCodec zstdCodec = new ZstdCompressionCodec();
-      params.add(new Object[]{zstdCodec.getCodecType(), len, zstdCodec});
-
+      params.add(Arguments.arguments(len, zstdCodec));
     }
     return params;
   }
 
-  private List<ArrowBuf> compressBuffers(List<ArrowBuf> inputBuffers) {
+  private List<ArrowBuf> compressBuffers(CompressionCodec codec, List<ArrowBuf> inputBuffers) {
     List<ArrowBuf> outputBuffers = new ArrayList<>(inputBuffers.size());
     for (ArrowBuf buf : inputBuffers) {
       outputBuffers.add(codec.compress(allocator, buf));
@@ -96,7 +105,7 @@ public class TestCompressionCodec {
     return outputBuffers;
   }
 
-  private List<ArrowBuf> deCompressBuffers(List<ArrowBuf> inputBuffers) {
+  private List<ArrowBuf> deCompressBuffers(CompressionCodec codec, List<ArrowBuf> inputBuffers) {
     List<ArrowBuf> outputBuffers = new ArrayList<>(inputBuffers.size());
     for (ArrowBuf buf : inputBuffers) {
       outputBuffers.add(codec.decompress(allocator, buf));
@@ -104,8 +113,9 @@ public class TestCompressionCodec {
     return outputBuffers;
   }
 
-  @Test
-  public void testCompressFixedWidthBuffers() throws Exception {
+  @ParameterizedTest
+  @MethodSource("codecs")
+  void testCompressFixedWidthBuffers(int vectorLength, CompressionCodec codec) throws Exception {
     // prepare vector to compress
     IntVector origVec = new IntVector("vec", allocator);
     origVec.allocateNew(vectorLength);
@@ -121,8 +131,8 @@ public class TestCompressionCodec {
 
     // compress & decompress
     List<ArrowBuf> origBuffers = origVec.getFieldBuffers();
-    List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers);
-    List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers);
+    List<ArrowBuf> compressedBuffers = compressBuffers(codec, origBuffers);
+    List<ArrowBuf> decompressedBuffers = deCompressBuffers(codec, compressedBuffers);
 
     assertEquals(2, decompressedBuffers.size());
 
@@ -144,8 +154,9 @@ public class TestCompressionCodec {
     AutoCloseables.close(decompressedBuffers);
   }
 
-  @Test
-  public void testCompressVariableWidthBuffers() throws Exception {
+  @ParameterizedTest
+  @MethodSource("codecs")
+  void testCompressVariableWidthBuffers(int vectorLength, CompressionCodec codec) throws Exception {
     // prepare vector to compress
     VarCharVector origVec = new VarCharVector("vec", allocator);
     origVec.allocateNew();
@@ -161,8 +172,8 @@ public class TestCompressionCodec {
 
     // compress & decompress
     List<ArrowBuf> origBuffers = origVec.getFieldBuffers();
-    List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers);
-    List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers);
+    List<ArrowBuf> compressedBuffers = compressBuffers(codec, origBuffers);
+    List<ArrowBuf> decompressedBuffers = deCompressBuffers(codec, compressedBuffers);
 
     assertEquals(3, decompressedBuffers.size());
 
@@ -184,8 +195,9 @@ public class TestCompressionCodec {
     AutoCloseables.close(decompressedBuffers);
   }
 
-  @Test
-  public void testEmptyBuffer() throws Exception {
+  @ParameterizedTest
+  @MethodSource("codecs")
+  void testEmptyBuffer(int vectorLength, CompressionCodec codec) throws Exception {
     final VarBinaryVector origVec = new VarBinaryVector("vec", allocator);
 
     origVec.allocateNew(vectorLength);
@@ -194,8 +206,8 @@ public class TestCompressionCodec {
     origVec.setValueCount(vectorLength);
 
     final List<ArrowBuf> origBuffers = origVec.getFieldBuffers();
-    final List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers);
-    final List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers);
+    final List<ArrowBuf> compressedBuffers = compressBuffers(codec, origBuffers);
+    final List<ArrowBuf> decompressedBuffers = deCompressBuffers(codec, compressedBuffers);
 
     // orchestrate new vector
     VarBinaryVector newVec = new VarBinaryVector("new vec", allocator);
@@ -210,4 +222,117 @@ public class TestCompressionCodec {
     newVec.close();
     AutoCloseables.close(decompressedBuffers);
   }
+
+  private static Stream<CompressionUtil.CodecType> codecTypes() {
+    return Arrays.stream(CompressionUtil.CodecType.values());
+  }
+
+  @ParameterizedTest
+  @MethodSource("codecTypes")
+  void testReadWriteStream(CompressionUtil.CodecType codec) throws Exception {
+    withRoot(codec, (factory, root) -> {
+      ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
+      try (final ArrowStreamWriter writer = new ArrowStreamWriter(
+          root, new DictionaryProvider.MapDictionaryProvider(),
+          Channels.newChannel(compressedStream),
+          IpcOption.DEFAULT, factory, codec)) {
+        writer.start();
+        writer.writeBatch();
+        writer.end();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      try (ArrowStreamReader reader = new ArrowStreamReader(
+          new ByteArrayReadableSeekableByteChannel(compressedStream.toByteArray()), allocator, factory)) {
+        assertTrue(reader.loadNextBatch());
+        assertTrue(root.equals(reader.getVectorSchemaRoot()));
+        assertFalse(reader.loadNextBatch());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @ParameterizedTest
+  @MethodSource("codecTypes")
+  void testReadWriteFile(CompressionUtil.CodecType codec) throws Exception {
+    withRoot(codec, (factory, root) -> {
+      ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
+      try (final ArrowFileWriter writer = new ArrowFileWriter(
+          root, new DictionaryProvider.MapDictionaryProvider(),
+          Channels.newChannel(compressedStream),
+          new HashMap<>(), IpcOption.DEFAULT, factory, codec)) {
+        writer.start();
+        writer.writeBatch();
+        writer.end();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      try (ArrowFileReader reader = new ArrowFileReader(
+          new ByteArrayReadableSeekableByteChannel(compressedStream.toByteArray()), allocator, factory)) {
+        assertTrue(reader.loadNextBatch());
+        assertTrue(root.equals(reader.getVectorSchemaRoot()));
+        assertFalse(reader.loadNextBatch());
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  /** Unloading a vector should not free source buffers. */
+  @ParameterizedTest
+  @MethodSource("codecTypes")
+  void testUnloadCompressed(CompressionUtil.CodecType codec) {
+    withRoot(codec, (factory, root) -> {
+      root.getFieldVectors().forEach((vector) -> {
+        Arrays.stream(vector.getBuffers(/*clear*/ false)).forEach((buf) -> {
+          assertNotEquals(0, buf.getReferenceManager().getRefCount());
+        });
+      });
+
+      final VectorUnloader unloader = new VectorUnloader(
+          root, /*includeNullCount*/ true, factory.createCodec(codec), /*alignBuffers*/ true);
+      unloader.getRecordBatch().close();
+
+      root.getFieldVectors().forEach((vector) -> {
+        Arrays.stream(vector.getBuffers(/*clear*/ false)).forEach((buf) -> {
+          assertNotEquals(0, buf.getReferenceManager().getRefCount());
+        });
+      });
+    });
+  }
+
+  void withRoot(CompressionUtil.CodecType codec, BiConsumer<CompressionCodec.Factory, VectorSchemaRoot> testBody) {
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ints", new ArrowType.Int(32, true)),
+        Field.nullable("strings", ArrowType.Utf8.INSTANCE)));
+    CompressionCodec.Factory factory = codec == CompressionUtil.CodecType.NO_COMPRESSION ?
+        NoCompressionCodec.Factory.INSTANCE : CommonsCompressionFactory.INSTANCE;
+    try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      final IntVector ints = (IntVector) root.getVector(0);
+      final VarCharVector strings = (VarCharVector) root.getVector(1);
+      // Doesn't get compressed
+      ints.setSafe(0, 0x4a3e);
+      ints.setSafe(1, 0x8aba);
+      ints.setSafe(2, 0x4362);
+      ints.setSafe(3, 0x383f);
+      // Gets compressed
+      String compressibleString = "                "; // 16 bytes
+      compressibleString = compressibleString + compressibleString;
+      compressibleString = compressibleString + compressibleString;
+      compressibleString = compressibleString + compressibleString;
+      compressibleString = compressibleString + compressibleString;
+      compressibleString = compressibleString + compressibleString; // 512 bytes
+      byte[] compressibleData = compressibleString.getBytes(StandardCharsets.UTF_8);
+      strings.setSafe(0, compressibleData);
+      strings.setSafe(1, compressibleData);
+      strings.setSafe(2, compressibleData);
+      strings.setSafe(3, compressibleData);
+      root.setRowCount(4);
+
+      testBody.accept(factory, root);
+    }
+  }
 }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
index e2cbf3ec1d..1d44e37ac7 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
@@ -69,7 +69,7 @@ public class VectorUnloader {
       VectorSchemaRoot root, boolean includeNullCount, CompressionCodec codec, boolean alignBuffers) {
     this.root = root;
     this.includeNullCount = includeNullCount;
-    this.codec = codec;
+    this.codec = codec == null ? NoCompressionCodec.INSTANCE : codec;
     this.alignBuffers = alignBuffers;
   }
 
@@ -83,8 +83,10 @@ public class VectorUnloader {
     for (FieldVector vector : root.getFieldVectors()) {
       appendNodes(vector, nodes, buffers);
     }
+    // Do NOT retain buffers in ArrowRecordBatch constructor since we have already retained them.
     return new ArrowRecordBatch(
-        root.getRowCount(), nodes, buffers, CompressionUtil.createBodyCompression(codec), alignBuffers);
+        root.getRowCount(), nodes, buffers, CompressionUtil.createBodyCompression(codec), alignBuffers,
+        /*retainBuffers*/ false);
   }
 
   private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
@@ -97,6 +99,11 @@ public class VectorUnloader {
           vector.getField(), vector.getClass().getSimpleName(), fieldBuffers));
     }
     for (ArrowBuf buf : fieldBuffers) {
+      // If the codec is NoCompressionCodec, it returns the input buffer unchanged; in that case the extra
+      // reference taken here is the one owned by ArrowRecordBatch. Otherwise the codec returns a new buffer
+      // and closes the input buffer; the extra reference taken here offsets that close so that the source
+      // VectorSchemaRoot's buffers are not modified or freed.
+      buf.getReferenceManager().retain();
       buffers.add(codec.compress(vector.getAllocator(), buf));
     }
     for (FieldVector child : vector.getChildrenFromFields()) {
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/AbstractCompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/AbstractCompressionCodec.java
index 39b32968d5..f45e51b8d4 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/compression/AbstractCompressionCodec.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/AbstractCompressionCodec.java
@@ -46,6 +46,7 @@ public abstract class AbstractCompressionCodec implements CompressionCodec {
     if (compressedLength > uncompressedLength) {
       // compressed buffer is larger, send the raw buffer
       compressedBuffer.close();
+      // XXX: this makes a copy of uncompressedBuffer
       compressedBuffer = CompressionUtil.packageRawBuffer(allocator, uncompressedBuffer);
     } else {
       writeUncompressedLength(compressedBuffer, uncompressedLength);
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
index 55cd262858..4b41d0ab61 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
@@ -25,6 +25,8 @@ import java.util.Map;
 
 import org.apache.arrow.util.VisibleForTesting;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import org.apache.arrow.vector.ipc.message.ArrowBlock;
 import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
@@ -69,6 +71,13 @@ public class ArrowFileWriter extends ArrowWriter {
     this.metaData = metaData;
   }
 
+  public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
+                         Map<String, String> metaData, IpcOption option, CompressionCodec.Factory compressionFactory,
+                         CompressionUtil.CodecType codecType) {
+    super(root, provider, out, option, compressionFactory, codecType);
+    this.metaData = metaData;
+  }
+
   @Override
   protected void startInternal(WriteChannel out) throws IOException {
     ArrowMagic.writeMagic(out, true);
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
index deb98580fe..60230d5a9b 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
@@ -23,6 +23,8 @@ import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import org.apache.arrow.vector.ipc.message.IpcOption;
 import org.apache.arrow.vector.ipc.message.MessageSerializer;
@@ -65,6 +67,23 @@ public class ArrowStreamWriter extends ArrowWriter {
     super(root, provider, out, option);
   }
 
+  /**
+   * Construct an ArrowStreamWriter with compression enabled.
+   *
+   * @param root Existing VectorSchemaRoot with vectors to be written.
+   * @param provider DictionaryProvider for any vectors that are dictionary encoded.
+   *                 (Optional, can be null)
+   * @param option IPC write options
+   * @param compressionFactory Compression codec factory
+   * @param codecType Codec type
+   * @param out WritableByteChannel for writing.
+   */
+  public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
+                           IpcOption option, CompressionCodec.Factory compressionFactory,
+                           CompressionUtil.CodecType codecType) {
+    super(root, provider, out, option, compressionFactory, codecType);
+  }
+
   /**
    * Write an EOS identifier to the WriteChannel.
    *
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
index 7bc9a306ff..13e313ab4d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
@@ -29,6 +29,9 @@ import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import org.apache.arrow.vector.ipc.message.ArrowBlock;
@@ -65,19 +68,27 @@ public abstract class ArrowWriter implements AutoCloseable {
   protected IpcOption option;
 
   protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
-    this (root, provider, out, IpcOption.DEFAULT);
+    this(root, provider, out, IpcOption.DEFAULT);
+  }
+
+  protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option) {
+    this(root, provider, out, option, NoCompressionCodec.Factory.INSTANCE, CompressionUtil.CodecType.NO_COMPRESSION);
   }
 
   /**
    * Note: fields are not closed when the writer is closed.
    *
-   * @param root     the vectors to write to the output
-   * @param provider where to find the dictionaries
-   * @param out      the output where to write
-   * @param option   IPC write options
+   * @param root               the vectors to write to the output
+   * @param provider           where to find the dictionaries
+   * @param out                the output where to write
+   * @param option             IPC write options
+   * @param compressionFactory Compression codec factory
+   * @param codecType          Compression codec type
    */
-  protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option) {
-    this.unloader = new VectorUnloader(root);
+  protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option,
+                        CompressionCodec.Factory compressionFactory, CompressionUtil.CodecType codecType) {
+    this.unloader = new VectorUnloader(
+        root, /*includeNullCount*/ true, compressionFactory.createCodec(codecType), /*alignBuffers*/ true);
     this.out = new WriteChannel(out);
     this.option = option;
 
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
index 307d32cb74..83a8ece0bf 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
@@ -76,10 +76,28 @@ public class ArrowRecordBatch implements ArrowMessage {
    * @param nodes   field level info
    * @param buffers will be retained until this recordBatch is closed
    * @param bodyCompression compression info.
+   * @param alignBuffers Whether to align buffers to an 8 byte boundary.
    */
   public ArrowRecordBatch(
       int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers,
       ArrowBodyCompression bodyCompression, boolean alignBuffers) {
+    this(length, nodes, buffers, bodyCompression, alignBuffers, /*retainBuffers*/ true);
+  }
+
+  /**
+   * Construct a record batch from nodes.
+   *
+   * @param length  how many rows in this batch
+   * @param nodes   field level info
+   * @param buffers will be retained until this recordBatch is closed
+   * @param bodyCompression compression info.
+   * @param alignBuffers Whether to align buffers to an 8 byte boundary.
+   * @param retainBuffers Whether to retain() each source buffer in the constructor. If false, the caller is
+   *                      responsible for retaining the buffers beforehand.
+   */
+  public ArrowRecordBatch(
+      int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers,
+      ArrowBodyCompression bodyCompression, boolean alignBuffers, boolean retainBuffers) {
     super();
     this.length = length;
     this.nodes = nodes;
@@ -89,7 +107,9 @@ public class ArrowRecordBatch implements ArrowMessage {
     List<ArrowBuffer> arrowBuffers = new ArrayList<>(buffers.size());
     long offset = 0;
     for (ArrowBuf arrowBuf : buffers) {
-      arrowBuf.getReferenceManager().retain();
+      if (retainBuffers) {
+        arrowBuf.getReferenceManager().retain();
+      }
       long size = arrowBuf.readableBytes();
       arrowBuffers.add(new ArrowBuffer(offset, size));
       if (LOGGER.isDebugEnabled()) {
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
index 8e1941a8c9..eac72f4b2c 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
@@ -63,6 +63,17 @@ public class TestVectorUnloadLoad {
     allocator.close();
   }
 
+  @Test
+  public void testNullCodec() {
+    final Schema schema = new Schema(Collections.emptyList());
+    try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      root.setRowCount(1);
+      final VectorUnloader unloader = new VectorUnloader(
+          root, /*includeNullCount*/ true, /*codec*/ null, /*alignBuffers*/ true);
+      unloader.getRecordBatch().close();
+    }
+  }
+
   @Test
   public void testUnloadLoad() throws IOException {
     int count = 10000;