Posted to commits@arrow.apache.org by em...@apache.org on 2019/08/12 03:08:45 UTC

[arrow] branch master updated: ARROW-6083: [Java] Refactor Jdbc adapter consume logic

This is an automated email from the ASF dual-hosted git repository.

emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5daba72  ARROW-6083: [Java] Refactor Jdbc adapter consume logic
5daba72 is described below

commit 5daba72aef8810940b5bfadb807851f87c219020
Author: tianchen <ni...@alibaba-inc.com>
AuthorDate: Sun Aug 11 20:06:13 2019 -0700

    ARROW-6083: [Java] Refactor Jdbc adapter consume logic
    
    Related to [ARROW-6083](https://issues.apache.org/jira/browse/ARROW-6083).
    
    The JDBC adapter's current read path over a ResultSet looks like this:
    
    while (rs.next()) {
      for (int i = 1; i <= columnCount; i++) {
        jdbcToFieldVector(rs, i, rs.getMetaData().getColumnType(i), rowCount,
            root.getVector(rsmd.getColumnName(i)), config);
      }
      rowCount++;
    }
    
    jdbcToFieldVector contains a large switch-case block, which means that for every single value read from the ResultSet we repeat the same type-dispatch checks.
    
    I think we could optimize this by using a consumer/delegate pattern, as the Avro adapter does.
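    
    With that change the type dispatch runs once per column in getConsumer, and the per-row
    loop only delegates. A sketch of the new read path inside jdbcToArrowVectors, mirroring
    the code added in this patch:
    
    JdbcConsumer[] consumers = new JdbcConsumer[columnCount];
    for (int i = 1; i <= columnCount; i++) {
      // resolve the column's JDBC type to a typed consumer once, not once per value
      consumers[i - 1] = getConsumer(rs, i, rs.getMetaData().getColumnType(i),
          root.getVector(rsmd.getColumnName(i)), config);
    }
    try (CompositeJdbcConsumer compositeConsumer = new CompositeJdbcConsumer(consumers)) {
      // advances the ResultSet row by row and lets each consumer fill its vector
      compositeConsumer.consume(rs);
      root.setRowCount(compositeConsumer.getReadRowCount());
    }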
    
    Closes #4978 from tianchen92/ARROW-6083 and squashes the following commits:
    
    6acff423d <tianchen> fix potential leak
    85e0ebd82 <tianchen> resolve comments
    1318574e7 <tianchen> remove addNull API
    fa13e5971 <tianchen> add constructor with no Calendar
    2b0aa78d6 <tianchen> resolve comments
    aa5c9d927 <tianchen> ARROW-6083:  Refactor Jdbc adapter consume logic
    
    Authored-by: tianchen <ni...@alibaba-inc.com>
    Signed-off-by: Micah Kornfield <em...@gmail.com>
---
 .../arrow/adapter/jdbc/JdbcToArrowUtils.java       | 416 +++------------------
 .../arrow/adapter/jdbc/consumer/ArrayConsumer.java |  68 ++++
 .../adapter/jdbc/consumer/BigIntConsumer.java      |  52 +++
 .../adapter/jdbc/consumer/BinaryConsumer.java      |  99 +++++
 .../arrow/adapter/jdbc/consumer/BitConsumer.java   |  52 +++
 .../arrow/adapter/jdbc/consumer/BlobConsumer.java  |  58 +++
 .../arrow/adapter/jdbc/consumer/ClobConsumer.java  |  90 +++++
 .../jdbc/consumer/CompositeJdbcConsumer.java       |  58 +++
 .../arrow/adapter/jdbc/consumer/DateConsumer.java  |  64 ++++
 .../adapter/jdbc/consumer/DecimalConsumer.java     |  53 +++
 .../adapter/jdbc/consumer/DoubleConsumer.java      |  52 +++
 .../arrow/adapter/jdbc/consumer/FloatConsumer.java |  52 +++
 .../arrow/adapter/jdbc/consumer/IntConsumer.java   |  52 +++
 .../arrow/adapter/jdbc/consumer/JdbcConsumer.java  |  39 ++
 .../adapter/jdbc/consumer/SmallIntConsumer.java    |  52 +++
 .../arrow/adapter/jdbc/consumer/TimeConsumer.java  |  64 ++++
 .../adapter/jdbc/consumer/TimestampConsumer.java   |  64 ++++
 .../adapter/jdbc/consumer/TinyIntConsumer.java     |  52 +++
 .../adapter/jdbc/consumer/VarCharConsumer.java     |  76 ++++
 19 files changed, 1151 insertions(+), 362 deletions(-)

diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java
index 4d7f6ea..3ccdfad 100644
--- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcToArrowUtils.java
@@ -20,14 +20,7 @@ package org.apache.arrow.adapter.jdbc;
 import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE;
 import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.sql.Array;
-import java.sql.Blob;
-import java.sql.Clob;
 import java.sql.Date;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
@@ -43,6 +36,24 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.TimeZone;
 
+import org.apache.arrow.adapter.jdbc.consumer.ArrayConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.BigIntConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.BinaryConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.BitConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.BlobConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.ClobConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.CompositeJdbcConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.DateConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.DecimalConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.DoubleConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.FloatConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.IntConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.JdbcConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.SmallIntConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.TimeConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.TimestampConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.TinyIntConsumer;
+import org.apache.arrow.adapter.jdbc.consumer.VarCharConsumer;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.BaseFixedWidthVector;
@@ -56,34 +67,18 @@ import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
 import org.apache.arrow.vector.TimeMilliVector;
-import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.complex.ListVector;
-import org.apache.arrow.vector.holders.NullableBigIntHolder;
-import org.apache.arrow.vector.holders.NullableBitHolder;
-import org.apache.arrow.vector.holders.NullableDateMilliHolder;
-import org.apache.arrow.vector.holders.NullableDecimalHolder;
-import org.apache.arrow.vector.holders.NullableFloat4Holder;
-import org.apache.arrow.vector.holders.NullableFloat8Holder;
-import org.apache.arrow.vector.holders.NullableIntHolder;
-import org.apache.arrow.vector.holders.NullableSmallIntHolder;
-import org.apache.arrow.vector.holders.NullableTimeMilliHolder;
-import org.apache.arrow.vector.holders.NullableTinyIntHolder;
-import org.apache.arrow.vector.holders.NullableVarCharHolder;
-import org.apache.arrow.vector.holders.VarBinaryHolder;
-import org.apache.arrow.vector.holders.VarCharHolder;
 import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.TimeUnit;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.types.pojo.Schema;
-import org.apache.arrow.vector.util.DecimalUtility;
-
-import io.netty.buffer.ArrowBuf;
 
 /**
  * Class that does most of the work to convert JDBC ResultSet data into Arrow columnar format Vector objects.
@@ -93,8 +88,6 @@ import io.netty.buffer.ArrowBuf;
 public class JdbcToArrowUtils {
 
   private static final int DEFAULT_BUFFER_SIZE = 256;
-  private static final int DEFAULT_STREAM_BUFFER_SIZE = 1024;
-  private static final int DEFAULT_CLOB_SUBSTRING_READ_SIZE = 256;
 
   private static final int JDBC_ARRAY_VALUE_COLUMN = 2;
 
@@ -357,379 +350,78 @@ public class JdbcToArrowUtils {
   public static void jdbcToArrowVectors(ResultSet rs, VectorSchemaRoot root, JdbcToArrowConfig config)
       throws SQLException, IOException {
 
-    Preconditions.checkNotNull(rs, "JDBC ResultSet object can't be null");
-    Preconditions.checkNotNull(root, "VectorSchemaRoot object can't be null");
-    Preconditions.checkNotNull(config, "JDBC-to-Arrow configuration cannot be null");
-
     ResultSetMetaData rsmd = rs.getMetaData();
     int columnCount = rsmd.getColumnCount();
-
     allocateVectors(root, DEFAULT_BUFFER_SIZE);
 
-    int rowCount = 0;
-    while (rs.next()) {
-      for (int i = 1; i <= columnCount; i++) {
-        jdbcToFieldVector(
-            rs,
-            i,
-            rs.getMetaData().getColumnType(i),
-            rowCount,
-            root.getVector(rsmd.getColumnName(i)),
-            config);
-      }
-      rowCount++;
+    JdbcConsumer[] consumers = new JdbcConsumer[columnCount];
+    for (int i = 1; i <= columnCount; i++) {
+      consumers[i - 1] = getConsumer(rs, i, rs.getMetaData().getColumnType(i),
+          root.getVector(rsmd.getColumnName(i)), config);
     }
-    root.setRowCount(rowCount);
-  }
 
-  private static void jdbcToFieldVector(
-      ResultSet rs,
-      int columnIndex,
-      int jdbcColType,
-      int rowCount,
-      FieldVector vector,
-      JdbcToArrowConfig config)
-          throws SQLException, IOException {
+    try (CompositeJdbcConsumer compositeConsumer = new CompositeJdbcConsumer(consumers)) {
+      compositeConsumer.consume(rs);
+      root.setRowCount(compositeConsumer.getReadRowCount());
+    }
+  }
 
+  private static JdbcConsumer getConsumer(ResultSet resultSet, int columnIndex, int jdbcColType,
+      FieldVector vector, JdbcToArrowConfig config) throws SQLException {
     final Calendar calendar = config.getCalendar();
-
     switch (jdbcColType) {
       case Types.BOOLEAN:
       case Types.BIT:
-        updateVector((BitVector) vector,
-                rs.getBoolean(columnIndex), !rs.wasNull(), rowCount);
-        break;
+        return new BitConsumer((BitVector) vector, columnIndex);
       case Types.TINYINT:
-        updateVector((TinyIntVector) vector,
-                rs.getInt(columnIndex), !rs.wasNull(), rowCount);
-        break;
+        return new TinyIntConsumer((TinyIntVector) vector, columnIndex);
       case Types.SMALLINT:
-        updateVector((SmallIntVector) vector,
-                rs.getInt(columnIndex), !rs.wasNull(), rowCount);
-        break;
+        return new SmallIntConsumer((SmallIntVector) vector, columnIndex);
       case Types.INTEGER:
-        updateVector((IntVector) vector,
-                rs.getInt(columnIndex), !rs.wasNull(), rowCount);
-        break;
+        return new IntConsumer((IntVector) vector, columnIndex);
       case Types.BIGINT:
-        updateVector((BigIntVector) vector,
-                rs.getLong(columnIndex), !rs.wasNull(), rowCount);
-        break;
+        return new BigIntConsumer((BigIntVector) vector, columnIndex);
       case Types.NUMERIC:
       case Types.DECIMAL:
-        updateVector((DecimalVector) vector,
-                rs.getBigDecimal(columnIndex), !rs.wasNull(), rowCount);
-        break;
+        return new DecimalConsumer((DecimalVector) vector, columnIndex);
       case Types.REAL:
       case Types.FLOAT:
-        updateVector((Float4Vector) vector,
-                rs.getFloat(columnIndex), !rs.wasNull(), rowCount);
-        break;
+        return new FloatConsumer((Float4Vector) vector, columnIndex);
       case Types.DOUBLE:
-        updateVector((Float8Vector) vector,
-                rs.getDouble(columnIndex), !rs.wasNull(), rowCount);
-        break;
+        return new DoubleConsumer((Float8Vector) vector, columnIndex);
       case Types.CHAR:
       case Types.NCHAR:
       case Types.VARCHAR:
       case Types.NVARCHAR:
       case Types.LONGVARCHAR:
       case Types.LONGNVARCHAR:
-        updateVector((VarCharVector) vector,
-                rs.getString(columnIndex), !rs.wasNull(), rowCount);
-        break;
+        return new VarCharConsumer((VarCharVector) vector, columnIndex);
       case Types.DATE:
-        final Date date;
-        if (calendar != null) {
-          date = rs.getDate(columnIndex, calendar);
-        } else {
-          date = rs.getDate(columnIndex);
-        }
-
-        updateVector((DateMilliVector) vector, date, !rs.wasNull(), rowCount);
-        break;
+        return new DateConsumer((DateMilliVector) vector, columnIndex, calendar);
       case Types.TIME:
-        final Time time;
-        if (calendar != null) {
-          time = rs.getTime(columnIndex, calendar);
-        } else {
-          time = rs.getTime(columnIndex);
-        }
-
-        updateVector((TimeMilliVector) vector, time, !rs.wasNull(), rowCount);
-        break;
+        return new TimeConsumer((TimeMilliVector) vector, columnIndex, calendar);
       case Types.TIMESTAMP:
-        final Timestamp ts;
-        if (calendar != null) {
-          ts = rs.getTimestamp(columnIndex, calendar);
-        } else {
-          ts = rs.getTimestamp(columnIndex);
-        }
-
-        // TODO: Need to handle precision such as milli, micro, nano
-        updateVector((TimeStampVector) vector, ts, !rs.wasNull(), rowCount);
-        break;
+        return new TimestampConsumer((TimeStampMilliTZVector) vector, columnIndex, calendar);
       case Types.BINARY:
       case Types.VARBINARY:
       case Types.LONGVARBINARY:
-        updateVector((VarBinaryVector) vector,
-                rs.getBinaryStream(columnIndex), !rs.wasNull(), rowCount);
-        break;
+        return new BinaryConsumer((VarBinaryVector) vector, columnIndex);
       case Types.ARRAY:
-        updateVector((ListVector) vector, rs, columnIndex, rowCount, config);
-        break;
+        final JdbcFieldInfo fieldInfo = getJdbcFieldInfoForArraySubType(resultSet.getMetaData(), columnIndex, config);
+        if (fieldInfo == null) {
+          throw new IllegalArgumentException("Column " + columnIndex + " is an array of unknown type.");
+        }
+        JdbcConsumer delegate = getConsumer(resultSet, JDBC_ARRAY_VALUE_COLUMN,
+            fieldInfo.getJdbcType(), ((ListVector)vector).getDataVector(), config);
+        return new ArrayConsumer((ListVector) vector, delegate, columnIndex);
       case Types.CLOB:
-        updateVector((VarCharVector) vector,
-                rs.getClob(columnIndex), !rs.wasNull(), rowCount);
-        break;
+        return new ClobConsumer((VarCharVector) vector, columnIndex);
       case Types.BLOB:
-        updateVector((VarBinaryVector) vector,
-                rs.getBlob(columnIndex), !rs.wasNull(), rowCount);
-        break;
+        BinaryConsumer delegateConsumer = new BinaryConsumer((VarBinaryVector) vector, columnIndex);
+        return new BlobConsumer(delegateConsumer, columnIndex);
       default:
         // no-op, shouldn't get here
-        break;
-    }
-  }
-
-  private static void updateVector(BitVector bitVector, boolean value, boolean isNonNull, int rowCount) {
-    NullableBitHolder holder = new NullableBitHolder();
-    holder.isSet = isNonNull ? 1 : 0;
-    if (isNonNull) {
-      holder.value = value ? 1 : 0;
-    }
-    bitVector.setSafe(rowCount, holder);
-    bitVector.setValueCount(rowCount + 1);
-  }
-
-  private static void updateVector(TinyIntVector tinyIntVector, int value, boolean isNonNull, int rowCount) {
-    NullableTinyIntHolder holder = new NullableTinyIntHolder();
-    holder.isSet = isNonNull ? 1 : 0;
-    if (isNonNull) {
-      holder.value = (byte) value;
-    }
-    tinyIntVector.setSafe(rowCount, holder);
-    tinyIntVector.setValueCount(rowCount + 1);
-  }
-
-  private static void updateVector(SmallIntVector smallIntVector, int value, boolean isNonNull, int rowCount) {
-    NullableSmallIntHolder holder = new NullableSmallIntHolder();
-    holder.isSet = isNonNull ? 1 : 0;
-    if (isNonNull) {
-      holder.value = (short) value;
-    }
-    smallIntVector.setSafe(rowCount, holder);
-    smallIntVector.setValueCount(rowCount + 1);
-  }
-
-  private static void updateVector(IntVector intVector, int value, boolean isNonNull, int rowCount) {
-    NullableIntHolder holder = new NullableIntHolder();
-    holder.isSet = isNonNull ? 1 : 0;
-    if (isNonNull) {
-      holder.value = value;
-    }
-    intVector.setSafe(rowCount, holder);
-    intVector.setValueCount(rowCount + 1);
-  }
-
-  private static void updateVector(BigIntVector bigIntVector, long value, boolean isNonNull, int rowCount) {
-    NullableBigIntHolder holder = new NullableBigIntHolder();
-    holder.isSet = isNonNull ? 1 : 0;
-    if (isNonNull) {
-      holder.value = value;
-    }
-    bigIntVector.setSafe(rowCount, holder);
-    bigIntVector.setValueCount(rowCount + 1);
-  }
-
-  private static void updateVector(DecimalVector decimalVector, BigDecimal value, boolean isNonNull, int rowCount) {
-    NullableDecimalHolder holder = new NullableDecimalHolder();
-    holder.isSet = isNonNull ? 1 : 0;
-    if (isNonNull) {
-      holder.precision = value.precision();
-      holder.scale = value.scale();
-      holder.buffer = decimalVector.getAllocator().buffer(DEFAULT_BUFFER_SIZE);
-      holder.start = 0;
-      DecimalUtility.writeBigDecimalToArrowBuf(value, holder.buffer, holder.start);
-    }
-    decimalVector.setSafe(rowCount, holder);
-    decimalVector.setValueCount(rowCount + 1);
-  }
-
-  private static void updateVector(Float4Vector float4Vector, float value, boolean isNonNull, int rowCount) {
-    NullableFloat4Holder holder = new NullableFloat4Holder();
-    holder.isSet = isNonNull ? 1 : 0;
-    if (isNonNull) {
-      holder.value = value;
-    }
-    float4Vector.setSafe(rowCount, holder);
-    float4Vector.setValueCount(rowCount + 1);
-  }
-
-  private static void updateVector(Float8Vector float8Vector, double value, boolean isNonNull, int rowCount) {
-    NullableFloat8Holder holder = new NullableFloat8Holder();
-    holder.isSet = isNonNull ? 1 : 0;
-    if (isNonNull) {
-      holder.value = value;
-    }
-    float8Vector.setSafe(rowCount, holder);
-    float8Vector.setValueCount(rowCount + 1);
-  }
-
-  private static void updateVector(VarCharVector varcharVector, String value, boolean isNonNull, int rowCount) {
-    NullableVarCharHolder holder = new NullableVarCharHolder();
-    holder.isSet = isNonNull ? 1 : 0;
-    varcharVector.setIndexDefined(rowCount);
-    if (isNonNull) {
-      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
-      holder.buffer = varcharVector.getAllocator().buffer(bytes.length);
-      holder.buffer.setBytes(0, bytes, 0, bytes.length);
-      holder.start = 0;
-      holder.end = bytes.length;
-    } else {
-      holder.buffer = varcharVector.getAllocator().buffer(0);
-    }
-    varcharVector.setSafe(rowCount, holder);
-    varcharVector.setValueCount(rowCount + 1);
-  }
-
-  private static void updateVector(DateMilliVector dateMilliVector, Date date, boolean isNonNull, int rowCount) {
-    NullableDateMilliHolder holder = new NullableDateMilliHolder();
-    holder.isSet = isNonNull ? 1 : 0;
-    if (isNonNull) {
-      holder.value = date.getTime();
-    }
-    dateMilliVector.setSafe(rowCount, holder);
-    dateMilliVector.setValueCount(rowCount + 1);
-  }
-
-  private static void updateVector(TimeMilliVector timeMilliVector, Time time, boolean isNonNull, int rowCount) {
-    NullableTimeMilliHolder holder = new NullableTimeMilliHolder();
-    holder.isSet = isNonNull ? 1 : 0;
-    if (isNonNull && time != null) {
-      holder.value = (int) time.getTime();
-    }
-    timeMilliVector.setSafe(rowCount, holder);
-    timeMilliVector.setValueCount(rowCount + 1);
-  }
-
-  private static void updateVector(
-      TimeStampVector timeStampVector,
-      Timestamp timestamp,
-      boolean isNonNull,
-      int rowCount) {
-    //TODO: Need to handle precision such as milli, micro, nano
-    timeStampVector.setValueCount(rowCount + 1);
-    if (timestamp != null) {
-      timeStampVector.setSafe(rowCount, timestamp.getTime());
-    } else {
-      timeStampVector.setNull(rowCount);
+        throw new UnsupportedOperationException();
     }
   }
-
-  private static void updateVector(
-      VarBinaryVector varBinaryVector,
-      InputStream is,
-      boolean isNonNull,
-      int rowCount) throws IOException {
-    varBinaryVector.setValueCount(rowCount + 1);
-    if (isNonNull && is != null) {
-      VarBinaryHolder holder = new VarBinaryHolder();
-      ArrowBuf arrowBuf = varBinaryVector.getDataBuffer();
-      holder.start = 0;
-      byte[] bytes = new byte[DEFAULT_STREAM_BUFFER_SIZE];
-      int total = 0;
-      while (true) {
-        int read = is.read(bytes, 0, DEFAULT_STREAM_BUFFER_SIZE);
-        if (read == -1) {
-          break;
-        }
-        arrowBuf.setBytes(total, bytes, total, read);
-        total += read;
-      }
-      holder.end = total;
-      holder.buffer = arrowBuf;
-      varBinaryVector.set(rowCount, holder);
-      varBinaryVector.setIndexDefined(rowCount);
-    } else {
-      varBinaryVector.setNull(rowCount);
-    }
-  }
-
-  private static void updateVector(
-      VarCharVector varcharVector,
-      Clob clob,
-      boolean isNonNull,
-      int rowCount) throws SQLException, IOException {
-    varcharVector.setValueCount(rowCount + 1);
-    if (isNonNull && clob != null) {
-      VarCharHolder holder = new VarCharHolder();
-      ArrowBuf arrowBuf = varcharVector.getDataBuffer();
-      holder.start = 0;
-      long length = clob.length();
-      int read = 1;
-      int readSize = length < DEFAULT_CLOB_SUBSTRING_READ_SIZE ? (int) length : DEFAULT_CLOB_SUBSTRING_READ_SIZE;
-      int totalBytes = 0;
-      while (read <= length) {
-        String str = clob.getSubString(read, readSize);
-        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
-        arrowBuf.setBytes(totalBytes, new ByteArrayInputStream(bytes, 0, bytes.length), bytes.length);
-        totalBytes += bytes.length;
-        read += readSize;
-      }
-      holder.end = totalBytes;
-      holder.buffer = arrowBuf;
-      varcharVector.set(rowCount, holder);
-      varcharVector.setIndexDefined(rowCount);
-    } else {
-      varcharVector.setNull(rowCount);
-    }
-  }
-
-  private static void updateVector(VarBinaryVector varBinaryVector, Blob blob, boolean isNonNull, int rowCount)
-      throws SQLException, IOException {
-    updateVector(varBinaryVector, blob != null ? blob.getBinaryStream() : null, isNonNull, rowCount);
-  }
-
-  private static void updateVector(
-      ListVector listVector,
-      ResultSet resultSet,
-      int arrayIndex,
-      int rowCount,
-      JdbcToArrowConfig config)
-      throws SQLException, IOException {
-
-    final JdbcFieldInfo fieldInfo = getJdbcFieldInfoForArraySubType(resultSet.getMetaData(), arrayIndex, config);
-    if (fieldInfo == null) {
-      throw new IllegalArgumentException("Column " + arrayIndex + " is an array of unknown type.");
-    }
-
-    final int valueCount = listVector.getValueCount();
-    final Array array = resultSet.getArray(arrayIndex);
-
-    FieldVector fieldVector = listVector.getDataVector();
-    int arrayRowCount = 0;
-
-    if (!resultSet.wasNull()) {
-      listVector.startNewValue(rowCount);
-
-      try (ResultSet rs = array.getResultSet()) {
-
-        while (rs.next()) {
-          jdbcToFieldVector(
-              rs,
-              JDBC_ARRAY_VALUE_COLUMN,
-              fieldInfo.getJdbcType(),
-              valueCount + arrayRowCount,
-              fieldVector,
-              config);
-          arrayRowCount++;
-        }
-      }
-
-      listVector.endValue(rowCount, arrayRowCount);
-    }
-
-    listVector.setValueCount(valueCount + arrayRowCount);
-  }
 }
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ArrayConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ArrayConsumer.java
new file mode 100644
index 0000000..a92c913
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ArrayConsumer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.io.IOException;
+import java.sql.Array;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.BitVectorHelper;
+import org.apache.arrow.vector.complex.ListVector;
+
+/**
+ * Consumer which consumes array type values from a {@link ResultSet}.
+ * Writes the data to a {@link org.apache.arrow.vector.complex.ListVector}.
+ */
+public class ArrayConsumer implements JdbcConsumer {
+
+  private final JdbcConsumer delegate;
+  private final int columnIndexInResultSet;
+
+  private ListVector vector;
+
+  /**
+   * Instantiate an ArrayConsumer.
+   */
+  public ArrayConsumer(ListVector vector, JdbcConsumer delegate, int index) {
+    this.columnIndexInResultSet = index;
+    this.delegate = delegate;
+    this.vector = vector;
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException, IOException {
+    final Array array = resultSet.getArray(columnIndexInResultSet);
+    int idx = vector.getValueCount();
+    if (!resultSet.wasNull()) {
+
+      vector.startNewValue(idx);
+      int count = 0;
+      try (ResultSet rs = array.getResultSet()) {
+        while (rs.next()) {
+          delegate.consume(rs);
+          count++;
+        }
+      }
+      int end = vector.getOffsetBuffer().getInt(idx * 4) + count;
+      vector.getOffsetBuffer().setInt((idx + 1) * 4, end);
+      BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), vector.getValueCount());
+    }
+    vector.setValueCount(idx + 1);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BigIntConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BigIntConsumer.java
new file mode 100644
index 0000000..91eb9d1
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BigIntConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.complex.impl.BigIntWriterImpl;
+import org.apache.arrow.vector.complex.writer.BigIntWriter;
+
+/**
+ * Consumer which consumes bigint type values from a {@link ResultSet}.
+ * Writes the data to a {@link org.apache.arrow.vector.BigIntVector}.
+ */
+public class BigIntConsumer implements JdbcConsumer {
+
+  private final BigIntWriter writer;
+  private final int columnIndexInResultSet;
+
+  /**
+   * Instantiate a BigIntConsumer.
+   */
+  public BigIntConsumer(BigIntVector vector, int index) {
+    this.writer = new BigIntWriterImpl(vector);
+    this.columnIndexInResultSet = index;
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException {
+    long value = resultSet.getLong(columnIndexInResultSet);
+    if (!resultSet.wasNull()) {
+      writer.writeBigInt(value);
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java
new file mode 100644
index 0000000..385b0ab
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl;
+import org.apache.arrow.vector.complex.writer.VarBinaryWriter;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Consumer which consumes binary type values from a {@link ResultSet}.
+ * Writes the data to a {@link org.apache.arrow.vector.VarBinaryVector}.
+ */
+public class BinaryConsumer implements JdbcConsumer {
+
+  private static final int BUFFER_SIZE = 1024;
+
+  private final VarBinaryWriter writer;
+  private final int columnIndexInResultSet;
+  private BufferAllocator allocator;
+
+  private ArrowBuf reuse;
+  private byte[] reuseBytes;
+
+  /**
+   * Instantiate a BinaryConsumer.
+   */
+  public BinaryConsumer(VarBinaryVector vector, int index) {
+    this.writer = new VarBinaryWriterImpl(vector);
+    this.columnIndexInResultSet = index;
+
+    this.allocator = vector.getAllocator();
+    reuse = allocator.buffer(BUFFER_SIZE);
+    reuseBytes = new byte[BUFFER_SIZE];
+  }
+
+  /**
+   * Consume an InputStream, writing its bytes to the underlying VarBinaryVector.
+   */
+  public void consume(InputStream is) throws IOException {
+    if (is != null) {
+      int length = is.available();
+      if (length > reuse.capacity()) {
+        reuse.close();
+        reuse = allocator.buffer(length);
+      }
+
+      int total = 0;
+      int read;
+      while ((read = is.read(reuseBytes, 0, reuseBytes.length)) != -1) {
+        reuse.setBytes(total, reuseBytes, 0, read);
+        total += read;
+      }
+      writer.writeVarBinary(0, total, reuse);
+    }
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException, IOException {
+    InputStream is = resultSet.getBinaryStream(columnIndexInResultSet);
+    if (!resultSet.wasNull()) {
+      consume(is);
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+
+  public void moveWriterPosition() {
+    writer.setPosition(writer.getPosition() + 1);
+  }
+
+  @Override
+  public void close() {
+    if (reuse != null) {
+      reuse.close();
+    }
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BitConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BitConsumer.java
new file mode 100644
index 0000000..c61ce3e
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BitConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.complex.impl.BitWriterImpl;
+import org.apache.arrow.vector.complex.writer.BitWriter;
+
+/**
+ * Consumer which consumes bit type values from a {@link ResultSet}.
+ * Writes the data to a {@link BitVector}.
+ */
+public class BitConsumer implements JdbcConsumer {
+
+  private final BitWriter writer;
+  private final int columnIndexInResultSet;
+
+  /**
+   * Instantiate a BitConsumer.
+   */
+  public BitConsumer(BitVector vector, int index) {
+    this.writer = new BitWriterImpl(vector);
+    this.columnIndexInResultSet = index;
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException {
+    boolean value = resultSet.getBoolean(columnIndexInResultSet);
+    if (!resultSet.wasNull()) {
+      writer.writeBit(value ? 1 : 0);
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BlobConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BlobConsumer.java
new file mode 100644
index 0000000..b623133
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BlobConsumer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.io.IOException;
+import java.sql.Blob;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.VarBinaryVector;
+
+/**
+ * Consumer which consumes blob type values from a {@link ResultSet}.
+ * Writes the data to a {@link VarBinaryVector}.
+ */
+public class BlobConsumer implements JdbcConsumer {
+
+  private final int columnIndexInResultSet;
+
+  private final BinaryConsumer delegate;
+
+  /**
+   * Instantiate a BlobConsumer.
+   */
+  public BlobConsumer(BinaryConsumer delegate, int index) {
+    this.columnIndexInResultSet = index;
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException, IOException {
+    Blob blob = resultSet.getBlob(columnIndexInResultSet);
+    if (blob != null) {
+      delegate.consume(blob.getBinaryStream());
+    }
+    delegate.moveWriterPosition();
+  }
+
+  @Override
+  public void close() {
+    delegate.close();
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ClobConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ClobConsumer.java
new file mode 100644
index 0000000..73975f7
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/ClobConsumer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Clob;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.impl.VarCharWriterImpl;
+import org.apache.arrow.vector.complex.writer.VarCharWriter;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Consumer which consumes clob type values from a {@link ResultSet}.
+ * Writes the data to a {@link org.apache.arrow.vector.VarCharVector}.
+ */
+public class ClobConsumer implements JdbcConsumer {
+
+  private static final int BUFFER_SIZE = 256;
+
+  private final VarCharWriter writer;
+  private final int columnIndexInResultSet;
+  private BufferAllocator allocator;
+
+  private ArrowBuf reuse;
+
+  /**
+   * Instantiate a ClobConsumer.
+   */
+  public ClobConsumer(VarCharVector vector, int index) {
+    this.writer = new VarCharWriterImpl(vector);
+    this.columnIndexInResultSet = index;
+
+    this.allocator = vector.getAllocator();
+    reuse = allocator.buffer(1024);
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException {
+    Clob clob = resultSet.getClob(columnIndexInResultSet);
+    if (!resultSet.wasNull()) {
+      if (clob != null) {
+        long length = clob.length();
+        if (length > reuse.capacity()) {
+          reuse.close();
+          reuse = allocator.buffer((int) length);
+        }
+
+        int read = 1;
+        int readSize = length < BUFFER_SIZE ? (int) length : BUFFER_SIZE;
+        int totalBytes = 0;
+        while (read <= length) {
+          String str = clob.getSubString(read, readSize);
+          byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
+          reuse.setBytes(totalBytes, bytes, 0, bytes.length);
+          totalBytes += bytes.length;
+          read += readSize;
+        }
+        writer.writeVarChar(0, totalBytes, reuse);
+      }
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+
+  @Override
+  public void close() {
+    if (reuse != null) {
+      reuse.close();
+    }
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/CompositeJdbcConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/CompositeJdbcConsumer.java
new file mode 100644
index 0000000..2c598c8
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/CompositeJdbcConsumer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Composite consumer which holds all consumers.
+ * It manages the consume and cleanup process.
+ */
+public class CompositeJdbcConsumer implements JdbcConsumer {
+
+  private final JdbcConsumer[] consumers;
+  private int readRowCount;
+
+  public int getReadRowCount() {
+    return readRowCount;
+  }
+
+  public CompositeJdbcConsumer(JdbcConsumer[] consumers) {
+    this.consumers = consumers;
+  }
+
+  @Override
+  public void consume(ResultSet rs) throws SQLException, IOException {
+    while (rs.next()) {
+      for (JdbcConsumer consumer : consumers) {
+        consumer.consume(rs);
+      }
+      readRowCount++;
+    }
+  }
+
+  @Override
+  public void close() {
+    // clean up
+    for (JdbcConsumer consumer : consumers) {
+      consumer.close();
+    }
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DateConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DateConsumer.java
new file mode 100644
index 0000000..6be00c0
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DateConsumer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Calendar;
+import java.util.Date;
+
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.complex.impl.DateMilliWriterImpl;
+import org.apache.arrow.vector.complex.writer.DateMilliWriter;
+
+/**
+ * Consumer which consumes date type values from a {@link ResultSet}.
+ * Writes the data to a {@link org.apache.arrow.vector.DateMilliVector}.
+ */
+public class DateConsumer implements JdbcConsumer {
+
+  private final DateMilliWriter writer;
+  private final int columnIndexInResultSet;
+  private final Calendar calendar;
+
+  /**
+   * Instantiate a DateConsumer.
+   */
+  public DateConsumer(DateMilliVector vector, int index) {
+    this (vector, index, null);
+  }
+
+  /**
+   * Instantiate a DateConsumer.
+   */
+  public DateConsumer(DateMilliVector vector, int index, Calendar calendar) {
+    this.writer = new DateMilliWriterImpl(vector);
+    this.columnIndexInResultSet = index;
+    this.calendar = calendar;
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException {
+    Date date = calendar == null ? resultSet.getDate(columnIndexInResultSet) :
+        resultSet.getDate(columnIndexInResultSet, calendar);
+    if (!resultSet.wasNull()) {
+      writer.writeDateMilli(date.getTime());
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DecimalConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DecimalConsumer.java
new file mode 100644
index 0000000..862471c
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DecimalConsumer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.complex.impl.DecimalWriterImpl;
+import org.apache.arrow.vector.complex.writer.DecimalWriter;
+
+/**
+ * Consumer which consumes decimal type values from a {@link ResultSet}.
+ * Writes the data to a {@link org.apache.arrow.vector.DecimalVector}.
+ */
+public class DecimalConsumer implements JdbcConsumer {
+
+  private final DecimalWriter writer;
+  private final int columnIndexInResultSet;
+
+  /**
+   * Instantiate a DecimalConsumer.
+   */
+  public DecimalConsumer(DecimalVector vector, int index) {
+    this.writer = new DecimalWriterImpl(vector);
+    this.columnIndexInResultSet = index;
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException {
+    BigDecimal value = resultSet.getBigDecimal(columnIndexInResultSet);
+    if (!resultSet.wasNull()) {
+      writer.writeDecimal(value);
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DoubleConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DoubleConsumer.java
new file mode 100644
index 0000000..b1dcb9e
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/DoubleConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.complex.impl.Float8WriterImpl;
+import org.apache.arrow.vector.complex.writer.Float8Writer;
+
+/**
+ * Consumer which consumes double type values from a {@link ResultSet}.
+ * Writes the data to a {@link org.apache.arrow.vector.Float8Vector}.
+ */
+public class DoubleConsumer implements JdbcConsumer {
+
+  private final Float8Writer writer;
+  private final int columnIndexInResultSet;
+
+  /**
+   * Instantiate a DoubleConsumer.
+   */
+  public DoubleConsumer(Float8Vector vector, int index) {
+    this.writer = new Float8WriterImpl(vector);
+    this.columnIndexInResultSet = index;
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException {
+    double value = resultSet.getDouble(columnIndexInResultSet);
+    if (!resultSet.wasNull()) {
+      writer.writeFloat8(value);
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/FloatConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/FloatConsumer.java
new file mode 100644
index 0000000..c7922b1
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/FloatConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.complex.impl.Float4WriterImpl;
+import org.apache.arrow.vector.complex.writer.Float4Writer;
+
+/**
+ * Consumer which consumes float type values from a {@link ResultSet}.
+ * Writes the data to a {@link org.apache.arrow.vector.Float4Vector}.
+ */
+public class FloatConsumer implements JdbcConsumer {
+
+  private final Float4Writer writer;
+  private final int columnIndexInResultSet;
+
+  /**
+   * Instantiate a FloatConsumer.
+   */
+  public FloatConsumer(Float4Vector vector, int index) {
+    this.writer = new Float4WriterImpl(vector);
+    this.columnIndexInResultSet = index;
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException {
+    float value = resultSet.getFloat(columnIndexInResultSet);
+    if (!resultSet.wasNull()) {
+      writer.writeFloat4(value);
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/IntConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/IntConsumer.java
new file mode 100644
index 0000000..8a60e59
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/IntConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.complex.impl.IntWriterImpl;
+import org.apache.arrow.vector.complex.writer.IntWriter;
+
+/**
+ * Consumer which consumes int type values from a {@link ResultSet}.
+ * Writes the data to a {@link org.apache.arrow.vector.IntVector}.
+ */
+public class IntConsumer implements JdbcConsumer {
+
+  private final IntWriter writer;
+  private final int columnIndexInResultSet;
+
+  /**
+   * Instantiate an IntConsumer.
+   */
+  public IntConsumer(IntVector vector, int index) {
+    this.writer = new IntWriterImpl(vector);
+    this.columnIndexInResultSet = index;
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException {
+    int value = resultSet.getInt(columnIndexInResultSet);
+    if (!resultSet.wasNull()) {
+      writer.writeInt(value);
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/JdbcConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/JdbcConsumer.java
new file mode 100644
index 0000000..5030503
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/JdbcConsumer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * An abstraction that is used to consume values from {@link ResultSet}.
+ */
+public interface JdbcConsumer extends AutoCloseable {
+
+  /**
+   * Consume a value of a specific type from the {@link ResultSet} and write it to the vector.
+   */
+  void consume(ResultSet resultSet) throws SQLException, IOException;
+
+  /**
+   * Close this consumer, releasing any held resources such as a reused ArrowBuf.
+   */
+  default void close() {}
+
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/SmallIntConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/SmallIntConsumer.java
new file mode 100644
index 0000000..1c76f86
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/SmallIntConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.complex.impl.SmallIntWriterImpl;
+import org.apache.arrow.vector.complex.writer.SmallIntWriter;
+
+/**
+ * Consumer which consumes smallint type values from a {@link ResultSet}.
+ * Writes the data to a {@link org.apache.arrow.vector.SmallIntVector}.
+ */
+public class SmallIntConsumer implements JdbcConsumer {
+
+  private final SmallIntWriter writer;
+  private final int columnIndexInResultSet;
+
+  /**
+   * Instantiate a SmallIntConsumer.
+   */
+  public SmallIntConsumer(SmallIntVector vector, int index) {
+    this.writer = new SmallIntWriterImpl(vector);
+    this.columnIndexInResultSet = index;
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException {
+    short value = resultSet.getShort(columnIndexInResultSet);
+    if (!resultSet.wasNull()) {
+      writer.writeSmallInt(value);
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
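
The fixed-width consumers above all follow the same idiom: write only when the column value is non-null, but always advance the writer position, so a SQL NULL leaves a null slot in the vector. A minimal sketch of that effect at the vector level, using only Arrow vector APIs; the class name, vector name, and values are made up for illustration.

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.SmallIntVector;
    import org.apache.arrow.vector.complex.impl.SmallIntWriterImpl;
    import org.apache.arrow.vector.complex.writer.SmallIntWriter;

    // Illustrative only: mimics two consume() calls, the second hitting a SQL NULL.
    public final class NullSlotSketch {
      public static void main(String[] args) {
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             SmallIntVector vector = new SmallIntVector("smallint-demo", allocator)) {
          SmallIntWriter writer = new SmallIntWriterImpl(vector);

          writer.writeSmallInt((short) 7);               // row 0: non-null, value written
          writer.setPosition(writer.getPosition() + 1);

          writer.setPosition(writer.getPosition() + 1);  // row 1: NULL, only advance

          vector.setValueCount(2);
          System.out.println(vector.get(0));     // 7
          System.out.println(vector.isNull(1));  // true: the skipped slot stays null
        }
      }
    }
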
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TimeConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TimeConsumer.java
new file mode 100644
index 0000000..46d3f32
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TimeConsumer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.util.Calendar;
+
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.complex.impl.TimeMilliWriterImpl;
+import org.apache.arrow.vector.complex.writer.TimeMilliWriter;
+
+/**
+ * Consumer which consumes time type values from {@link ResultSet}.
+ * Writes the data to {@link org.apache.arrow.vector.TimeMilliVector}.
+ */
+public class TimeConsumer implements JdbcConsumer {
+
+  private final TimeMilliWriter writer;
+  private final int columnIndexInResultSet;
+  private final Calendar calendar;
+
+  /**
+   * Instantiate a TimeConsumer that reads times without an explicit {@link Calendar}.
+   */
+  public TimeConsumer(TimeMilliVector vector, int index) {
+    this(vector, index, null);
+  }
+
+  /**
+   * Instantiate a TimeConsumer that reads times with the given {@link Calendar}.
+   */
+  public TimeConsumer(TimeMilliVector vector, int index, Calendar calendar) {
+    this.writer = new TimeMilliWriterImpl(vector);
+    this.columnIndexInResultSet = index;
+    this.calendar = calendar;
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException {
+    Time time = calendar == null ? resultSet.getTime(columnIndexInResultSet) :
+        resultSet.getTime(columnIndexInResultSet, calendar);
+    if (!resultSet.wasNull()) {
+      writer.writeTimeMilli((int) time.getTime());
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TimestampConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TimestampConsumer.java
new file mode 100644
index 0000000..aef10e3
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TimestampConsumer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Calendar;
+
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.complex.impl.TimeStampMilliTZWriterImpl;
+import org.apache.arrow.vector.complex.writer.TimeStampMilliTZWriter;
+
+/**
+ * Consumer which consumes timestamp type values from {@link ResultSet}.
+ * Writes the data to {@link TimeStampMilliTZVector}.
+ */
+public class TimestampConsumer implements JdbcConsumer {
+
+  private final TimeStampMilliTZWriter writer;
+  private final int columnIndexInResultSet;
+  private final Calendar calendar;
+
+  /**
+   * Instantiate a TimestampConsumer that reads timestamps without an explicit {@link Calendar}.
+   */
+  public TimestampConsumer(TimeStampMilliTZVector vector, int index) {
+    this(vector, index, null);
+  }
+
+  /**
+   * Instantiate a TimestampConsumer that reads timestamps with the given {@link Calendar}.
+   */
+  public TimestampConsumer(TimeStampMilliTZVector vector, int index, Calendar calendar) {
+    this.writer = new TimeStampMilliTZWriterImpl(vector);
+    this.columnIndexInResultSet = index;
+    this.calendar = calendar;
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException {
+    Timestamp timestamp = calendar == null ? resultSet.getTimestamp(columnIndexInResultSet) :
+        resultSet.getTimestamp(columnIndexInResultSet, calendar);
+    if (!resultSet.wasNull()) {
+      writer.writeTimeStampMilliTZ(timestamp.getTime());
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
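
TimeConsumer and TimestampConsumer both fall back to ResultSet.getTime / ResultSet.getTimestamp without a Calendar when none is supplied, and forward an explicit Calendar otherwise. A minimal construction sketch follows; the class name, vector names, and column indices are assumptions for illustration, and nothing is actually consumed here.

    import java.util.Calendar;
    import java.util.Locale;
    import java.util.TimeZone;

    import org.apache.arrow.adapter.jdbc.consumer.JdbcConsumer;
    import org.apache.arrow.adapter.jdbc.consumer.TimeConsumer;
    import org.apache.arrow.adapter.jdbc.consumer.TimestampConsumer;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.TimeMilliVector;
    import org.apache.arrow.vector.TimeStampMilliTZVector;

    // Illustrative only: shows both constructor flavors side by side.
    public final class CalendarConsumerSketch {
      public static void main(String[] args) {
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             TimeMilliVector timeVector = new TimeMilliVector("time-demo", allocator);
             TimeStampMilliTZVector tsVector =
                 new TimeStampMilliTZVector("ts-demo", allocator, "UTC")) {
          Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT);

          // Column 3 is read with the JDBC driver's default calendar.
          JdbcConsumer timeConsumer = new TimeConsumer(timeVector, 3);
          // Column 4 is read with an explicit UTC calendar passed to getTimestamp.
          JdbcConsumer tsConsumer = new TimestampConsumer(tsVector, 4, utc);

          timeConsumer.close();
          tsConsumer.close();
        }
      }
    }
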
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TinyIntConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TinyIntConsumer.java
new file mode 100644
index 0000000..ca310c4
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/TinyIntConsumer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.complex.impl.TinyIntWriterImpl;
+import org.apache.arrow.vector.complex.writer.TinyIntWriter;
+
+/**
+ * Consumer which consumes tinyInt type values from {@link ResultSet}.
+ * Writes the data to {@link org.apache.arrow.vector.TinyIntVector}.
+ */
+public class TinyIntConsumer implements JdbcConsumer {
+
+  private final TinyIntWriter writer;
+  private final int columnIndexInResultSet;
+
+  /**
+   * Instantiate a TinyIntConsumer.
+   */
+  public TinyIntConsumer(TinyIntVector vector, int index) {
+    this.writer = new TinyIntWriterImpl(vector);
+    this.columnIndexInResultSet = index;
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException {
+    byte value = resultSet.getByte(columnIndexInResultSet);
+    if (!resultSet.wasNull()) {
+      writer.writeTinyInt(value);
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/VarCharConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/VarCharConsumer.java
new file mode 100644
index 0000000..9eb6eff
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/VarCharConsumer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.consumer;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.impl.VarCharWriterImpl;
+import org.apache.arrow.vector.complex.writer.VarCharWriter;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Consumer which consumes varchar type values from {@link ResultSet}.
+ * Writes the data to {@link org.apache.arrow.vector.VarCharVector}.
+ */
+public class VarCharConsumer implements JdbcConsumer {
+
+  private final VarCharWriter writer;
+  private final int columnIndexInResultSet;
+  private BufferAllocator allocator;
+
+  private ArrowBuf reuse;
+
+  /**
+   * Instantiate a VarCharConsumer.
+   */
+  public VarCharConsumer(VarCharVector vector, int index) {
+    this.writer = new VarCharWriterImpl(vector);
+    this.columnIndexInResultSet = index;
+    this.allocator = vector.getAllocator();
+  }
+
+  @Override
+  public void consume(ResultSet resultSet) throws SQLException {
+    String value = resultSet.getString(columnIndexInResultSet);
+    if (!resultSet.wasNull()) {
+      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
+      if (reuse == null) {
+        reuse = allocator.buffer(bytes.length);
+      }
+      if (bytes.length > reuse.capacity()) {
+        reuse.close();
+        reuse = allocator.buffer(bytes.length);
+      }
+      reuse.setBytes(0, bytes, 0, bytes.length);
+      writer.writeVarChar(0, bytes.length, reuse);
+    }
+    writer.setPosition(writer.getPosition() + 1);
+  }
+
+  @Override
+  public void close() {
+    if (reuse != null) {
+      reuse.close();
+    }
+  }
+}
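
The reused ArrowBuf above is allocated lazily, regrown when a value no longer fits, and released in close(), which is why JdbcConsumer extends AutoCloseable. A minimal sketch of that allocate-or-grow-then-release pattern in isolation; the class name, strings, and sizes are made up for illustration.

    import java.nio.charset.StandardCharsets;

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;

    import io.netty.buffer.ArrowBuf;

    // Illustrative only: the same allocate-or-grow logic used for the reuse buffer.
    public final class ReuseBufferSketch {
      public static void main(String[] args) {
        try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
          ArrowBuf reuse = null;
          for (String value : new String[] {"short", "a much longer string value"}) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            if (reuse == null || bytes.length > reuse.capacity()) {
              if (reuse != null) {
                reuse.close();               // release the undersized buffer first
              }
              reuse = allocator.buffer(bytes.length);
            }
            reuse.setBytes(0, bytes, 0, bytes.length);
          }
          if (reuse != null) {
            reuse.close();                   // mirrors VarCharConsumer#close()
          }
        }
      }
    }
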