You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2019/08/12 03:08:45 UTC
[arrow] branch master updated: ARROW-6083: [Java] Refactor Jdbc
adapter consume logic
This is an automated email from the ASF dual-hosted git repository.
emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 5daba72 ARROW-6083: [Java] Refactor Jdbc adapter consume logic
5daba72 is described below
commit 5daba72aef8810940b5bfadb807851f87c219020
Author: tianchen <ni...@alibaba-inc.com>
AuthorDate: Sun Aug 11 20:06:13 2019 -0700
ARROW-6083: [Java] Refactor Jdbc adapter consume logic
Related to [ARROW-6083](https://issues.apache.org/jira/browse/ARROW-6083).
Jdbc adapter read from ResultSet looks like:
while (rs.next()) {
for (int i = 1; i <= columnCount; i++)
{ jdbcToFieldVector( rs, i, rs.getMetaData().getColumnType(i), rowCount, root.getVector(rsmd.getColumnName(i)), config); }
rowCount++;
}
And in jdbcToFieldVector has lots of switch-case, that is to see, for every single value from ResultSet we have to do lots of analyzing conditions.
I think we could optimize this using consumer/delegate like avro adapter.
Closes #4978 from tianchen92/ARROW-6083 and squashes the following commits:
6acff423d <tianchen> fix potential leak
85e0ebd82 <tianchen> resolve comments
1318574e7 <tianchen> remove addNull API
fa13e5971 <tianchen> add constructor with no Calendar
2b0aa78d6 <tianchen> resolve comments
aa5c9d927 <tianchen> ARROW-6083: Refactor Jdbc adapter consume logic
Authored-by: tianchen <ni...@alibaba-inc.com>
Signed-off-by: Micah Kornfield <em...@gmail.com>
---
.../arrow/adapter/jdbc/JdbcToArrowUtils.java | 416 +++------------------
.../arrow/adapter/jdbc/consumer/ArrayConsumer.java | 68 ++++
.../adapter/jdbc/consumer/BigIntConsumer.java | 52 +++
.../adapter/jdbc/consumer/BinaryConsumer.java | 99 +++++
.../arrow/adapter/jdbc/consumer/BitConsumer.java | 52 +++
.../arrow/adapter/jdbc/consumer/BlobConsumer.java | 58 +++
.../arrow/adapter/jdbc/consumer/ClobConsumer.java | 90 +++++
.../jdbc/consumer/CompositeJdbcConsumer.java | 58 +++
.../arrow/adapter/jdbc/consumer/DateConsumer.java | 64 ++++
.../adapter/jdbc/consumer/DecimalConsumer.java | 53 +++
.../adapter/jdbc/consumer/DoubleConsumer.java | 52 +++
.../arrow/adapter/jdbc/consumer/FloatConsumer.java | 52 +++
.../arrow/adapter/jdbc/consumer/IntConsumer.java | 52 +++
.../arrow/adapter/jdbc/consumer/JdbcConsumer.java | 39 ++
.../adapter/jdbc/consumer/SmallIntConsumer.java | 52 +++
.../arrow/adapter/jdbc/consumer/TimeConsumer.java | 64 ++++
.../adapter/jdbc/consumer/TimestampConsumer.java | 64 ++++
.../adapter/jdbc/consumer/TinyIntConsumer.java | 52 +++
.../adapter/jdbc/consumer/VarCharConsumer.java | 76 ++++
19 files changed, 1151 insertions(+), 362 deletions(-)
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java
index 4d7f6ea..3ccdfad 100644
--- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java
@@ -20,14 +20,7 @@ package org.apache.arrow.adapter.jdbc;
import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE;
import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.sql.Array;
-import java.sql.Blob;
-import java.sql.Clob;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -43,6 +36,24 @@ import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
+import org.apache.arrow.adapter.jdbc.consumer.ArrayConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.BigIntConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.BinaryConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.BitConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.BlobConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.ClobConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.CompositeJdbcConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.DateConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.DecimalConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.DoubleConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.FloatConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.IntConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.JdbcConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.SmallIntConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.TimeConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.TimestampConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.TinyIntConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.VarCharConsumer;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BaseFixedWidthVector;
@@ -56,34 +67,18 @@ import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeMilliVector;
-import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
-import org.apache.arrow.vector.holders.NullableBigIntHolder;
-import org.apache.arrow.vector.holders.NullableBitHolder;
-import org.apache.arrow.vector.holders.NullableDateMilliHolder;
-import org.apache.arrow.vector.holders.NullableDecimalHolder;
-import org.apache.arrow.vector.holders.NullableFloat4Holder;
-import org.apache.arrow.vector.holders.NullableFloat8Holder;
-import org.apache.arrow.vector.holders.NullableIntHolder;
-import org.apache.arrow.vector.holders.NullableSmallIntHolder;
-import org.apache.arrow.vector.holders.NullableTimeMilliHolder;
-import org.apache.arrow.vector.holders.NullableTinyIntHolder;
-import org.apache.arrow.vector.holders.NullableVarCharHolder;
-import org.apache.arrow.vector.holders.VarBinaryHolder;
-import org.apache.arrow.vector.holders.VarCharHolder;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
-import org.apache.arrow.vector.util.DecimalUtility;
-
-import io.netty.buffer.ArrowBuf;
/**
* Class that does most of the work to convert JDBC ResultSet data into Arrow columnar format Vector objects.
@@ -93,8 +88,6 @@ import io.netty.buffer.ArrowBuf;
public class JdbcToArrowUtils {
private static final int DEFAULT_BUFFER_SIZE = 256;
- private static final int DEFAULT_STREAM_BUFFER_SIZE = 1024;
- private static final int DEFAULT_CLOB_SUBSTRING_READ_SIZE = 256;
private static final int JDBC_ARRAY_VALUE_COLUMN = 2;
@@ -357,379 +350,78 @@ public class JdbcToArrowUtils {
public static void jdbcToArrowVectors(ResultSet rs, VectorSchemaRoot root, JdbcToArrowConfig config)
throws SQLException, IOException {
- Preconditions.checkNotNull(rs, "JDBC ResultSet object can't be null");
- Preconditions.checkNotNull(root, "VectorSchemaRoot object can't be null");
- Preconditions.checkNotNull(config, "JDBC-to-Arrow configuration cannot be null");
-
ResultSetMetaData rsmd = rs.getMetaData();
int columnCount = rsmd.getColumnCount();
-
allocateVectors(root, DEFAULT_BUFFER_SIZE);
- int rowCount = 0;
- while (rs.next()) {
- for (int i = 1; i <= columnCount; i++) {
- jdbcToFieldVector(
- rs,
- i,
- rs.getMetaData().getColumnType(i),
- rowCount,
- root.getVector(rsmd.getColumnName(i)),
- config);
- }
- rowCount++;
+ JdbcConsumer[] consumers = new JdbcConsumer[columnCount];
+ for (int i = 1; i <= columnCount; i++) {
+ consumers[i - 1] = getConsumer(rs, i, rs.getMetaData().getColumnType(i),
+ root.getVector(rsmd.getColumnName(i)), config);
}
- root.setRowCount(rowCount);
- }
- private static void jdbcToFieldVector(
- ResultSet rs,
- int columnIndex,
- int jdbcColType,
- int rowCount,
- FieldVector vector,
- JdbcToArrowConfig config)
- throws SQLException, IOException {
+ try (CompositeJdbcConsumer compositeConsumer = new CompositeJdbcConsumer(consumers)) {
+ compositeConsumer.consume(rs);
+ root.setRowCount(compositeConsumer.getReadRowCount());
+ }
+ }
+ private static JdbcConsumer getConsumer(ResultSet resultSet, int columnIndex, int jdbcColType,
+ FieldVector vector, JdbcToArrowConfig config) throws SQLException {
final Calendar calendar = config.getCalendar();
-
switch (jdbcColType) {
case Types.BOOLEAN:
case Types.BIT:
- updateVector((BitVector) vector,
- rs.getBoolean(columnIndex), !rs.wasNull(), rowCount);
- break;
+ return new BitConsumer((BitVector) vector, columnIndex);
case Types.TINYINT:
- updateVector((TinyIntVector) vector,
- rs.getInt(columnIndex), !rs.wasNull(), rowCount);
- break;
+ return new TinyIntConsumer((TinyIntVector) vector, columnIndex);
case Types.SMALLINT:
- updateVector((SmallIntVector) vector,
- rs.getInt(columnIndex), !rs.wasNull(), rowCount);
- break;
+ return new SmallIntConsumer((SmallIntVector) vector, columnIndex);
case Types.INTEGER:
- updateVector((IntVector) vector,
- rs.getInt(columnIndex), !rs.wasNull(), rowCount);
- break;
+ return new IntConsumer((IntVector) vector, columnIndex);
case Types.BIGINT:
- updateVector((BigIntVector) vector,
- rs.getLong(columnIndex), !rs.wasNull(), rowCount);
- break;
+ return new BigIntConsumer((BigIntVector) vector, columnIndex);
case Types.NUMERIC:
case Types.DECIMAL:
- updateVector((DecimalVector) vector,
- rs.getBigDecimal(columnIndex), !rs.wasNull(), rowCount);
- break;
+ return new DecimalConsumer((DecimalVector) vector, columnIndex);
case Types.REAL:
case Types.FLOAT:
- updateVector((Float4Vector) vector,
- rs.getFloat(columnIndex), !rs.wasNull(), rowCount);
- break;
+ return new FloatConsumer((Float4Vector) vector, columnIndex);
case Types.DOUBLE:
- updateVector((Float8Vector) vector,
- rs.getDouble(columnIndex), !rs.wasNull(), rowCount);
- break;
+ return new DoubleConsumer((Float8Vector) vector, columnIndex);
case Types.CHAR:
case Types.NCHAR:
case Types.VARCHAR:
case Types.NVARCHAR:
case Types.LONGVARCHAR:
case Types.LONGNVARCHAR:
- updateVector((VarCharVector) vector,
- rs.getString(columnIndex), !rs.wasNull(), rowCount);
- break;
+ return new VarCharConsumer((VarCharVector) vector, columnIndex);
case Types.DATE:
- final Date date;
- if (calendar != null) {
- date = rs.getDate(columnIndex, calendar);
- } else {
- date = rs.getDate(columnIndex);
- }
-
- updateVector((DateMilliVector) vector, date, !rs.wasNull(), rowCount);
- break;
+ return new DateConsumer((DateMilliVector) vector, columnIndex, calendar);
case Types.TIME:
- final Time time;
- if (calendar != null) {
- time = rs.getTime(columnIndex, calendar);
- } else {
- time = rs.getTime(columnIndex);
- }
-
- updateVector((TimeMilliVector) vector, time, !rs.wasNull(), rowCount);
- break;
+ return new TimeConsumer((TimeMilliVector) vector, columnIndex, calendar);
case Types.TIMESTAMP:
- final Timestamp ts;
- if (calendar != null) {
- ts = rs.getTimestamp(columnIndex, calendar);
- } else {
- ts = rs.getTimestamp(columnIndex);
- }
-
- // TODO: Need to handle precision such as milli, micro, nano
- updateVector((TimeStampVector) vector, ts, !rs.wasNull(), rowCount);
- break;
+ return new TimestampConsumer((TimeStampMilliTZVector) vector, columnIndex, calendar);
case Types.BINARY:
case Types.VARBINARY:
case Types.LONGVARBINARY:
- updateVector((VarBinaryVector) vector,
- rs.getBinaryStream(columnIndex), !rs.wasNull(), rowCount);
- break;
+ return new BinaryConsumer((VarBinaryVector) vector, columnIndex);
case Types.ARRAY:
- updateVector((ListVector) vector, rs, columnIndex, rowCount, config);
- break;
+ final JdbcFieldInfo fieldInfo = getJdbcFieldInfoForArraySubType(resultSet.getMetaData(), columnIndex, config);
+ if (fieldInfo == null) {
+ throw new IllegalArgumentException("Column " + columnIndex + " is an array of unknown type.");
+ }
+ JdbcConsumer delegate = getConsumer(resultSet, JDBC_ARRAY_VALUE_COLUMN,
+ fieldInfo.getJdbcType(), ((ListVector)vector).getDataVector(), config);
+ return new ArrayConsumer((ListVector) vector, delegate, columnIndex);
case Types.CLOB:
- updateVector((VarCharVector) vector,
- rs.getClob(columnIndex), !rs.wasNull(), rowCount);
- break;
+ return new ClobConsumer((VarCharVector) vector, columnIndex);
case Types.BLOB:
- updateVector((VarBinaryVector) vector,
- rs.getBlob(columnIndex), !rs.wasNull(), rowCount);
- break;
+ BinaryConsumer delegateConsumer = new BinaryConsumer((VarBinaryVector) vector, columnIndex);
+ return new BlobConsumer(delegateConsumer, columnIndex);
default:
// no-op, shouldn't get here
- break;
- }
- }
-
- private static void updateVector(BitVector bitVector, boolean value, boolean isNonNull, int rowCount) {
- NullableBitHolder holder = new NullableBitHolder();
- holder.isSet = isNonNull ? 1 : 0;
- if (isNonNull) {
- holder.value = value ? 1 : 0;
- }
- bitVector.setSafe(rowCount, holder);
- bitVector.setValueCount(rowCount + 1);
- }
-
- private static void updateVector(TinyIntVector tinyIntVector, int value, boolean isNonNull, int rowCount) {
- NullableTinyIntHolder holder = new NullableTinyIntHolder();
- holder.isSet = isNonNull ? 1 : 0;
- if (isNonNull) {
- holder.value = (byte) value;
- }
- tinyIntVector.setSafe(rowCount, holder);
- tinyIntVector.setValueCount(rowCount + 1);
- }
-
- private static void updateVector(SmallIntVector smallIntVector, int value, boolean isNonNull, int rowCount) {
- NullableSmallIntHolder holder = new NullableSmallIntHolder();
- holder.isSet = isNonNull ? 1 : 0;
- if (isNonNull) {
- holder.value = (short) value;
- }
- smallIntVector.setSafe(rowCount, holder);
- smallIntVector.setValueCount(rowCount + 1);
- }
-
- private static void updateVector(IntVector intVector, int value, boolean isNonNull, int rowCount) {
- NullableIntHolder holder = new NullableIntHolder();
- holder.isSet = isNonNull ? 1 : 0;
- if (isNonNull) {
- holder.value = value;
- }
- intVector.setSafe(rowCount, holder);
- intVector.setValueCount(rowCount + 1);
- }
-
- private static void updateVector(BigIntVector bigIntVector, long value, boolean isNonNull, int rowCount) {
- NullableBigIntHolder holder = new NullableBigIntHolder();
- holder.isSet = isNonNull ? 1 : 0;
- if (isNonNull) {
- holder.value = value;
- }
- bigIntVector.setSafe(rowCount, holder);
- bigIntVector.setValueCount(rowCount + 1);
- }
-
- private static void updateVector(DecimalVector decimalVector, BigDecimal value, boolean isNonNull, int rowCount) {
- NullableDecimalHolder holder = new NullableDecimalHolder();
- holder.isSet = isNonNull ? 1 : 0;
- if (isNonNull) {
- holder.precision = value.precision();
- holder.scale = value.scale();
- holder.buffer = decimalVector.getAllocator().buffer(DEFAULT_BUFFER_SIZE);
- holder.start = 0;
- DecimalUtility.writeBigDecimalToArrowBuf(value, holder.buffer, holder.start);
- }
- decimalVector.setSafe(rowCount, holder);
- decimalVector.setValueCount(rowCount + 1);
- }
-
- private static void updateVector(Float4Vector float4Vector, float value, boolean isNonNull, int rowCount) {
- NullableFloat4Holder holder = new NullableFloat4Holder();
- holder.isSet = isNonNull ? 1 : 0;
- if (isNonNull) {
- holder.value = value;
- }
- float4Vector.setSafe(rowCount, holder);
- float4Vector.setValueCount(rowCount + 1);
- }
-
- private static void updateVector(Float8Vector float8Vector, double value, boolean isNonNull, int rowCount) {
- NullableFloat8Holder holder = new NullableFloat8Holder();
- holder.isSet = isNonNull ? 1 : 0;
- if (isNonNull) {
- holder.value = value;
- }
- float8Vector.setSafe(rowCount, holder);
- float8Vector.setValueCount(rowCount + 1);
- }
-
- private static void updateVector(VarCharVector varcharVector, String value, boolean isNonNull, int rowCount) {
- NullableVarCharHolder holder = new NullableVarCharHolder();
- holder.isSet = isNonNull ? 1 : 0;
- varcharVector.setIndexDefined(rowCount);
- if (isNonNull) {
- byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
- holder.buffer = varcharVector.getAllocator().buffer(bytes.length);
- holder.buffer.setBytes(0, bytes, 0, bytes.length);
- holder.start = 0;
- holder.end = bytes.length;
- } else {
- holder.buffer = varcharVector.getAllocator().buffer(0);
- }
- varcharVector.setSafe(rowCount, holder);
- varcharVector.setValueCount(rowCount + 1);
- }
-
- private static void updateVector(DateMilliVector dateMilliVector, Date date, boolean isNonNull, int rowCount) {
- NullableDateMilliHolder holder = new NullableDateMilliHolder();
- holder.isSet = isNonNull ? 1 : 0;
- if (isNonNull) {
- holder.value = date.getTime();
- }
- dateMilliVector.setSafe(rowCount, holder);
- dateMilliVector.setValueCount(rowCount + 1);
- }
-
- private static void updateVector(TimeMilliVector timeMilliVector, Time time, boolean isNonNull, int rowCount) {
- NullableTimeMilliHolder holder = new NullableTimeMilliHolder();
- holder.isSet = isNonNull ? 1 : 0;
- if (isNonNull && time != null) {
- holder.value = (int) time.getTime();
- }
- timeMilliVector.setSafe(rowCount, holder);
- timeMilliVector.setValueCount(rowCount + 1);
- }
-
- private static void updateVector(
- TimeStampVector timeStampVector,
- Timestamp timestamp,
- boolean isNonNull,
- int rowCount) {
- //TODO: Need to handle precision such as milli, micro, nano
- timeStampVector.setValueCount(rowCount + 1);
- if (timestamp != null) {
- timeStampVector.setSafe(rowCount, timestamp.getTime());
- } else {
- timeStampVector.setNull(rowCount);
+ throw new UnsupportedOperationException();
}
}
-
- private static void updateVector(
- VarBinaryVector varBinaryVector,
- InputStream is,
- boolean isNonNull,
- int rowCount) throws IOException {
- varBinaryVector.setValueCount(rowCount + 1);
- if (isNonNull && is != null) {
- VarBinaryHolder holder = new VarBinaryHolder();
- ArrowBuf arrowBuf = varBinaryVector.getDataBuffer();
- holder.start = 0;
- byte[] bytes = new byte[DEFAULT_STREAM_BUFFER_SIZE];
- int total = 0;
- while (true) {
- int read = is.read(bytes, 0, DEFAULT_STREAM_BUFFER_SIZE);
- if (read == -1) {
- break;
- }
- arrowBuf.setBytes(total, bytes, total, read);
- total += read;
- }
- holder.end = total;
- holder.buffer = arrowBuf;
- varBinaryVector.set(rowCount, holder);
- varBinaryVector.setIndexDefined(rowCount);
- } else {
- varBinaryVector.setNull(rowCount);
- }
- }
-
- private static void updateVector(
- VarCharVector varcharVector,
- Clob clob,
- boolean isNonNull,
- int rowCount) throws SQLException, IOException {
- varcharVector.setValueCount(rowCount + 1);
- if (isNonNull && clob != null) {
- VarCharHolder holder = new VarCharHolder();
- ArrowBuf arrowBuf = varcharVector.getDataBuffer();
- holder.start = 0;
- long length = clob.length();
- int read = 1;
- int readSize = length < DEFAULT_CLOB_SUBSTRING_READ_SIZE ? (int) length : DEFAULT_CLOB_SUBSTRING_READ_SIZE;
- int totalBytes = 0;
- while (read <= length) {
- String str = clob.getSubString(read, readSize);
- byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
- arrowBuf.setBytes(totalBytes, new ByteArrayInputStream(bytes, 0, bytes.length), bytes.length);
- totalBytes += bytes.length;
- read += readSize;
- }
- holder.end = totalBytes;
- holder.buffer = arrowBuf;
- varcharVector.set(rowCount, holder);
- varcharVector.setIndexDefined(rowCount);
- } else {
- varcharVector.setNull(rowCount);
- }
- }
-
- private static void updateVector(VarBinaryVector varBinaryVector, Blob blob, boolean isNonNull, int rowCount)
- throws SQLException, IOException {
- updateVector(varBinaryVector, blob != null ? blob.getBinaryStream() : null, isNonNull, rowCount);
- }
-
- private static void updateVector(
- ListVector listVector,
- ResultSet resultSet,
- int arrayIndex,
- int rowCount,
- JdbcToArrowConfig config)
- throws SQLException, IOException {
-
- final JdbcFieldInfo fieldInfo = getJdbcFieldInfoForArraySubType(resultSet.getMetaData(), arrayIndex, config);
- if (fieldInfo == null) {
- throw new IllegalArgumentException("Column " + arrayIndex + " is an array of unknown type.");
- }
-
- final int valueCount = listVector.getValueCount();
- final Array array = resultSet.getArray(arrayIndex);
-
- FieldVector fieldVector = listVector.getDataVector();
- int arrayRowCount = 0;
-
- if (!resultSet.wasNull()) {
- listVector.startNewValue(rowCount);
-
- try (ResultSet rs = array.getResultSet()) {
-
- while (rs.next()) {
- jdbcToFieldVector(
- rs,
- JDBC_ARRAY_VALUE_COLUMN,
- fieldInfo.getJdbcType(),
- valueCount + arrayRowCount,
- fieldVector,
- config);
- arrayRowCount++;
- }
- }
-
- listVector.endValue(rowCount, arrayRowCount);
- }
-
- listVector.setValueCount(valueCount + arrayRowCount);
- }
}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ArrayConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ArrayConsumer.java
new file mode 100644
index 0000000..a92c913
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ArrayConsumer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.io.IOException;
+import java.sql.Array;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.BitVectorHelper;
+import org.apache.arrow.vector.complex.ListVector;
+
+/**
+ * Consumer which consume array type values from {@link ResultSet}.
+ * Write the data to {@link org.apache.arrow.vector.complex.ListVector}.
+ */
+public class ArrayConsumer implements JdbcConsumer {
+
+ private final JdbcConsumer delegate;
+ private final int columnIndexInResultSet;
+
+ private ListVector vector;
+
+ /**
+ * Instantiate a ArrayConsumer.
+ */
+ public ArrayConsumer(ListVector vector, JdbcConsumer delegate, int index) {
+ this.columnIndexInResultSet = index;
+ this.delegate = delegate;
+ this.vector = vector;
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException, IOException {
+ final Array array = resultSet.getArray(columnIndexInResultSet);
+ int idx = vector.getValueCount();
+ if (!resultSet.wasNull()) {
+
+ vector.startNewValue(idx);
+ int count = 0;
+ try (ResultSet rs = array.getResultSet()) {
+ while (rs.next()) {
+ delegate.consume(rs);
+ count++;
+ }
+ }
+ int end = vector.getOffsetBuffer().getInt(idx * 4) + count;
+ vector.getOffsetBuffer().setInt((idx + 1) * 4, end);
+ BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), vector.getValueCount());
+ }
+ vector.setValueCount(idx + 1);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BigIntConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BigIntConsumer.java
new file mode 100644
index 0000000..91eb9d1
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BigIntConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.complex.impl.BigIntWriterImpl;
+import org.apache.arrow.vector.complex.writer.BigIntWriter;
+
+/**
+ * Consumer which consume bigint type values from {@link ResultSet}.
+ * Write the data to {@link org.apache.arrow.vector.BigIntVector}.
+ */
+public class BigIntConsumer implements JdbcConsumer {
+
+ private final BigIntWriter writer;
+ private final int columnIndexInResultSet;
+
+ /**
+ * Instantiate a BigIntConsumer.
+ */
+ public BigIntConsumer(BigIntVector vector, int index) {
+ this.writer = new BigIntWriterImpl(vector);
+ this.columnIndexInResultSet = index;
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException {
+ long value = resultSet.getLong(columnIndexInResultSet);
+ if (!resultSet.wasNull()) {
+ writer.writeBigInt(value);
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java
new file mode 100644
index 0000000..385b0ab
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl;
+import org.apache.arrow.vector.complex.writer.VarBinaryWriter;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Consumer which consume binary type values from {@link ResultSet}.
+ * Write the data to {@link org.apache.arrow.vector.VarBinaryVector}.
+ */
+public class BinaryConsumer implements JdbcConsumer {
+
+ private static final int BUFFER_SIZE = 1024;
+
+ private final VarBinaryWriter writer;
+ private final int columnIndexInResultSet;
+ private BufferAllocator allocator;
+
+ private ArrowBuf reuse;
+ private byte[] reuseBytes;
+
+ /**
+ * Instantiate a BinaryConsumer.
+ */
+ public BinaryConsumer(VarBinaryVector vector, int index) {
+ this.writer = new VarBinaryWriterImpl(vector);
+ this.columnIndexInResultSet = index;
+
+ this.allocator = vector.getAllocator();
+ reuse = allocator.buffer(BUFFER_SIZE);
+ reuseBytes = new byte[BUFFER_SIZE];
+ }
+
+ /**
+ * consume a InputStream.
+ */
+ public void consume(InputStream is) throws IOException {
+ if (is != null) {
+ int length = is.available();
+ if (length > reuse.capacity()) {
+ reuse.close();
+ reuse = allocator.buffer(length);
+ }
+
+ int total = 0;
+ int read;
+ while ((read = is.read(reuseBytes, 0, reuseBytes.length)) != -1) {
+ reuse.setBytes(total, reuseBytes, 0, read);
+ total += read;
+ }
+ writer.writeVarBinary(0, total, reuse);
+ }
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException, IOException {
+ InputStream is = resultSet.getBinaryStream(columnIndexInResultSet);
+ if (!resultSet.wasNull()) {
+ consume(is);
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+
+ public void moveWriterPosition() {
+ writer.setPosition(writer.getPosition() + 1);
+ }
+
+ @Override
+ public void close() {
+ if (reuse != null) {
+ reuse.close();
+ }
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BitConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BitConsumer.java
new file mode 100644
index 0000000..c61ce3e
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BitConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.complex.impl.BitWriterImpl;
+import org.apache.arrow.vector.complex.writer.BitWriter;
+
+/**
+ * Consumer which consume bit type values from {@link ResultSet}.
+ * Write the data to {@link BitVector}.
+ */
+public class BitConsumer implements JdbcConsumer {
+
+ private final BitWriter writer;
+ private final int columnIndexInResultSet;
+
+ /**
+ * Instantiate a BitConsumer.
+ */
+ public BitConsumer(BitVector vector, int index) {
+ this.writer = new BitWriterImpl(vector);
+ this.columnIndexInResultSet = index;
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException {
+ boolean value = resultSet.getBoolean(columnIndexInResultSet);
+ if (!resultSet.wasNull()) {
+ writer.writeBit(value ? 1 : 0);
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BlobConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BlobConsumer.java
new file mode 100644
index 0000000..b623133
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BlobConsumer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.io.IOException;
+import java.sql.Blob;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.VarBinaryVector;
+
+/**
+ * Consumer which consume blob type values from {@link ResultSet}.
+ * Write the data to {@link VarBinaryVector}.
+ */
+public class BlobConsumer implements JdbcConsumer {
+
+ private final int columnIndexInResultSet;
+
+ private final BinaryConsumer delegate;
+
+ /**
+ * Instantiate a BlobConsumer.
+ */
+ public BlobConsumer(BinaryConsumer delegate, int index) {
+ this.columnIndexInResultSet = index;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException, IOException {
+ Blob blob = resultSet.getBlob(columnIndexInResultSet);
+ if (blob != null) {
+ delegate.consume(blob.getBinaryStream());
+ }
+ delegate.moveWriterPosition();
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ClobConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ClobConsumer.java
new file mode 100644
index 0000000..73975f7
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ClobConsumer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Clob;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.impl.VarCharWriterImpl;
+import org.apache.arrow.vector.complex.writer.VarCharWriter;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Consumer which consume clob type values from {@link ResultSet}.
+ * Write the data to {@link org.apache.arrow.vector.VarCharVector}.
+ */
+public class ClobConsumer implements JdbcConsumer {
+
+ private static final int BUFFER_SIZE = 256;
+
+ private final VarCharWriter writer;
+ private final int columnIndexInResultSet;
+ private BufferAllocator allocator;
+
+ private ArrowBuf reuse;
+
+ /**
+ * Instantiate a ClobConsumer.
+ */
+ public ClobConsumer(VarCharVector vector, int index) {
+ this.writer = new VarCharWriterImpl(vector);
+ this.columnIndexInResultSet = index;
+
+ this.allocator = vector.getAllocator();
+ reuse = allocator.buffer(1024);
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException {
+ Clob clob = resultSet.getClob(columnIndexInResultSet);
+ if (!resultSet.wasNull()) {
+ if (clob != null) {
+ long length = clob.length();
+ if (length > reuse.capacity()) {
+ reuse.close();
+ reuse = allocator.buffer((int) length);
+ }
+
+ int read = 1;
+ int readSize = length < BUFFER_SIZE ? (int) length : BUFFER_SIZE;
+ int totalBytes = 0;
+ while (read <= length) {
+ String str = clob.getSubString(read, readSize);
+ byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+ reuse.setBytes(totalBytes, bytes, 0, bytes.length);
+ totalBytes += bytes.length;
+ read += readSize;
+ }
+ writer.writeVarChar(0, totalBytes, reuse);
+ }
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+
+ @Override
+ public void close() {
+ if (reuse != null) {
+ reuse.close();
+ }
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/CompositeJdbcConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/CompositeJdbcConsumer.java
new file mode 100644
index 0000000..2c598c8
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/CompositeJdbcConsumer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Composite consumer which hold all consumers.
+ * It manages the consume and cleanup process.
+ */
+public class CompositeJdbcConsumer implements JdbcConsumer {
+
+ private final JdbcConsumer[] consumers;
+ private int readRowCount;
+
+ public int getReadRowCount() {
+ return readRowCount;
+ }
+
+ public CompositeJdbcConsumer(JdbcConsumer[] consumers) {
+ this.consumers = consumers;
+ }
+
+ @Override
+ public void consume(ResultSet rs) throws SQLException, IOException {
+ while (rs.next()) {
+ for (JdbcConsumer consumer : consumers) {
+ consumer.consume(rs);
+ }
+ readRowCount++;
+ }
+ }
+
+ @Override
+ public void close() {
+ // clean up
+ for (JdbcConsumer consumer : consumers) {
+ consumer.close();
+ }
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DateConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DateConsumer.java
new file mode 100644
index 0000000..6be00c0
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DateConsumer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.complex.impl.DateMilliWriterImpl;
+import org.apache.arrow.vector.complex.writer.DateMilliWriter;
+
+/**
+ * Consumer which consume date type values from {@link ResultSet}.
+ * Write the data to {@link org.apache.arrow.vector.DateMilliVector}.
+ */
+public class DateConsumer implements JdbcConsumer {
+
+ private final DateMilliWriter writer;
+ private final int columnIndexInResultSet;
+ private final Calendar calendar;
+
+ /**
+ * Instantiate a DateConsumer.
+ */
+ public DateConsumer(DateMilliVector vector, int index) {
+ this (vector, index, null);
+ }
+
+ /**
+ * Instantiate a DateConsumer.
+ */
+ public DateConsumer(DateMilliVector vector, int index, Calendar calendar) {
+ this.writer = new DateMilliWriterImpl(vector);
+ this.columnIndexInResultSet = index;
+ this.calendar = calendar;
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException {
+ Date date = calendar == null ? resultSet.getDate(columnIndexInResultSet) :
+ resultSet.getDate(columnIndexInResultSet, calendar);
+ if (!resultSet.wasNull()) {
+ writer.writeDateMilli(date.getTime());
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DecimalConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DecimalConsumer.java
new file mode 100644
index 0000000..862471c
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DecimalConsumer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.complex.impl.DecimalWriterImpl;
+import org.apache.arrow.vector.complex.writer.DecimalWriter;
+
+/**
+ * Consumer which consume decimal type values from {@link ResultSet}.
+ * Write the data to {@link org.apache.arrow.vector.DecimalVector}.
+ */
+public class DecimalConsumer implements JdbcConsumer {
+
+ private final DecimalWriter writer;
+ private final int columnIndexInResultSet;
+
+ /**
+ * Instantiate a DecimalConsumer.
+ */
+ public DecimalConsumer(DecimalVector vector, int index) {
+ this.writer = new DecimalWriterImpl(vector);
+ this.columnIndexInResultSet = index;
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException {
+ BigDecimal value = resultSet.getBigDecimal(columnIndexInResultSet);
+ if (!resultSet.wasNull()) {
+ writer.writeDecimal(value);
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DoubleConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DoubleConsumer.java
new file mode 100644
index 0000000..b1dcb9e
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DoubleConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.complex.impl.Float8WriterImpl;
+import org.apache.arrow.vector.complex.writer.Float8Writer;
+
+/**
+ * Consumer which consume double type values from {@link ResultSet}.
+ * Write the data to {@link org.apache.arrow.vector.Float8Vector}.
+ */
+public class DoubleConsumer implements JdbcConsumer {
+
+ private final Float8Writer writer;
+ private final int columnIndexInResultSet;
+
+ /**
+ * Instantiate a DoubleConsumer.
+ */
+ public DoubleConsumer(Float8Vector vector, int index) {
+ this.writer = new Float8WriterImpl(vector);
+ this.columnIndexInResultSet = index;
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException {
+ double value = resultSet.getDouble(columnIndexInResultSet);
+ if (!resultSet.wasNull()) {
+ writer.writeFloat8(value);
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/FloatConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/FloatConsumer.java
new file mode 100644
index 0000000..c7922b1
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/FloatConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.complex.impl.Float4WriterImpl;
+import org.apache.arrow.vector.complex.writer.Float4Writer;
+
+/**
+ * Consumer which consume float type values from {@link ResultSet}.
+ * Write the data to {@link org.apache.arrow.vector.Float4Vector}.
+ */
+public class FloatConsumer implements JdbcConsumer {
+
+ private final Float4Writer writer;
+ private final int columnIndexInResultSet;
+
+ /**
+ * Instantiate a FloatConsumer.
+ */
+ public FloatConsumer(Float4Vector vector, int index) {
+ this.writer = new Float4WriterImpl(vector);
+ this.columnIndexInResultSet = index;
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException {
+ float value = resultSet.getFloat(columnIndexInResultSet);
+ if (!resultSet.wasNull()) {
+ writer.writeFloat4(value);
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/IntConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/IntConsumer.java
new file mode 100644
index 0000000..8a60e59
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/IntConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.complex.impl.IntWriterImpl;
+import org.apache.arrow.vector.complex.writer.IntWriter;
+
+/**
+ * Consumer which consume int type values from {@link ResultSet}.
+ * Write the data to {@link org.apache.arrow.vector.IntVector}.
+ */
+public class IntConsumer implements JdbcConsumer {
+
+ private final IntWriter writer;
+ private final int columnIndexInResultSet;
+
+ /**
+ * Instantiate a IntConsumer.
+ */
+ public IntConsumer(IntVector vector, int index) {
+ this.writer = new IntWriterImpl(vector);
+ this.columnIndexInResultSet = index;
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException {
+ int value = resultSet.getInt(columnIndexInResultSet);
+ if (!resultSet.wasNull()) {
+ writer.writeInt(value);
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/JdbcConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/JdbcConsumer.java
new file mode 100644
index 0000000..5030503
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/JdbcConsumer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * An abstraction that is used to consume values from {@link ResultSet}.
+ */
+public interface JdbcConsumer extends AutoCloseable {
+
+ /**
+ * Consume a specific type value from {@link ResultSet} and write it to vector.
+ */
+ void consume(ResultSet resultSet) throws SQLException, IOException;
+
+ /**
+ * Close this consumer, do some clean work such as clear reuse ArrowBuf.
+ */
+ default void close() {}
+
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/SmallIntConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/SmallIntConsumer.java
new file mode 100644
index 0000000..1c76f86
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/SmallIntConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.complex.impl.SmallIntWriterImpl;
+import org.apache.arrow.vector.complex.writer.SmallIntWriter;
+
+/**
+ * Consumer which consume smallInt type values from {@link ResultSet}.
+ * Write the data to {@link org.apache.arrow.vector.SmallIntVector}.
+ */
+public class SmallIntConsumer implements JdbcConsumer {
+
+ private final SmallIntWriter writer;
+ private final int columnIndexInResultSet;
+
+ /**
+ * Instantiate a SmallIntConsumer.
+ */
+ public SmallIntConsumer(SmallIntVector vector, int index) {
+ this.writer = new SmallIntWriterImpl(vector);
+ this.columnIndexInResultSet = index;
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException {
+ short value = resultSet.getShort(columnIndexInResultSet);
+ if (!resultSet.wasNull()) {
+ writer.writeSmallInt(value);
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TimeConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TimeConsumer.java
new file mode 100644
index 0000000..46d3f32
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TimeConsumer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.util.Calendar;
+
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.complex.impl.TimeMilliWriterImpl;
+import org.apache.arrow.vector.complex.writer.TimeMilliWriter;
+
+/**
+ * Consumer which consume time type values from {@link ResultSet}.
+ * Write the data to {@link org.apache.arrow.vector.TimeMilliVector}.
+ */
+public class TimeConsumer implements JdbcConsumer {
+
+ private final TimeMilliWriter writer;
+ private final int columnIndexInResultSet;
+ private final Calendar calendar;
+
+ /**
+ * Instantiate a TimeConsumer.
+ */
+ public TimeConsumer(TimeMilliVector vector, int index) {
+ this(vector, index, null);
+ }
+
+ /**
+ * Instantiate a TimeConsumer.
+ */
+ public TimeConsumer(TimeMilliVector vector, int index, Calendar calendar) {
+ this.writer = new TimeMilliWriterImpl(vector);
+ this.columnIndexInResultSet = index;
+ this.calendar = calendar;
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException {
+ Time time = calendar == null ? resultSet.getTime(columnIndexInResultSet) :
+ resultSet.getTime(columnIndexInResultSet, calendar);
+ if (!resultSet.wasNull()) {
+ writer.writeTimeMilli((int) time.getTime());
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TimestampConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TimestampConsumer.java
new file mode 100644
index 0000000..aef10e3
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TimestampConsumer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Calendar;
+
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.complex.impl.TimeStampMilliTZWriterImpl;
+import org.apache.arrow.vector.complex.writer.TimeStampMilliTZWriter;
+
+/**
+ * Consumer which consume timestamp type values from {@link ResultSet}.
+ * Write the data to {@link TimeStampMilliTZVector}.
+ */
+public class TimestampConsumer implements JdbcConsumer {
+
+ private final TimeStampMilliTZWriter writer;
+ private final int columnIndexInResultSet;
+ private final Calendar calendar;
+
+ /**
+ * Instantiate a TimestampConsumer.
+ */
+ public TimestampConsumer(TimeStampMilliTZVector vector, int index) {
+ this(vector, index, null);
+ }
+
+ /**
+ * Instantiate a TimestampConsumer.
+ */
+ public TimestampConsumer(TimeStampMilliTZVector vector, int index, Calendar calendar) {
+ this.writer = new TimeStampMilliTZWriterImpl(vector);
+ this.columnIndexInResultSet = index;
+ this.calendar = calendar;
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException {
+ Timestamp timestamp = calendar == null ? resultSet.getTimestamp(columnIndexInResultSet) :
+ resultSet.getTimestamp(columnIndexInResultSet, calendar);
+ if (!resultSet.wasNull()) {
+ writer.writeTimeStampMilliTZ(timestamp.getTime());
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TinyIntConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TinyIntConsumer.java
new file mode 100644
index 0000000..ca310c4
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TinyIntConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.complex.impl.TinyIntWriterImpl;
+import org.apache.arrow.vector.complex.writer.TinyIntWriter;
+
+/**
+ * Consumer which consume tinyInt type values from {@link ResultSet}.
+ * Write the data to {@link org.apache.arrow.vector.TinyIntVector}.
+ */
+public class TinyIntConsumer implements JdbcConsumer {
+
+ private final TinyIntWriter writer;
+ private final int columnIndexInResultSet;
+
+ /**
+ * Instantiate a TinyIntConsumer.
+ */
+ public TinyIntConsumer(TinyIntVector vector, int index) {
+ this.writer = new TinyIntWriterImpl(vector);
+ this.columnIndexInResultSet = index;
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException {
+ byte value = resultSet.getByte(columnIndexInResultSet);
+ if (!resultSet.wasNull()) {
+ writer.writeTinyInt(value);
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/VarCharConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/VarCharConsumer.java
new file mode 100644
index 0000000..9eb6eff
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/VarCharConsumer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.impl.VarCharWriterImpl;
+import org.apache.arrow.vector.complex.writer.VarCharWriter;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Consumer which consume varchar type values from {@link ResultSet}.
+ * Write the data to {@link org.apache.arrow.vector.VarCharVector}.
+ */
+public class VarCharConsumer implements JdbcConsumer {
+
+ private final VarCharWriter writer;
+ private final int columnIndexInResultSet;
+ private BufferAllocator allocator;
+
+ private ArrowBuf reuse;
+
+ /**
+ * Instantiate a VarCharConsumer.
+ */
+ public VarCharConsumer(VarCharVector vector, int index) {
+ this.writer = new VarCharWriterImpl(vector);
+ this.columnIndexInResultSet = index;
+ this.allocator = vector.getAllocator();
+ }
+
+ @Override
+ public void consume(ResultSet resultSet) throws SQLException {
+ String value = resultSet.getString(columnIndexInResultSet);
+ if (!resultSet.wasNull()) {
+ byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
+ if (reuse == null) {
+ reuse = allocator.buffer(bytes.length);
+ }
+ if (bytes.length > reuse.capacity()) {
+ reuse.close();
+ reuse = allocator.buffer(bytes.length);
+ }
+ reuse.setBytes(0, bytes, 0, bytes.length);
+ writer.writeVarChar(0, bytes.length, reuse);
+ }
+ writer.setPosition(writer.getPosition() + 1);
+ }
+
+ @Override
+ public void close() {
+ if (reuse != null) {
+ reuse.close();
+ }
+ }
+}