You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this copy.
Posted to commits@arrow.apache.org by si...@apache.org on 2020/05/26 02:44:33 UTC
[arrow] branch master updated: ARROW-8171: [Java] Consider
pre-allocating memory for fix-width vector in Avro adapter iterator (#7211)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 01d3b75 ARROW-8171: [Java] Consider pre-allocating memory for fix-width vector in Avro adapter iterator (#7211)
01d3b75 is described below
commit 01d3b75b02715ca26dc27c9a50da04606f4f1363
Author: tianchen92 <ni...@alibaba-inc.com>
AuthorDate: Tue May 26 10:44:01 2020 +0800
ARROW-8171: [Java] Consider pre-allocating memory for fix-width vector in Avro adapter iterator (#7211)
---
.../java/org/apache/arrow/AvroToArrowUtils.java | 2 ++
.../apache/arrow/AvroToArrowVectorIterator.java | 19 +++++++++++++++
.../apache/arrow/consumers/AvroArraysConsumer.java | 7 ++++++
.../arrow/consumers/AvroBooleanConsumer.java | 2 +-
.../apache/arrow/consumers/AvroDoubleConsumer.java | 2 +-
.../apache/arrow/consumers/AvroFloatConsumer.java | 2 +-
.../apache/arrow/consumers/AvroIntConsumer.java | 2 +-
.../apache/arrow/consumers/AvroLongConsumer.java | 2 +-
.../apache/arrow/consumers/AvroMapConsumer.java | 12 ++++++++++
.../apache/arrow/consumers/AvroStructConsumer.java | 10 ++++++++
.../apache/arrow/consumers/AvroUnionsConsumer.java | 12 ++++++++++
.../arrow/consumers/logical/AvroDateConsumer.java | 2 +-
.../consumers/logical/AvroDecimalConsumer.java | 4 ++--
.../consumers/logical/AvroTimeMicroConsumer.java | 2 +-
.../consumers/logical/AvroTimeMillisConsumer.java | 2 +-
.../logical/AvroTimestampMicrosConsumer.java | 2 +-
.../logical/AvroTimestampMillisConsumer.java | 2 +-
.../arrow/adapter/jdbc/ArrowVectorIterator.java | 26 +++------------------
.../org/apache/arrow/adapter/jdbc/JdbcToArrow.java | 3 ++-
.../arrow/adapter/jdbc/JdbcToArrowUtils.java | 3 ++-
.../arrow/vector/util/ValueVectorUtility.java | 27 ++++++++++++++++++++++
21 files changed, 108 insertions(+), 37 deletions(-)
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
index 1e54f1f..29e44da 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
@@ -91,6 +91,7 @@ import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.util.JsonStringArrayList;
+import org.apache.arrow.vector.util.ValueVectorUtility;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
@@ -737,6 +738,7 @@ public class AvroToArrowUtils {
int valueCount = 0;
try {
while (true) {
+ ValueVectorUtility.ensureCapacity(root, valueCount + 1);
compositeConsumer.consume(decoder);
valueCount++;
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java
index 6627528..1faa759 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowVectorIterator.java
@@ -28,6 +28,7 @@ import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.util.ValueVectorUtility;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
@@ -104,6 +105,20 @@ public class AvroToArrowVectorIterator implements Iterator<VectorSchemaRoot>, Au
compositeConsumer.consume(decoder);
readRowCount++;
}
+
+ if (targetBatchSize == NO_LIMIT_BATCH_SIZE) {
+ while (true) {
+ ValueVectorUtility.ensureCapacity(root, readRowCount + 1);
+ compositeConsumer.consume(decoder);
+ readRowCount++;
+ }
+ } else {
+ while (readRowCount < targetBatchSize) {
+ compositeConsumer.consume(decoder);
+ readRowCount++;
+ }
+ }
+
root.setRowCount(readRowCount);
} catch (EOFException eof) {
// reach the end of encoder stream.
@@ -116,6 +131,10 @@ public class AvroToArrowVectorIterator implements Iterator<VectorSchemaRoot>, Au
// Loads the next schema root or null if no more rows are available.
private void load(VectorSchemaRoot root) {
+ final int targetBatchSize = config.getTargetBatchSize();
+ if (targetBatchSize != NO_LIMIT_BATCH_SIZE) {
+ ValueVectorUtility.preAllocate(root, targetBatchSize);
+ }
long validConsumerCount = compositeConsumer.getConsumers().stream().filter(c ->
!c.skippable()).count();
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java
index facd16c..b9d0f84 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java
@@ -45,6 +45,7 @@ public class AvroArraysConsumer extends BaseAvroConsumer<ListVector> {
long totalCount = 0;
for (long count = decoder.readArrayStart(); count != 0; count = decoder.arrayNext()) {
totalCount += count;
+ ensureInnerVectorCapacity(totalCount);
for (int element = 0; element < count; element++) {
delegate.consume(decoder);
}
@@ -64,4 +65,10 @@ public class AvroArraysConsumer extends BaseAvroConsumer<ListVector> {
this.delegate.resetValueVector(vector.getDataVector());
return super.resetValueVector(vector);
}
+
+ /**
+ * Grows the list's data vector so that the elements of the current row can be
+ * written by the delegate consumer, which uses unchecked set() calls.
+ *
+ * @param targetCapacity number of elements read so far for the current row
+ */
+ void ensureInnerVectorCapacity(long targetCapacity) {
+ // The data vector is shared by every row of the batch and the delegate keeps
+ // writing after the elements of earlier rows, so the capacity needed is this
+ // row's start offset plus its element count, not the per-row count alone.
+ // Assumes consume() has already called vector.startNewValue(currentIndex),
+ // which initializes that offset -- TODO confirm.
+ long requiredCapacity =
+ vector.getOffsetBuffer().getInt((long) currentIndex * ListVector.OFFSET_WIDTH) + targetCapacity;
+ while (vector.getDataVector().getValueCapacity() < requiredCapacity) {
+ vector.getDataVector().reAlloc();
+ }
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
index 25e742c..4ca5f24 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
@@ -37,7 +37,7 @@ public class AvroBooleanConsumer extends BaseAvroConsumer<BitVector> {
@Override
public void consume(Decoder decoder) throws IOException {
- vector.setSafe(currentIndex, decoder.readBoolean() ? 1 : 0);
+ // set() replaces setSafe(): the caller is expected to have reserved
+ // capacity for currentIndex before consume() runs.
+ vector.set(currentIndex, decoder.readBoolean() ? 1 : 0);
currentIndex++;
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
index 2fa7110..356707a 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
@@ -37,6 +37,6 @@ public class AvroDoubleConsumer extends BaseAvroConsumer<Float8Vector> {
@Override
public void consume(Decoder decoder) throws IOException {
- vector.setSafe(currentIndex++, decoder.readDouble());
+ // set() replaces setSafe(): the caller is expected to have reserved
+ // capacity for currentIndex before consume() runs.
+ vector.set(currentIndex++, decoder.readDouble());
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
index e811f84..c8de4a2 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
@@ -37,6 +37,6 @@ public class AvroFloatConsumer extends BaseAvroConsumer<Float4Vector> {
@Override
public void consume(Decoder decoder) throws IOException {
- vector.setSafe(currentIndex++, decoder.readFloat());
+ // set() replaces setSafe(): the caller is expected to have reserved
+ // capacity for currentIndex before consume() runs.
+ vector.set(currentIndex++, decoder.readFloat());
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
index 0c9df63..bc8d4de 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
@@ -37,6 +37,6 @@ public class AvroIntConsumer extends BaseAvroConsumer<IntVector> {
@Override
public void consume(Decoder decoder) throws IOException {
- vector.setSafe(currentIndex++, decoder.readInt());
+ // set() replaces setSafe(): the caller is expected to have reserved
+ // capacity for currentIndex before consume() runs.
+ vector.set(currentIndex++, decoder.readInt());
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
index c1e48bf..b9016c5 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
@@ -37,6 +37,6 @@ public class AvroLongConsumer extends BaseAvroConsumer<BigIntVector> {
@Override
public void consume(Decoder decoder) throws IOException {
- vector.setSafe(currentIndex++, decoder.readLong());
+ // set() replaces setSafe(): the caller is expected to have reserved
+ // capacity for currentIndex before consume() runs.
+ vector.set(currentIndex++, decoder.readLong());
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java
index 6f0931d..b8e8bd5 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java
@@ -19,7 +19,9 @@ package org.apache.arrow.consumers;
import java.io.IOException;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
import org.apache.avro.io.Decoder;
/**
@@ -45,6 +47,7 @@ public class AvroMapConsumer extends BaseAvroConsumer<MapVector> {
long totalCount = 0;
for (long count = decoder.readMapStart(); count != 0; count = decoder.mapNext()) {
totalCount += count;
+ ensureInnerVectorCapacity(totalCount);
for (int element = 0; element < count; element++) {
delegate.consume(decoder);
}
@@ -64,4 +67,13 @@ public class AvroMapConsumer extends BaseAvroConsumer<MapVector> {
this.delegate.resetValueVector(vector.getDataVector());
return super.resetValueVector(vector);
}
+
+ /**
+ * Grows the children of the map's entry struct so that the entries of the
+ * current row can be written.
+ *
+ * @param targetCapacity number of entries read so far for the current row
+ */
+ void ensureInnerVectorCapacity(long targetCapacity) {
+ // The entry struct is shared by every row of the batch, so this row's entries
+ // start at its start offset; the capacity needed is that offset plus the
+ // per-row count. Assumes consume() has already called
+ // vector.startNewValue(currentIndex), which initializes the offset -- TODO confirm.
+ long requiredCapacity =
+ vector.getOffsetBuffer().getInt((long) currentIndex * MapVector.OFFSET_WIDTH) + targetCapacity;
+ StructVector innerVector = (StructVector) vector.getDataVector();
+ for (FieldVector v : innerVector.getChildrenFromFields()) {
+ while (v.getValueCapacity() < requiredCapacity) {
+ v.reAlloc();
+ }
+ }
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java
index c2b4ecb..792d01e 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java
@@ -20,6 +20,7 @@ package org.apache.arrow.consumers;
import java.io.IOException;
import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.avro.io.Decoder;
@@ -42,6 +43,7 @@ public class AvroStructConsumer extends BaseAvroConsumer<StructVector> {
@Override
public void consume(Decoder decoder) throws IOException {
+ ensureInnerVectorCapacity(currentIndex + 1);
for (int i = 0; i < delegates.length; i++) {
delegates[i].consume(decoder);
}
@@ -63,4 +65,12 @@ public class AvroStructConsumer extends BaseAvroConsumer<StructVector> {
}
return super.resetValueVector(vector);
}
+
+ /**
+ * Reallocates each child of the struct vector until it can hold at least
+ * {@code targetCapacity} values; children already large enough are left alone.
+ *
+ * @param targetCapacity minimum value capacity required of every child
+ */
+ void ensureInnerVectorCapacity(long targetCapacity) {
+ for (FieldVector child : vector.getChildrenFromFields()) {
+ growToCapacity(child, targetCapacity);
+ }
+ }
+
+ /** Calls {@code reAlloc()} on {@code child} until its value capacity reaches {@code targetCapacity}. */
+ private static void growToCapacity(FieldVector child, long targetCapacity) {
+ while (child.getValueCapacity() < targetCapacity) {
+ child.reAlloc();
+ }
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
index 702790e..c0bb020 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
@@ -20,6 +20,7 @@ package org.apache.arrow.consumers;
import java.io.IOException;
import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.complex.UnionVector;
import org.apache.arrow.vector.types.Types;
import org.apache.avro.io.Decoder;
@@ -47,6 +48,7 @@ public class AvroUnionsConsumer extends BaseAvroConsumer<UnionVector> {
public void consume(Decoder decoder) throws IOException {
int fieldIndex = decoder.readInt();
+ ensureInnerVectorCapacity(currentIndex + 1, fieldIndex);
Consumer delegate = delegates[fieldIndex];
vector.setType(currentIndex, types[fieldIndex]);
@@ -71,4 +73,14 @@ public class AvroUnionsConsumer extends BaseAvroConsumer<UnionVector> {
}
return super.resetValueVector(vector);
}
+
+ /**
+ * Grows the child vector backing union branch {@code fieldIndex} until it can
+ * hold at least {@code targetCapacity} values. A child of minor type NULL is skipped.
+ *
+ * @param targetCapacity minimum value capacity required of the branch vector
+ * @param fieldIndex index of the branch in the union's children
+ */
+ void ensureInnerVectorCapacity(long targetCapacity, int fieldIndex) {
+ ValueVector branch = vector.getChildrenFromFields().get(fieldIndex);
+ // NOTE(review): presumably skipped because a NULL vector has no buffers to grow -- confirm.
+ boolean isNullBranch = branch.getMinorType() == Types.MinorType.NULL;
+ if (!isNullBranch) {
+ while (branch.getValueCapacity() < targetCapacity) {
+ branch.reAlloc();
+ }
+ }
+ }
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDateConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDateConsumer.java
index e5780e2..3aa8970 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDateConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDateConsumer.java
@@ -38,6 +38,6 @@ public class AvroDateConsumer extends BaseAvroConsumer<DateDayVector> {
@Override
public void consume(Decoder decoder) throws IOException {
- vector.setSafe(currentIndex++, decoder.readInt());
+ // set() replaces setSafe(): the caller is expected to have reserved
+ // capacity for currentIndex before consume() runs.
+ vector.set(currentIndex++, decoder.readInt());
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDecimalConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDecimalConsumer.java
index 94f7413..24d73cf 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDecimalConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroDecimalConsumer.java
@@ -58,7 +58,7 @@ public abstract class AvroDecimalConsumer extends BaseAvroConsumer<DecimalVector
byte[] bytes = new byte[cacheBuffer.limit()];
Preconditions.checkArgument(bytes.length <= 16, "Decimal bytes length should <= 16.");
cacheBuffer.get(bytes);
- vector.setBigEndianSafe(currentIndex++, bytes);
+ vector.setBigEndian(currentIndex++, bytes);
}
}
@@ -82,7 +82,7 @@ public abstract class AvroDecimalConsumer extends BaseAvroConsumer<DecimalVector
@Override
public void consume(Decoder decoder) throws IOException {
decoder.readFixed(reuseBytes);
- vector.setBigEndianSafe(currentIndex++, reuseBytes);
+ // setBigEndian() replaces setBigEndianSafe(): the caller is expected to have
+ // reserved capacity for currentIndex before consume() runs.
+ vector.setBigEndian(currentIndex++, reuseBytes);
}
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMicroConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMicroConsumer.java
index a52be4a..e68ba15 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMicroConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMicroConsumer.java
@@ -38,6 +38,6 @@ public class AvroTimeMicroConsumer extends BaseAvroConsumer<TimeMicroVector> {
@Override
public void consume(Decoder decoder) throws IOException {
- vector.setSafe(currentIndex++, decoder.readLong());
+ // set() replaces setSafe(): the caller is expected to have reserved
+ // capacity for currentIndex before consume() runs.
+ vector.set(currentIndex++, decoder.readLong());
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMillisConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMillisConsumer.java
index e9faf9b..f76186f 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMillisConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimeMillisConsumer.java
@@ -38,6 +38,6 @@ public class AvroTimeMillisConsumer extends BaseAvroConsumer<TimeMilliVector> {
@Override
public void consume(Decoder decoder) throws IOException {
- vector.setSafe(currentIndex++, decoder.readInt());
+ // set() replaces setSafe(): the caller is expected to have reserved
+ // capacity for currentIndex before consume() runs.
+ vector.set(currentIndex++, decoder.readInt());
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMicrosConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMicrosConsumer.java
index 94eecef..82da0e8 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMicrosConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMicrosConsumer.java
@@ -38,6 +38,6 @@ public class AvroTimestampMicrosConsumer extends BaseAvroConsumer<TimeStampMicro
@Override
public void consume(Decoder decoder) throws IOException {
- vector.setSafe(currentIndex++, decoder.readLong());
+ // set() replaces setSafe(): the caller is expected to have reserved
+ // capacity for currentIndex before consume() runs.
+ vector.set(currentIndex++, decoder.readLong());
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMillisConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMillisConsumer.java
index 2d176d7..159f49e 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMillisConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/logical/AvroTimestampMillisConsumer.java
@@ -38,6 +38,6 @@ public class AvroTimestampMillisConsumer extends BaseAvroConsumer<TimeStampMilli
@Override
public void consume(Decoder decoder) throws IOException {
- vector.setSafe(currentIndex++, decoder.readLong());
+ // set() replaces setSafe(): the caller is expected to have reserved
+ // capacity for currentIndex before consume() runs.
+ vector.set(currentIndex++, decoder.readLong());
}
}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/ArrowVectorIterator.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/ArrowVectorIterator.java
index 973bdf1..e7842ff 100644
--- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/ArrowVectorIterator.java
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/ArrowVectorIterator.java
@@ -25,10 +25,9 @@ import java.util.Iterator;
import org.apache.arrow.adapter.jdbc.consumer.CompositeJdbcConsumer;
import org.apache.arrow.adapter.jdbc.consumer.JdbcConsumer;
import org.apache.arrow.util.Preconditions;
-import org.apache.arrow.vector.BaseFixedWidthVector;
-import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.ValueVectorUtility;
/**
* VectorSchemaRoot iterator for partially converting JDBC data.
@@ -96,7 +95,7 @@ public class ArrowVectorIterator implements Iterator<VectorSchemaRoot>, AutoClos
int readRowCount = 0;
if (targetBatchSize == JdbcToArrowConfig.NO_LIMIT_BATCH_SIZE) {
while (resultSet.next()) {
- ensureCapacity(root, readRowCount + 1);
+ ValueVectorUtility.ensureCapacity(root, readRowCount + 1);
compositeConsumer.consume(resultSet);
readRowCount++;
}
@@ -120,7 +119,7 @@ public class ArrowVectorIterator implements Iterator<VectorSchemaRoot>, AutoClos
try {
root = VectorSchemaRoot.create(schema, config.getAllocator());
if (config.getTargetBatchSize() != JdbcToArrowConfig.NO_LIMIT_BATCH_SIZE) {
- preAllocate(root, config);
+ ValueVectorUtility.preAllocate(root, config.getTargetBatchSize());
}
} catch (Exception e) {
if (root != null) {
@@ -131,25 +130,6 @@ public class ArrowVectorIterator implements Iterator<VectorSchemaRoot>, AutoClos
return root;
}
- static void preAllocate(VectorSchemaRoot root, JdbcToArrowConfig config) {
- int targetSize = config.getTargetBatchSize();
- for (ValueVector vector : root.getFieldVectors()) {
- if (vector instanceof BaseFixedWidthVector) {
- ((BaseFixedWidthVector) vector).allocateNew(targetSize);
- }
- }
- }
-
- static void ensureCapacity(VectorSchemaRoot root, int targetCapacity) {
- for (ValueVector vector : root.getFieldVectors()) {
- if (vector instanceof BaseFixedWidthVector) {
- while (vector.getValueCapacity() < targetCapacity) {
- vector.reAlloc();
- }
- }
- }
- }
-
// Loads the next schema root or null if no more rows are available.
private void load(VectorSchemaRoot root) throws SQLException {
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrow.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrow.java
index 80fc81e..285a860 100644
--- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrow.java
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrow.java
@@ -28,6 +28,7 @@ import org.apache.arrow.memory.BaseAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.util.ValueVectorUtility;
/**
* Utility class to convert JDBC objects to columnar Arrow format objects.
@@ -221,7 +222,7 @@ public class JdbcToArrow {
VectorSchemaRoot root = VectorSchemaRoot.create(
JdbcToArrowUtils.jdbcToArrowSchema(resultSet.getMetaData(), config), config.getAllocator());
if (config.getTargetBatchSize() != JdbcToArrowConfig.NO_LIMIT_BATCH_SIZE) {
- ArrowVectorIterator.preAllocate(root, config);
+ ValueVectorUtility.preAllocate(root, config.getTargetBatchSize());
}
JdbcToArrowUtils.jdbcToArrowVectors(resultSet, root, config);
return root;
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java
index df523f9..e534d20 100644
--- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java
@@ -80,6 +80,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.ValueVectorUtility;
/**
* Class that does most of the work to convert JDBC ResultSet data into Arrow columnar format Vector objects.
@@ -357,7 +358,7 @@ public class JdbcToArrowUtils {
int readRowCount = 0;
if (config.getTargetBatchSize() == JdbcToArrowConfig.NO_LIMIT_BATCH_SIZE) {
while (rs.next()) {
- ArrowVectorIterator.ensureCapacity(root, readRowCount + 1);
+ ValueVectorUtility.ensureCapacity(root, readRowCount + 1);
compositeConsumer.consume(rs);
readRowCount++;
}
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/util/ValueVectorUtility.java b/java/vector/src/main/java/org/apache/arrow/vector/util/ValueVectorUtility.java
index 7c2de7f..d9c1173 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/util/ValueVectorUtility.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/util/ValueVectorUtility.java
@@ -19,10 +19,12 @@ package org.apache.arrow.vector.util;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BufferLayout;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.TypeLayout;
import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.validate.ValidateVectorVisitor;
@@ -128,4 +130,29 @@ public class ValueVectorUtility {
vector.accept(visitor, null);
}
+
+ /**
+ * Allocates buffers for {@code targetSize} values in each {@link BaseFixedWidthVector}
+ * returned by {@code root.getFieldVectors()}. Other vector kinds are skipped, and
+ * children of complex vectors are not visited.
+ *
+ * @param root schema root whose fixed-width field vectors are allocated
+ * @param targetSize number of values each fixed-width vector should be able to hold
+ */
+ public static void preAllocate(VectorSchemaRoot root, int targetSize) {
+ for (ValueVector candidate : root.getFieldVectors()) {
+ if (!(candidate instanceof BaseFixedWidthVector)) {
+ continue;
+ }
+ BaseFixedWidthVector fixedWidth = (BaseFixedWidthVector) candidate;
+ fixedWidth.allocateNew(targetSize);
+ }
+ }
+
+ /**
+ * Grows each {@link BaseFixedWidthVector} returned by {@code root.getFieldVectors()},
+ * one {@code reAlloc()} at a time, until it can hold at least {@code targetCapacity}
+ * values. Other vector kinds are skipped, and children of complex vectors are not visited.
+ *
+ * @param root schema root whose fixed-width field vectors are checked
+ * @param targetCapacity minimum value capacity required of each fixed-width vector
+ */
+ public static void ensureCapacity(VectorSchemaRoot root, int targetCapacity) {
+ for (ValueVector candidate : root.getFieldVectors()) {
+ if (!(candidate instanceof BaseFixedWidthVector)) {
+ continue;
+ }
+ for (int capacity = candidate.getValueCapacity(); capacity < targetCapacity;
+ capacity = candidate.getValueCapacity()) {
+ candidate.reAlloc();
+ }
+ }
+ }
+
}