You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/01/19 13:45:25 UTC
[arrow] branch master updated: GH-15203: [Java] Implement writing compressed files (#15223)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 4c698fb3c2 GH-15203: [Java] Implement writing compressed files (#15223)
4c698fb3c2 is described below
commit 4c698fb3c2a2b4ee046c6ad6e992e81ed90c7b0e
Author: David Li <li...@gmail.com>
AuthorDate: Thu Jan 19 08:45:16 2023 -0500
GH-15203: [Java] Implement writing compressed files (#15223)
* Closes: #15203
Authored-by: David Li <li...@gmail.com>
Signed-off-by: David Li <li...@gmail.com>
---
.../arrow/compression/TestCompressionCodec.java | 209 ++++++++++++++++-----
.../org/apache/arrow/vector/VectorUnloader.java | 11 +-
.../compression/AbstractCompressionCodec.java | 1 +
.../apache/arrow/vector/ipc/ArrowFileWriter.java | 9 +
.../apache/arrow/vector/ipc/ArrowStreamWriter.java | 19 ++
.../org/apache/arrow/vector/ipc/ArrowWriter.java | 25 ++-
.../arrow/vector/ipc/message/ArrowRecordBatch.java | 22 ++-
.../apache/arrow/vector/TestVectorUnloadLoad.java | 11 ++
8 files changed, 255 insertions(+), 52 deletions(-)
diff --git a/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java
index 1f6d64d476..a1d5000daa 100644
--- a/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java
+++ b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodec.java
@@ -19,11 +19,21 @@ package org.apache.arrow.compression;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
@@ -32,63 +42,62 @@ import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.compression.CompressionCodec;
import org.apache.arrow.vector.compression.CompressionUtil;
import org.apache.arrow.vector.compression.NoCompressionCodec;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+import org.apache.arrow.vector.ipc.ArrowFileReader;
+import org.apache.arrow.vector.ipc.ArrowFileWriter;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.apache.arrow.vector.ipc.message.IpcOption;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
/**
* Test cases for {@link CompressionCodec}s.
*/
-@RunWith(Parameterized.class)
-public class TestCompressionCodec {
-
- private final CompressionCodec codec;
-
+class TestCompressionCodec {
private BufferAllocator allocator;
- private final int vectorLength;
-
- @Before
- public void init() {
+ @BeforeEach
+ void init() {
allocator = new RootAllocator(Integer.MAX_VALUE);
}
- @After
- public void terminate() {
+ @AfterEach
+ void terminate() {
allocator.close();
}
- public TestCompressionCodec(CompressionUtil.CodecType type, int vectorLength, CompressionCodec codec) {
- this.codec = codec;
- this.vectorLength = vectorLength;
- }
-
- @Parameterized.Parameters(name = "codec = {0}, length = {1}")
- public static Collection<Object[]> getCodecs() {
- List<Object[]> params = new ArrayList<>();
+ static Collection<Arguments> codecs() {
+ List<Arguments> params = new ArrayList<>();
int[] lengths = new int[] {10, 100, 1000};
for (int len : lengths) {
CompressionCodec dumbCodec = NoCompressionCodec.INSTANCE;
- params.add(new Object[]{dumbCodec.getCodecType(), len, dumbCodec});
+ params.add(Arguments.arguments(len, dumbCodec));
CompressionCodec lz4Codec = new Lz4CompressionCodec();
- params.add(new Object[]{lz4Codec.getCodecType(), len, lz4Codec});
+ params.add(Arguments.arguments(len, lz4Codec));
CompressionCodec zstdCodec = new ZstdCompressionCodec();
- params.add(new Object[]{zstdCodec.getCodecType(), len, zstdCodec});
-
+ params.add(Arguments.arguments(len, zstdCodec));
}
return params;
}
- private List<ArrowBuf> compressBuffers(List<ArrowBuf> inputBuffers) {
+ private List<ArrowBuf> compressBuffers(CompressionCodec codec, List<ArrowBuf> inputBuffers) {
List<ArrowBuf> outputBuffers = new ArrayList<>(inputBuffers.size());
for (ArrowBuf buf : inputBuffers) {
outputBuffers.add(codec.compress(allocator, buf));
@@ -96,7 +105,7 @@ public class TestCompressionCodec {
return outputBuffers;
}
- private List<ArrowBuf> deCompressBuffers(List<ArrowBuf> inputBuffers) {
+ private List<ArrowBuf> deCompressBuffers(CompressionCodec codec, List<ArrowBuf> inputBuffers) {
List<ArrowBuf> outputBuffers = new ArrayList<>(inputBuffers.size());
for (ArrowBuf buf : inputBuffers) {
outputBuffers.add(codec.decompress(allocator, buf));
@@ -104,8 +113,9 @@ public class TestCompressionCodec {
return outputBuffers;
}
- @Test
- public void testCompressFixedWidthBuffers() throws Exception {
+ @ParameterizedTest
+ @MethodSource("codecs")
+ void testCompressFixedWidthBuffers(int vectorLength, CompressionCodec codec) throws Exception {
// prepare vector to compress
IntVector origVec = new IntVector("vec", allocator);
origVec.allocateNew(vectorLength);
@@ -121,8 +131,8 @@ public class TestCompressionCodec {
// compress & decompress
List<ArrowBuf> origBuffers = origVec.getFieldBuffers();
- List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers);
- List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers);
+ List<ArrowBuf> compressedBuffers = compressBuffers(codec, origBuffers);
+ List<ArrowBuf> decompressedBuffers = deCompressBuffers(codec, compressedBuffers);
assertEquals(2, decompressedBuffers.size());
@@ -144,8 +154,9 @@ public class TestCompressionCodec {
AutoCloseables.close(decompressedBuffers);
}
- @Test
- public void testCompressVariableWidthBuffers() throws Exception {
+ @ParameterizedTest
+ @MethodSource("codecs")
+ void testCompressVariableWidthBuffers(int vectorLength, CompressionCodec codec) throws Exception {
// prepare vector to compress
VarCharVector origVec = new VarCharVector("vec", allocator);
origVec.allocateNew();
@@ -161,8 +172,8 @@ public class TestCompressionCodec {
// compress & decompress
List<ArrowBuf> origBuffers = origVec.getFieldBuffers();
- List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers);
- List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers);
+ List<ArrowBuf> compressedBuffers = compressBuffers(codec, origBuffers);
+ List<ArrowBuf> decompressedBuffers = deCompressBuffers(codec, compressedBuffers);
assertEquals(3, decompressedBuffers.size());
@@ -184,8 +195,9 @@ public class TestCompressionCodec {
AutoCloseables.close(decompressedBuffers);
}
- @Test
- public void testEmptyBuffer() throws Exception {
+ @ParameterizedTest
+ @MethodSource("codecs")
+ void testEmptyBuffer(int vectorLength, CompressionCodec codec) throws Exception {
final VarBinaryVector origVec = new VarBinaryVector("vec", allocator);
origVec.allocateNew(vectorLength);
@@ -194,8 +206,8 @@ public class TestCompressionCodec {
origVec.setValueCount(vectorLength);
final List<ArrowBuf> origBuffers = origVec.getFieldBuffers();
- final List<ArrowBuf> compressedBuffers = compressBuffers(origBuffers);
- final List<ArrowBuf> decompressedBuffers = deCompressBuffers(compressedBuffers);
+ final List<ArrowBuf> compressedBuffers = compressBuffers(codec, origBuffers);
+ final List<ArrowBuf> decompressedBuffers = deCompressBuffers(codec, compressedBuffers);
// orchestrate new vector
VarBinaryVector newVec = new VarBinaryVector("new vec", allocator);
@@ -210,4 +222,117 @@ public class TestCompressionCodec {
newVec.close();
AutoCloseables.close(decompressedBuffers);
}
+
+ private static Stream<CompressionUtil.CodecType> codecTypes() {
+ return Arrays.stream(CompressionUtil.CodecType.values());
+ }
+
+ @ParameterizedTest
+ @MethodSource("codecTypes")
+ void testReadWriteStream(CompressionUtil.CodecType codec) throws Exception {
+ withRoot(codec, (factory, root) -> {
+ ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
+ try (final ArrowStreamWriter writer = new ArrowStreamWriter(
+ root, new DictionaryProvider.MapDictionaryProvider(),
+ Channels.newChannel(compressedStream),
+ IpcOption.DEFAULT, factory, codec)) {
+ writer.start();
+ writer.writeBatch();
+ writer.end();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ try (ArrowStreamReader reader = new ArrowStreamReader(
+ new ByteArrayReadableSeekableByteChannel(compressedStream.toByteArray()), allocator, factory)) {
+ assertTrue(reader.loadNextBatch());
+ assertTrue(root.equals(reader.getVectorSchemaRoot()));
+ assertFalse(reader.loadNextBatch());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @ParameterizedTest
+ @MethodSource("codecTypes")
+ void testReadWriteFile(CompressionUtil.CodecType codec) throws Exception {
+ withRoot(codec, (factory, root) -> {
+ ByteArrayOutputStream compressedStream = new ByteArrayOutputStream();
+ try (final ArrowFileWriter writer = new ArrowFileWriter(
+ root, new DictionaryProvider.MapDictionaryProvider(),
+ Channels.newChannel(compressedStream),
+ new HashMap<>(), IpcOption.DEFAULT, factory, codec)) {
+ writer.start();
+ writer.writeBatch();
+ writer.end();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ try (ArrowFileReader reader = new ArrowFileReader(
+ new ByteArrayReadableSeekableByteChannel(compressedStream.toByteArray()), allocator, factory)) {
+ assertTrue(reader.loadNextBatch());
+ assertTrue(root.equals(reader.getVectorSchemaRoot()));
+ assertFalse(reader.loadNextBatch());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ /** Unloading a vector should not free source buffers. */
+ @ParameterizedTest
+ @MethodSource("codecTypes")
+ void testUnloadCompressed(CompressionUtil.CodecType codec) {
+ withRoot(codec, (factory, root) -> {
+ root.getFieldVectors().forEach((vector) -> {
+ Arrays.stream(vector.getBuffers(/*clear*/ false)).forEach((buf) -> {
+ assertNotEquals(0, buf.getReferenceManager().getRefCount());
+ });
+ });
+
+ final VectorUnloader unloader = new VectorUnloader(
+ root, /*includeNullCount*/ true, factory.createCodec(codec), /*alignBuffers*/ true);
+ unloader.getRecordBatch().close();
+
+ root.getFieldVectors().forEach((vector) -> {
+ Arrays.stream(vector.getBuffers(/*clear*/ false)).forEach((buf) -> {
+ assertNotEquals(0, buf.getReferenceManager().getRefCount());
+ });
+ });
+ });
+ }
+
+ void withRoot(CompressionUtil.CodecType codec, BiConsumer<CompressionCodec.Factory, VectorSchemaRoot> testBody) {
+ final Schema schema = new Schema(Arrays.asList(
+ Field.nullable("ints", new ArrowType.Int(32, true)),
+ Field.nullable("strings", ArrowType.Utf8.INSTANCE)));
+ CompressionCodec.Factory factory = codec == CompressionUtil.CodecType.NO_COMPRESSION ?
+ NoCompressionCodec.Factory.INSTANCE : CommonsCompressionFactory.INSTANCE;
+ try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+ final IntVector ints = (IntVector) root.getVector(0);
+ final VarCharVector strings = (VarCharVector) root.getVector(1);
+ // Doesn't get compressed
+ ints.setSafe(0, 0x4a3e);
+ ints.setSafe(1, 0x8aba);
+ ints.setSafe(2, 0x4362);
+ ints.setSafe(3, 0x383f);
+ // Gets compressed
+ String compressibleString = "                "; // 16 bytes
+ compressibleString = compressibleString + compressibleString;
+ compressibleString = compressibleString + compressibleString;
+ compressibleString = compressibleString + compressibleString;
+ compressibleString = compressibleString + compressibleString;
+ compressibleString = compressibleString + compressibleString; // 512 bytes
+ byte[] compressibleData = compressibleString.getBytes(StandardCharsets.UTF_8);
+ strings.setSafe(0, compressibleData);
+ strings.setSafe(1, compressibleData);
+ strings.setSafe(2, compressibleData);
+ strings.setSafe(3, compressibleData);
+ root.setRowCount(4);
+
+ testBody.accept(factory, root);
+ }
+ }
}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
index e2cbf3ec1d..1d44e37ac7 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java
@@ -69,7 +69,7 @@ public class VectorUnloader {
VectorSchemaRoot root, boolean includeNullCount, CompressionCodec codec, boolean alignBuffers) {
this.root = root;
this.includeNullCount = includeNullCount;
- this.codec = codec;
+ this.codec = codec == null ? NoCompressionCodec.INSTANCE : codec;
this.alignBuffers = alignBuffers;
}
@@ -83,8 +83,10 @@ public class VectorUnloader {
for (FieldVector vector : root.getFieldVectors()) {
appendNodes(vector, nodes, buffers);
}
+ // Do NOT retain buffers in ArrowRecordBatch constructor since we have already retained them.
return new ArrowRecordBatch(
- root.getRowCount(), nodes, buffers, CompressionUtil.createBodyCompression(codec), alignBuffers);
+ root.getRowCount(), nodes, buffers, CompressionUtil.createBodyCompression(codec), alignBuffers,
+ /*retainBuffers*/ false);
}
private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
@@ -97,6 +99,11 @@ public class VectorUnloader {
vector.getField(), vector.getClass().getSimpleName(), fieldBuffers));
}
for (ArrowBuf buf : fieldBuffers) {
+ // If the codec is NoCompressionCodec, then it will return the input buffer unchanged. In that case,
+ // we need to retain it for ArrowRecordBatch. Otherwise, it will return a new buffer, and also close
+ // the input buffer. In that case, we still need to retain the input buffer to avoid modifying
+ // the source VectorSchemaRoot.
+ buf.getReferenceManager().retain();
buffers.add(codec.compress(vector.getAllocator(), buf));
}
for (FieldVector child : vector.getChildrenFromFields()) {
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/AbstractCompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/AbstractCompressionCodec.java
index 39b32968d5..f45e51b8d4 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/compression/AbstractCompressionCodec.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/AbstractCompressionCodec.java
@@ -46,6 +46,7 @@ public abstract class AbstractCompressionCodec implements CompressionCodec {
if (compressedLength > uncompressedLength) {
// compressed buffer is larger, send the raw buffer
compressedBuffer.close();
+ // XXX: this makes a copy of uncompressedBuffer
compressedBuffer = CompressionUtil.packageRawBuffer(allocator, uncompressedBuffer);
} else {
writeUncompressedLength(compressedBuffer, uncompressedLength);
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
index 55cd262858..4b41d0ab61 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileWriter.java
@@ -25,6 +25,8 @@ import java.util.Map;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
@@ -69,6 +71,13 @@ public class ArrowFileWriter extends ArrowWriter {
this.metaData = metaData;
}
+ public ArrowFileWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
+ Map<String, String> metaData, IpcOption option, CompressionCodec.Factory compressionFactory,
+ CompressionUtil.CodecType codecType) {
+ super(root, provider, out, option, compressionFactory, codecType);
+ this.metaData = metaData;
+ }
+
@Override
protected void startInternal(WriteChannel out) throws IOException {
ArrowMagic.writeMagic(out, true);
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
index deb98580fe..60230d5a9b 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamWriter.java
@@ -23,6 +23,8 @@ import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
@@ -65,6 +67,23 @@ public class ArrowStreamWriter extends ArrowWriter {
super(root, provider, out, option);
}
+ /**
+ * Construct an ArrowStreamWriter with compression enabled.
+ *
+ * @param root Existing VectorSchemaRoot with vectors to be written.
+ * @param provider DictionaryProvider for any vectors that are dictionary encoded.
+ * (Optional, can be null)
+ * @param option IPC write options
+ * @param compressionFactory Compression codec factory
+ * @param codecType Codec type
+ * @param out WritableByteChannel for writing.
+ */
+ public ArrowStreamWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out,
+ IpcOption option, CompressionCodec.Factory compressionFactory,
+ CompressionUtil.CodecType codecType) {
+ super(root, provider, out, option, compressionFactory, codecType);
+ }
+
/**
* Write an EOS identifier to the WriteChannel.
*
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
index 7bc9a306ff..13e313ab4d 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java
@@ -29,6 +29,9 @@ import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
@@ -65,19 +68,27 @@ public abstract class ArrowWriter implements AutoCloseable {
protected IpcOption option;
protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out) {
- this (root, provider, out, IpcOption.DEFAULT);
+ this(root, provider, out, IpcOption.DEFAULT);
+ }
+
+ protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option) {
+ this(root, provider, out, option, NoCompressionCodec.Factory.INSTANCE, CompressionUtil.CodecType.NO_COMPRESSION);
}
/**
* Note: fields are not closed when the writer is closed.
*
- * @param root the vectors to write to the output
- * @param provider where to find the dictionaries
- * @param out the output where to write
- * @param option IPC write options
+ * @param root the vectors to write to the output
+ * @param provider where to find the dictionaries
+ * @param out the output where to write
+ * @param option IPC write options
+ * @param compressionFactory Compression codec factory
+ * @param codecType Compression codec
*/
- protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option) {
- this.unloader = new VectorUnloader(root);
+ protected ArrowWriter(VectorSchemaRoot root, DictionaryProvider provider, WritableByteChannel out, IpcOption option,
+ CompressionCodec.Factory compressionFactory, CompressionUtil.CodecType codecType) {
+ this.unloader = new VectorUnloader(
+ root, /*includeNullCount*/ true, compressionFactory.createCodec(codecType), /*alignBuffers*/ true);
this.out = new WriteChannel(out);
this.option = option;
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
index 307d32cb74..83a8ece0bf 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java
@@ -76,10 +76,28 @@ public class ArrowRecordBatch implements ArrowMessage {
* @param nodes field level info
* @param buffers will be retained until this recordBatch is closed
* @param bodyCompression compression info.
+ * @param alignBuffers Whether to align buffers to an 8 byte boundary.
*/
public ArrowRecordBatch(
int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers,
ArrowBodyCompression bodyCompression, boolean alignBuffers) {
+ this(length, nodes, buffers, bodyCompression, alignBuffers, /*retainBuffers*/ true);
+ }
+
+ /**
+ * Construct a record batch from nodes.
+ *
+ * @param length how many rows in this batch
+ * @param nodes field level info
+ * @param buffers will be retained until this recordBatch is closed
+ * @param bodyCompression compression info.
+ * @param alignBuffers Whether to align buffers to an 8 byte boundary.
+ * @param retainBuffers Whether to retain() each source buffer in the constructor. If false, the caller is
+ * responsible for retaining the buffers beforehand.
+ */
+ public ArrowRecordBatch(
+ int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers,
+ ArrowBodyCompression bodyCompression, boolean alignBuffers, boolean retainBuffers) {
super();
this.length = length;
this.nodes = nodes;
@@ -89,7 +107,9 @@ public class ArrowRecordBatch implements ArrowMessage {
List<ArrowBuffer> arrowBuffers = new ArrayList<>(buffers.size());
long offset = 0;
for (ArrowBuf arrowBuf : buffers) {
- arrowBuf.getReferenceManager().retain();
+ if (retainBuffers) {
+ arrowBuf.getReferenceManager().retain();
+ }
long size = arrowBuf.readableBytes();
arrowBuffers.add(new ArrowBuffer(offset, size));
if (LOGGER.isDebugEnabled()) {
diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
index 8e1941a8c9..eac72f4b2c 100644
--- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
+++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java
@@ -63,6 +63,17 @@ public class TestVectorUnloadLoad {
allocator.close();
}
+ @Test
+ public void testNullCodec() {
+ final Schema schema = new Schema(Collections.emptyList());
+ try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+ root.setRowCount(1);
+ final VectorUnloader unloader = new VectorUnloader(
+ root, /*includeNullCount*/ true, /*codec*/ null, /*alignBuffers*/ true);
+ unloader.getRecordBatch().close();
+ }
+ }
+
@Test
public void testUnloadLoad() throws IOException {
int count = 10000;