Posted to commits@arrow.apache.org by li...@apache.org on 2022/07/26 16:59:51 UTC

[arrow] branch master updated: ARROW-17004: [Java] Add utility to bind Arrow data to JDBC parameters (#13589)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a5a28377e4 ARROW-17004: [Java] Add utility to bind Arrow data to JDBC parameters  (#13589)
a5a28377e4 is described below

commit a5a28377e43b6435eefa5cc684d80d5fb26f387a
Author: David Li <li...@gmail.com>
AuthorDate: Tue Jul 26 12:59:44 2022 -0400

    ARROW-17004: [Java] Add utility to bind Arrow data to JDBC parameters  (#13589)
    
    This extends the arrow-jdbc adapter to also allow taking Arrow data and using it to bind JDBC PreparedStatement parameters, allowing you to "round trip" data to a certain extent. This was factored out of arrow-adbc since it's not strictly tied to ADBC.
    
    Authored-by: David Li <li...@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 docs/source/java/jdbc.rst                          | 112 ++++-
 java/adapter/jdbc/pom.xml                          |   6 +
 .../arrow/adapter/jdbc/JdbcParameterBinder.java    | 152 ++++++
 .../adapter/jdbc/binder/BaseColumnBinder.java      |  44 ++
 .../arrow/adapter/jdbc/binder/BigIntBinder.java    |  41 ++
 .../arrow/adapter/jdbc/binder/BitBinder.java       |  45 ++
 .../arrow/adapter/jdbc/binder/ColumnBinder.java    |  71 +++
 .../jdbc/binder/ColumnBinderArrowTypeVisitor.java  | 247 ++++++++++
 .../arrow/adapter/jdbc/binder/DateDayBinder.java   |  59 +++
 .../arrow/adapter/jdbc/binder/DateMilliBinder.java |  58 +++
 .../adapter/jdbc/binder/Decimal128Binder.java      |  46 ++
 .../adapter/jdbc/binder/Decimal256Binder.java      |  46 ++
 .../adapter/jdbc/binder/FixedSizeBinaryBinder.java |  45 ++
 .../arrow/adapter/jdbc/binder/Float4Binder.java    |  43 ++
 .../arrow/adapter/jdbc/binder/Float8Binder.java    |  43 ++
 .../arrow/adapter/jdbc/binder/IntBinder.java       |  41 ++
 .../adapter/jdbc/binder/NullableColumnBinder.java  |  53 ++
 .../arrow/adapter/jdbc/binder/SmallIntBinder.java  |  41 ++
 .../arrow/adapter/jdbc/binder/Time32Binder.java    |  65 +++
 .../arrow/adapter/jdbc/binder/Time64Binder.java    |  64 +++
 .../arrow/adapter/jdbc/binder/TimeStampBinder.java |  98 ++++
 .../arrow/adapter/jdbc/binder/TinyIntBinder.java   |  41 ++
 .../arrow/adapter/jdbc/binder/VarBinaryBinder.java |  62 +++
 .../arrow/adapter/jdbc/binder/VarCharBinder.java   |  63 +++
 .../arrow/adapter/jdbc/binder/package-info.java    |  22 +
 .../adapter/jdbc/JdbcParameterBinderTest.java      | 486 +++++++++++++++++++
 .../arrow/adapter/jdbc/MockPreparedStatement.java  | 536 +++++++++++++++++++++
 27 files changed, 2626 insertions(+), 4 deletions(-)

diff --git a/docs/source/java/jdbc.rst b/docs/source/java/jdbc.rst
index da63351601..c0477cb06d 100644
--- a/docs/source/java/jdbc.rst
+++ b/docs/source/java/jdbc.rst
@@ -84,8 +84,10 @@ inconsistent scale. A RoundingMode can be set to handle these cases:
      }
    }
 
-Currently, it is not possible to define a custom type conversion for a
-supported or unsupported type.
+The mapping from JDBC type to Arrow type can be overridden via the
+``JdbcToArrowConfig``, but it is not possible to customize the
+conversion from JDBC value to Arrow value itself, nor is it possible
+to define a conversion for an unsupported type.
 
 Type Mapping
 ------------
@@ -120,7 +122,7 @@ The JDBC to Arrow type mapping can be obtained at runtime from
 +--------------------+--------------------+-------+
 | DOUBLE             | Double             |       |
 +--------------------+--------------------+-------+
-| FLOAT              | Float              |       |
+| FLOAT              | Float32            |       |
 +--------------------+--------------------+-------+
 | INTEGER            | Int32              |       |
 +--------------------+--------------------+-------+
@@ -138,7 +140,7 @@ The JDBC to Arrow type mapping can be obtained at runtime from
 +--------------------+--------------------+-------+
 | NVARCHAR           | Utf8               |       |
 +--------------------+--------------------+-------+
-| REAL               | Float              |       |
+| REAL               | Float32            |       |
 +--------------------+--------------------+-------+
 | SMALLINT           | Int16              |       |
 +--------------------+--------------------+-------+
@@ -172,3 +174,105 @@ The JDBC to Arrow type mapping can be obtained at runtime from
 .. _setArraySubTypeByColumnIndexMap: https://arrow.apache.org/docs/java/reference/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigBuilder.html#setArraySubTypeByColumnIndexMap-java.util.Map-
 .. _setArraySubTypeByColumnNameMap: https://arrow.apache.org/docs/java/reference/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigBuilder.html#setArraySubTypeByColumnNameMap-java.util.Map-
 .. _ARROW-17006: https://issues.apache.org/jira/browse/ARROW-17006
+
+VectorSchemaRoot to PreparedStatement Parameter Conversion
+==========================================================
+
+The adapter can bind rows of Arrow data from a VectorSchemaRoot to
+parameters of a JDBC PreparedStatement.  This can be accessed via the
+JdbcParameterBinder class.  Each call to next() will bind parameters
+from the next row of data, and then the application can execute the
+statement, call addBatch(), etc. as desired.  Null values will lead to
+a setNull call with an appropriate JDBC type code (listed below).
+
+.. code-block:: java
+
+   final JdbcParameterBinder binder =
+       JdbcParameterBinder.builder(statement, root).bindAll().build();
+   while (binder.next()) {
+       statement.executeUpdate();
+   }
+   // Use a VectorLoader to update the root
+   binder.reset();
+   while (binder.next()) {
+       statement.executeUpdate();
+   }
+
+The mapping of vectors to parameters, the JDBC type code used by the
+converters, and the type conversions themselves can all be customized:
+
+.. code-block:: java
+
+   final JdbcParameterBinder binder =
+       JdbcParameterBinder.builder(statement, root)
+           .bind(/*parameterIndex*/2, /*columnIndex*/0)
+           .bind(/*parameterIndex*/1, customColumnBinderInstance)
+           .build();
+
+Type Mapping
+------------
+
+The Arrow to JDBC type mapping can be obtained at runtime by calling
+``getJdbcType()`` on a ColumnBinder.
+
++----------------------------+----------------------------+-------+
+| Arrow Type                 | JDBC Type                  | Notes |
++============================+============================+=======+
+| Binary                     | VARBINARY (setBytes)       |       |
++----------------------------+----------------------------+-------+
+| Bool                       | BOOLEAN (setBoolean)       |       |
++----------------------------+----------------------------+-------+
+| Date32                     | DATE (setDate)             |       |
++----------------------------+----------------------------+-------+
+| Date64                     | DATE (setDate)             |       |
++----------------------------+----------------------------+-------+
+| Decimal128                 | DECIMAL (setBigDecimal)    |       |
++----------------------------+----------------------------+-------+
+| Decimal256                 | DECIMAL (setBigDecimal)    |       |
++----------------------------+----------------------------+-------+
+| FixedSizeBinary            | BINARY (setBytes)          |       |
++----------------------------+----------------------------+-------+
+| Float32                    | REAL (setFloat)            |       |
++----------------------------+----------------------------+-------+
+| Int8                       | TINYINT (setByte)          |       |
++----------------------------+----------------------------+-------+
+| Int16                      | SMALLINT (setShort)        |       |
++----------------------------+----------------------------+-------+
+| Int32                      | INTEGER (setInt)           |       |
++----------------------------+----------------------------+-------+
+| Int64                      | BIGINT (setLong)           |       |
++----------------------------+----------------------------+-------+
+| LargeBinary                | LONGVARBINARY (setBytes)   |       |
++----------------------------+----------------------------+-------+
+| LargeUtf8                  | LONGVARCHAR (setString)    | \(1)  |
++----------------------------+----------------------------+-------+
+| Time[s]                    | TIME (setTime)             |       |
++----------------------------+----------------------------+-------+
+| Time[ms]                   | TIME (setTime)             |       |
++----------------------------+----------------------------+-------+
+| Time[us]                   | TIME (setTime)             |       |
++----------------------------+----------------------------+-------+
+| Time[ns]                   | TIME (setTime)             |       |
++----------------------------+----------------------------+-------+
+| Timestamp[s]               | TIMESTAMP (setTimestamp)   | \(2)  |
++----------------------------+----------------------------+-------+
+| Timestamp[ms]              | TIMESTAMP (setTimestamp)   | \(2)  |
++----------------------------+----------------------------+-------+
+| Timestamp[us]              | TIMESTAMP (setTimestamp)   | \(2)  |
++----------------------------+----------------------------+-------+
+| Timestamp[ns]              | TIMESTAMP (setTimestamp)   | \(2)  |
++----------------------------+----------------------------+-------+
+| Utf8                       | VARCHAR (setString)        |       |
++----------------------------+----------------------------+-------+
+
+* \(1) Strings longer than Integer.MAX_VALUE bytes (the maximum length
+  of a Java ``byte[]``) will cause a runtime exception.
+* \(2) If the timestamp has a timezone, the JDBC type defaults to
+  TIMESTAMP_WITH_TIMEZONE.  If the timestamp has no timezone,
+  technically there is not a correct conversion from Arrow value to
+  JDBC value, because a JDBC Timestamp is in UTC, and we have no
+  timezone information.  In this case, the default binder will call
+  `setTimestamp(int, Timestamp)
+  <https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/PreparedStatement.html#setTimestamp(int,java.sql.Timestamp)>`_,
+  which will lead to the driver using the "default timezone" (that of
+  the Java VM).
diff --git a/java/adapter/jdbc/pom.xml b/java/adapter/jdbc/pom.xml
index 393ed0dddc..8e5829a284 100644
--- a/java/adapter/jdbc/pom.xml
+++ b/java/adapter/jdbc/pom.xml
@@ -82,6 +82,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-common</artifactId>
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinder.java
new file mode 100644
index 0000000000..2dfc0658cb
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinder.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.adapter.jdbc.binder.ColumnBinder;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+/**
+ * A binder binds JDBC prepared statement parameters to rows of Arrow data from a VectorSchemaRoot.
+ *
+ * Each row of the VectorSchemaRoot will be bound to the configured parameters of the PreparedStatement.
+ * One row of data is bound at a time.
+ */
+public class JdbcParameterBinder {
+  private final PreparedStatement statement;
+  private final VectorSchemaRoot root;
+  private final ColumnBinder[] binders;
+  private final int[] parameterIndices;
+  private int nextRowIndex;
+
+  /**
+   * Create a new parameter binder.
+   *
+   * @param statement The statement to bind parameters to.
+   * @param root The VectorSchemaRoot to pull data from.
+   * @param binders Column binders to translate from Arrow data to JDBC parameters, one per parameter.
+   * @param parameterIndices For each binder in {@code binders}, the index of the parameter to bind to.
+   */
+  private JdbcParameterBinder(
+      final PreparedStatement statement,
+      final VectorSchemaRoot root,
+      final ColumnBinder[] binders,
+      int[] parameterIndices) {
+    Preconditions.checkArgument(
+        binders.length == parameterIndices.length,
+        "Number of column binders (%s) must equal number of parameter indices (%s)",
+        binders.length, parameterIndices.length);
+    this.statement = statement;
+    this.root = root;
+    this.binders = binders;
+    this.parameterIndices = parameterIndices;
+    this.nextRowIndex = 0;
+  }
+
+  /**
+   * Initialize a binder with a builder.
+   *
+   * @param statement The statement to bind to. The binder does not maintain ownership of the statement.
+   * @param root The {@link VectorSchemaRoot} to pull data from. The binder does not maintain ownership
+   *             of the vector schema root.
+   */
+  public static Builder builder(final PreparedStatement statement, final VectorSchemaRoot root) {
+    return new Builder(statement, root);
+  }
+
+  /** Reset the binder (so the root can be updated with new data). */
+  public void reset() {
+    nextRowIndex = 0;
+  }
+
+  /**
+   * Bind the next row of data to the parameters of the statement.
+   *
+   * After this, the application should call the desired method on the prepared statement,
+   * such as {@link PreparedStatement#executeUpdate()}, or {@link PreparedStatement#addBatch()}.
+   *
+   * @return true if a row was bound, false if rows were exhausted
+   */
+  public boolean next() throws SQLException {
+    if (nextRowIndex >= root.getRowCount()) {
+      return false;
+    }
+    for (int i = 0; i < parameterIndices.length; i++) {
+      final int parameterIndex = parameterIndices[i];
+      binders[i].bind(statement, parameterIndex, nextRowIndex);
+    }
+    nextRowIndex++;
+    return true;
+  }
+
+  /**
+   * A builder for a {@link JdbcParameterBinder}.
+   */
+  public static class Builder {
+    private final PreparedStatement statement;
+    private final VectorSchemaRoot root;
+    private final Map<Integer, ColumnBinder> bindings;
+
+    Builder(PreparedStatement statement, VectorSchemaRoot root) {
+      this.statement = statement;
+      this.root = root;
+      this.bindings = new HashMap<>();
+    }
+
+    /** Bind each column to the corresponding parameter in order. */
+    public Builder bindAll() {
+      for (int i = 0; i < root.getFieldVectors().size(); i++) {
+        bind(/*parameterIndex=*/ i + 1, /*columnIndex=*/ i);
+      }
+      return this;
+    }
+
+    /** Bind the given parameter to the given column using the default binder. */
+    public Builder bind(int parameterIndex, int columnIndex) {
+      return bind(
+          parameterIndex,
+          ColumnBinder.forVector(root.getVector(columnIndex)));
+    }
+
+    /** Bind the given parameter using the given binder. */
+    public Builder bind(int parameterIndex, ColumnBinder binder) {
+      Preconditions.checkArgument(
+          parameterIndex > 0, "parameterIndex %s must be positive", parameterIndex);
+      bindings.put(parameterIndex, binder);
+      return this;
+    }
+
+    /** Build the binder. */
+    public JdbcParameterBinder build() {
+      ColumnBinder[] binders = new ColumnBinder[bindings.size()];
+      int[] parameterIndices = new int[bindings.size()];
+      int index = 0;
+      for (Map.Entry<Integer, ColumnBinder> entry : bindings.entrySet()) {
+        binders[index] = entry.getValue();
+        parameterIndices[index] = entry.getKey();
+        index++;
+      }
+      return new JdbcParameterBinder(statement, root, binders, parameterIndices);
+    }
+  }
+}
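Note on the builder above: bindings are kept in a map keyed by parameter index, so binding the same parameter twice keeps only the most recent binder, and build() flattens the map into parallel arrays in the map's iteration order. The following is a minimal sketch of that bookkeeping only, not the committed code; BindingMapSketch, binderFor, and size are hypothetical names, and plain strings stand in for ColumnBinder instances.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the builder's parameter-index bookkeeping. */
final class BindingMapSketch {
  // Keyed by 1-based JDBC parameter index; the value stands in for a ColumnBinder.
  private final Map<Integer, String> bindings = new HashMap<>();

  /** Register a binder for a parameter; a later call for the same index replaces the earlier one. */
  BindingMapSketch bind(int parameterIndex, String binderName) {
    if (parameterIndex <= 0) {
      throw new IllegalArgumentException("parameterIndex " + parameterIndex + " must be positive");
    }
    bindings.put(parameterIndex, binderName);
    return this;
  }

  /** Look up the binder currently registered for a parameter, or null if none. */
  String binderFor(int parameterIndex) {
    return bindings.get(parameterIndex);
  }

  /** Number of distinct parameters that have a binder. */
  int size() {
    return bindings.size();
  }

  public static void main(String[] args) {
    final BindingMapSketch sketch = new BindingMapSketch()
        .bind(1, "default binder for column 0")
        .bind(2, "default binder for column 1")
        .bind(1, "custom binder"); // replaces the first binding for parameter 1
    System.out.println(sketch.size() + " parameters bound; parameter 1 uses: " + sketch.binderFor(1));
  }
}
```

One consequence of this design is that a call such as bindAll() followed by a bind() for a single parameter overrides just that parameter.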
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BaseColumnBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BaseColumnBinder.java
new file mode 100644
index 0000000000..f24f409072
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BaseColumnBinder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import org.apache.arrow.vector.FieldVector;
+
+/**
+ * Base class for ColumnBinder implementations.
+ * @param <V> The concrete FieldVector subtype.
+ */
+public abstract class BaseColumnBinder<V extends FieldVector> implements ColumnBinder {
+  protected final V vector;
+  protected final int jdbcType;
+
+  public BaseColumnBinder(V vector, int jdbcType) {
+    this.vector = vector;
+    this.jdbcType = jdbcType;
+  }
+
+  @Override
+  public int getJdbcType() {
+    return jdbcType;
+  }
+
+  @Override
+  public V getVector() {
+    return vector;
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BigIntBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BigIntBinder.java
new file mode 100644
index 0000000000..fde4642ef9
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BigIntBinder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.BigIntVector;
+
+/** A column binder for 64-bit integers. */
+public class BigIntBinder extends BaseColumnBinder<BigIntVector> {
+  public BigIntBinder(BigIntVector vector) {
+    this(vector, Types.BIGINT);
+  }
+
+  public BigIntBinder(BigIntVector vector, int jdbcType) {
+    super(vector, jdbcType);
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    final long value = vector.getDataBuffer().getLong((long) rowIndex * BigIntVector.TYPE_WIDTH);
+    statement.setLong(parameterIndex, value);
+  }
+}
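Note on the binder above: it reads the value directly from the data buffer at byte offset rowIndex * TYPE_WIDTH and does not consult the validity bitmap, which is why ColumnBinder.forVector wraps binders for nullable fields in a NullableColumnBinder. The offset arithmetic can be sketched without Arrow as follows. This is an illustration under assumptions: FixedWidthReadSketch and its methods are hypothetical, java.nio.ByteBuffer stands in for the Arrow data buffer, and little-endian byte order is assumed.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical stand-in for reading 64-bit values out of a fixed-width data buffer. */
final class FixedWidthReadSketch {
  /** Bytes per value; plays the role of BigIntVector.TYPE_WIDTH. */
  static final int TYPE_WIDTH = Long.BYTES;

  /** Pack values back to back into a little-endian buffer (byte order is an assumption here). */
  static ByteBuffer pack(long... values) {
    final ByteBuffer data =
        ByteBuffer.allocate(values.length * TYPE_WIDTH).order(ByteOrder.LITTLE_ENDIAN);
    for (long value : values) {
      data.putLong(value);
    }
    return data;
  }

  /** Read the value for a row at byte offset rowIndex * TYPE_WIDTH. */
  static long getLong(ByteBuffer data, int rowIndex) {
    // Widen to long before multiplying, as the binder does, so the product cannot overflow int.
    final long offset = (long) rowIndex * TYPE_WIDTH;
    // ByteBuffer is int-indexed, unlike the Arrow buffer, so narrow the offset with a range check.
    return data.getLong(Math.toIntExact(offset));
  }

  public static void main(String[] args) {
    final ByteBuffer data = pack(10L, -20L, Long.MAX_VALUE);
    for (int row = 0; row < 3; row++) {
      System.out.println("row " + row + " = " + getLong(data, row));
    }
  }
}
```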
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BitBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BitBinder.java
new file mode 100644
index 0000000000..adae513e99
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BitBinder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.BitVector;
+
+/** A column binder for booleans. */
+public class BitBinder extends BaseColumnBinder<BitVector> {
+  public BitBinder(BitVector vector) {
+    this(vector, Types.BOOLEAN);
+  }
+
+  public BitBinder(BitVector vector, int jdbcType) {
+    super(vector, jdbcType);
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    // See BitVector#getBit
+    final int byteIndex = rowIndex >> 3;
+    final byte b = vector.getDataBuffer().getByte(byteIndex);
+    final int bitIndex = rowIndex & 7;
+    final int value = (b >> bitIndex) & 0x01;
+    statement.setBoolean(parameterIndex, value != 0);
+  }
+}
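Note on the binder above: booleans are bit-packed eight to a byte, least-significant bit first, so the row index is split into a byte index (rowIndex >> 3) and a bit position within that byte (rowIndex & 7). A minimal sketch of the same extraction on a plain byte array follows; BitUnpackSketch and getBit are hypothetical names, and the byte[] stands in for the vector's data buffer.

```java
/** Hypothetical stand-in for extracting one bit-packed boolean from a data buffer. */
final class BitUnpackSketch {
  /** Return the boolean stored for rowIndex in an LSB-first bit-packed array. */
  static boolean getBit(byte[] data, int rowIndex) {
    final int byteIndex = rowIndex >> 3; // eight rows per byte
    final int bitIndex = rowIndex & 7;   // position of the row within that byte
    // Sign extension from byte to int is harmless because only the lowest bit is kept.
    return ((data[byteIndex] >> bitIndex) & 0x01) != 0;
  }

  public static void main(String[] args) {
    // Rows 0 and 2 are set in the first byte; row 15 is the top bit of the second byte.
    final byte[] data = {(byte) 0b0000_0101, (byte) 0b1000_0000};
    for (int row = 0; row < 16; row++) {
      if (getBit(data, row)) {
        System.out.println("row " + row + " is true");
      }
    }
  }
}
```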
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinder.java
new file mode 100644
index 0000000000..c2b1259e14
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinder.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.FieldVector;
+
+/**
+ * A helper to bind values from a wrapped Arrow vector to a JDBC PreparedStatement.
+ */
+public interface ColumnBinder {
+  /**
+   * Bind the given row to the given parameter.
+   *
+   * @param statement The statement to bind to.
+   * @param parameterIndex The parameter to bind to (1-indexed)
+   * @param rowIndex The row to bind values from (0-indexed)
+   * @throws SQLException if an error occurs
+   */
+  void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException;
+
+  /**
+   * Get the JDBC type code used by this binder.
+   *
+   * @return A type code from {@link java.sql.Types}.
+   */
+  int getJdbcType();
+
+  /**
+   * Get the vector used by this binder.
+   */
+  FieldVector getVector();
+
+  /**
+   * Create a column binder for a vector, using the default JDBC type code for null values.
+   */
+  static ColumnBinder forVector(FieldVector vector) {
+    return forVector(vector, /*jdbcType*/ null);
+  }
+
+  /**
+   * Create a column binder for a vector, overriding the JDBC type code used for null values.
+   *
+   * @param vector The vector that the column binder will wrap.
+   * @param jdbcType The JDBC type code to use (or null to use the default).
+   */
+  static ColumnBinder forVector(FieldVector vector, Integer jdbcType) {
+    final ColumnBinder binder = vector.getField().getType().accept(new ColumnBinderArrowTypeVisitor(vector, jdbcType));
+    if (vector.getField().isNullable()) {
+      return new NullableColumnBinder(binder);
+    }
+    return binder;
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java
new file mode 100644
index 0000000000..6496ca5a31
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.Types;
+import java.time.ZoneId;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.Decimal256Vector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.LargeVarBinaryVector;
+import org.apache.arrow.vector.LargeVarCharVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+
+/**
+ * Visitor to create the base ColumnBinder for a vector.
+ * <p>
+ * To handle null values, wrap the returned binder in a {@link NullableColumnBinder}.
+ */
+public class ColumnBinderArrowTypeVisitor implements ArrowType.ArrowTypeVisitor<ColumnBinder> {
+  private final FieldVector vector;
+  private final Integer jdbcType;
+
+  /**
+   * Create a binder using a custom JDBC type code.
+   *
+   * @param vector The vector that the binder will wrap.
+   * @param jdbcType The JDBC type code (or null to use the default).
+   */
+  public ColumnBinderArrowTypeVisitor(FieldVector vector, Integer jdbcType) {
+    this.vector = vector;
+    this.jdbcType = jdbcType;
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Null type) {
+    throw new UnsupportedOperationException("No column binder implemented for type " + type);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Struct type) {
+    throw new UnsupportedOperationException("No column binder implemented for type " + type);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.List type) {
+    throw new UnsupportedOperationException("No column binder implemented for type " + type);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.LargeList type) {
+    throw new UnsupportedOperationException("No column binder implemented for type " + type);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.FixedSizeList type) {
+    throw new UnsupportedOperationException("No column binder implemented for type " + type);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Union type) {
+    throw new UnsupportedOperationException("No column binder implemented for type " + type);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Map type) {
+    throw new UnsupportedOperationException("No column binder implemented for type " + type);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Int type) {
+    if (!type.getIsSigned()) {
+      throw new UnsupportedOperationException(
+          "No column binder implemented for unsigned type " + type);
+    }
+    switch (type.getBitWidth()) {
+      case 8:
+        return jdbcType == null ? new TinyIntBinder((TinyIntVector) vector) :
+            new TinyIntBinder((TinyIntVector) vector, jdbcType);
+      case 16:
+        return jdbcType == null ? new SmallIntBinder((SmallIntVector) vector) :
+            new SmallIntBinder((SmallIntVector) vector, jdbcType);
+      case 32:
+        return jdbcType == null ? new IntBinder((IntVector) vector) :
+            new IntBinder((IntVector) vector, jdbcType);
+      case 64:
+        return jdbcType == null ? new BigIntBinder((BigIntVector) vector) :
+            new BigIntBinder((BigIntVector) vector, jdbcType);
+      default:
+        throw new UnsupportedOperationException("No column binder implemented for type " + type);
+    }
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.FloatingPoint type) {
+    switch (type.getPrecision()) {
+      case SINGLE:
+        return jdbcType == null ? new Float4Binder((Float4Vector) vector) :
+            new Float4Binder((Float4Vector) vector, jdbcType);
+      case DOUBLE:
+        return jdbcType == null ? new Float8Binder((Float8Vector) vector) :
+            new Float8Binder((Float8Vector) vector, jdbcType);
+      default:
+        throw new UnsupportedOperationException("No column binder implemented for type " + type);
+    }
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Utf8 type) {
+    VarCharVector varChar = (VarCharVector) vector;
+    return jdbcType == null ? new VarCharBinder<>(varChar, Types.VARCHAR) :
+        new VarCharBinder<>(varChar, jdbcType);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.LargeUtf8 type) {
+    LargeVarCharVector varChar = (LargeVarCharVector) vector;
+    return jdbcType == null ? new VarCharBinder<>(varChar, Types.LONGVARCHAR) :
+        new VarCharBinder<>(varChar, jdbcType);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Binary type) {
+    VarBinaryVector varBinary = (VarBinaryVector) vector;
+    return jdbcType == null ? new VarBinaryBinder<>(varBinary, Types.VARBINARY) :
+        new VarBinaryBinder<>(varBinary, jdbcType);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.LargeBinary type) {
+    LargeVarBinaryVector varBinary = (LargeVarBinaryVector) vector;
+    return jdbcType == null ? new VarBinaryBinder<>(varBinary, Types.LONGVARBINARY) :
+        new VarBinaryBinder<>(varBinary, jdbcType);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.FixedSizeBinary type) {
+    FixedSizeBinaryVector binary = (FixedSizeBinaryVector) vector;
+    return jdbcType == null ? new FixedSizeBinaryBinder(binary, Types.BINARY) :
+        new FixedSizeBinaryBinder(binary, jdbcType);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Bool type) {
+    return jdbcType == null ? new BitBinder((BitVector) vector) : new BitBinder((BitVector) vector, jdbcType);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Decimal type) {
+    if (type.getBitWidth() == 128) {
+      DecimalVector decimalVector = (DecimalVector) vector;
+      return jdbcType == null ? new Decimal128Binder(decimalVector) : new Decimal128Binder(decimalVector, jdbcType);
+    } else if (type.getBitWidth() == 256) {
+      Decimal256Vector decimalVector = (Decimal256Vector) vector;
+      return jdbcType == null ? new Decimal256Binder(decimalVector) : new Decimal256Binder(decimalVector, jdbcType);
+    }
+    throw new UnsupportedOperationException("No column binder implemented for type " + type);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Date type) {
+    switch (type.getUnit()) {
+      case DAY:
+        return jdbcType == null ? new DateDayBinder((DateDayVector) vector) :
+            new DateDayBinder((DateDayVector) vector, /*calendar*/null, jdbcType);
+      case MILLISECOND:
+        return jdbcType == null ? new DateMilliBinder((DateMilliVector) vector) :
+            new DateMilliBinder((DateMilliVector) vector, /*calendar*/null, jdbcType);
+      default:
+        throw new UnsupportedOperationException("No column binder implemented for type " + type);
+    }
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Time type) {
+    switch (type.getUnit()) {
+      case SECOND:
+        return jdbcType == null ? new Time32Binder((TimeSecVector) vector) :
+            new Time32Binder((TimeSecVector) vector, jdbcType);
+      case MILLISECOND:
+        return jdbcType == null ? new Time32Binder((TimeMilliVector) vector) :
+            new Time32Binder((TimeMilliVector) vector, jdbcType);
+      case MICROSECOND:
+        return jdbcType == null ? new Time64Binder((TimeMicroVector) vector) :
+            new Time64Binder((TimeMicroVector) vector, jdbcType);
+      case NANOSECOND:
+        return jdbcType == null ? new Time64Binder((TimeNanoVector) vector) :
+            new Time64Binder((TimeNanoVector) vector, jdbcType);
+      default:
+        throw new UnsupportedOperationException("No column binder implemented for type " + type);
+    }
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Timestamp type) {
+    final String timezone = type.getTimezone();
+    final Calendar calendar = timezone == null || timezone.isEmpty() ? null :
+        Calendar.getInstance(TimeZone.getTimeZone(ZoneId.of(timezone)));
+    final TimeStampVector timestamps = (TimeStampVector) vector;
+    return jdbcType == null ? new TimeStampBinder(timestamps, calendar) :
+        new TimeStampBinder(timestamps, calendar, jdbcType);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Interval type) {
+    throw new UnsupportedOperationException("No column binder implemented for type " + type);
+  }
+
+  @Override
+  public ColumnBinder visit(ArrowType.Duration type) {
+    throw new UnsupportedOperationException("No column binder implemented for type " + type);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/DateDayBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/DateDayBinder.java
new file mode 100644
index 0000000000..bc16790c8f
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/DateDayBinder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Calendar;
+
+import org.apache.arrow.vector.DateDayVector;
+
+/**
+ * A column binder for 32-bit dates.
+ */
+public class DateDayBinder extends BaseColumnBinder<DateDayVector> {
+  private static final long MILLIS_PER_DAY = 86_400_000;
+  private final Calendar calendar;
+
+  public DateDayBinder(DateDayVector vector) {
+    this(vector, null, Types.DATE);
+  }
+
+  public DateDayBinder(DateDayVector vector, Calendar calendar) {
+    this(vector, calendar, Types.DATE);
+  }
+
+  public DateDayBinder(DateDayVector vector, Calendar calendar, int jdbcType) {
+    super(vector, jdbcType);
+    this.calendar = calendar;
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    // The int day count is promoted to long by the multiplication below, so the product cannot overflow.
+    final long index = (long) rowIndex * DateDayVector.TYPE_WIDTH;
+    final Date value = new Date(vector.getDataBuffer().getInt(index) * MILLIS_PER_DAY);
+    if (calendar == null) {
+      statement.setDate(parameterIndex, value);
+    } else {
+      statement.setDate(parameterIndex, value, calendar);
+    }
+  }
+}
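
The day-to-millisecond conversion in DateDayBinder#bind can be checked in isolation. The following is a minimal sketch, not part of the commit: the class DayToDateSketch and its method names are hypothetical, and it uses only java.sql.Date from the JDK rather than an Arrow vector. It illustrates that multiplying an int day count by a long constant is evaluated in 64 bits.

```java
import java.sql.Date;

/** Hypothetical helper mirroring the arithmetic used in DateDayBinder#bind. */
final class DayToDateSketch {
  private static final long MILLIS_PER_DAY = 86_400_000L;

  /** Converts days since the UNIX epoch to milliseconds since the epoch. */
  static long toEpochMillis(int daysSinceEpoch) {
    // int * long is promoted to long, so even Integer.MAX_VALUE days fits.
    return daysSinceEpoch * MILLIS_PER_DAY;
  }

  /** Wraps the millisecond value in the type expected by PreparedStatement#setDate. */
  static Date toSqlDate(int daysSinceEpoch) {
    return new Date(toEpochMillis(daysSinceEpoch));
  }

  public static void main(String[] args) {
    System.out.println(toEpochMillis(1));
    System.out.println(toEpochMillis(Integer.MAX_VALUE));
    System.out.println(toSqlDate(2).getTime());
  }
}
```

If the constant were declared as an int instead, the product would be computed in 32 bits and wrap for any day count above 24, which is why the long declaration matters here.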
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/DateMilliBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/DateMilliBinder.java
new file mode 100644
index 0000000000..5cb91b46ac
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/DateMilliBinder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Calendar;
+
+import org.apache.arrow.vector.DateMilliVector;
+
+/**
+ * A column binder for 64-bit dates.
+ */
+public class DateMilliBinder extends BaseColumnBinder<DateMilliVector> {
+  private final Calendar calendar;
+
+  public DateMilliBinder(DateMilliVector vector) {
+    this(vector, null, Types.DATE);
+  }
+
+  public DateMilliBinder(DateMilliVector vector, Calendar calendar) {
+    this(vector, calendar, Types.DATE);
+  }
+
+
+  public DateMilliBinder(DateMilliVector vector, Calendar calendar, int jdbcType) {
+    super(vector, jdbcType);
+    this.calendar = calendar;
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    final long index = (long) rowIndex * DateMilliVector.TYPE_WIDTH;
+    final Date value = new Date(vector.getDataBuffer().getLong(index));
+    if (calendar == null) {
+      statement.setDate(parameterIndex, value);
+    } else {
+      statement.setDate(parameterIndex, value, calendar);
+    }
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Decimal128Binder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Decimal128Binder.java
new file mode 100644
index 0000000000..9e9d0e4fdb
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Decimal128Binder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.util.DecimalUtility;
+
+/**
+ * A binder for 128-bit decimals.
+ */
+public class Decimal128Binder extends BaseColumnBinder<DecimalVector> {
+  public Decimal128Binder(DecimalVector vector) {
+    this(vector, Types.DECIMAL);
+  }
+
+  public Decimal128Binder(DecimalVector vector, int jdbcType) {
+    super(vector, jdbcType);
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    final BigDecimal value = DecimalUtility.getBigDecimalFromArrowBuf(
+        vector.getDataBuffer(), rowIndex, vector.getScale(), DecimalVector.TYPE_WIDTH);
+    statement.setBigDecimal(parameterIndex, value);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Decimal256Binder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Decimal256Binder.java
new file mode 100644
index 0000000000..bd29e083b4
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Decimal256Binder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.Decimal256Vector;
+import org.apache.arrow.vector.util.DecimalUtility;
+
+/**
+ * A binder for 256-bit decimals.
+ */
+public class Decimal256Binder extends BaseColumnBinder<Decimal256Vector> {
+  public Decimal256Binder(Decimal256Vector vector) {
+    this(vector, Types.DECIMAL);
+  }
+
+  public Decimal256Binder(Decimal256Vector vector, int jdbcType) {
+    super(vector, jdbcType);
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    final BigDecimal value = DecimalUtility.getBigDecimalFromArrowBuf(
+        vector.getDataBuffer(), rowIndex, vector.getScale(), Decimal256Vector.TYPE_WIDTH);
+    statement.setBigDecimal(parameterIndex, value);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/FixedSizeBinaryBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/FixedSizeBinaryBinder.java
new file mode 100644
index 0000000000..7edc5e4532
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/FixedSizeBinaryBinder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+
+/**
+ * A binder for fixed-width binary types.
+ */
+public class FixedSizeBinaryBinder extends BaseColumnBinder<FixedSizeBinaryVector> {
+  /**
+   * Create a binder for the given vector using the given JDBC type for null values.
+   *
+   * @param vector   The vector to draw values from.
+   * @param jdbcType The JDBC type code.
+   */
+  public FixedSizeBinaryBinder(FixedSizeBinaryVector vector, int jdbcType) {
+    super(vector, jdbcType);
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    byte[] binaryData = new byte[vector.getByteWidth()];
+    vector.getDataBuffer().getBytes((long) rowIndex * binaryData.length, binaryData, 0, binaryData.length);
+    statement.setBytes(parameterIndex, binaryData);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Float4Binder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Float4Binder.java
new file mode 100644
index 0000000000..a471c1ebad
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Float4Binder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.Float4Vector;
+
+/**
+ * A binder for 32-bit floats.
+ */
+public class Float4Binder extends BaseColumnBinder<Float4Vector> {
+  public Float4Binder(Float4Vector vector) {
+    this(vector, Types.REAL);
+  }
+
+  public Float4Binder(Float4Vector vector, int jdbcType) {
+    super(vector, jdbcType);
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    final float value = vector.getDataBuffer().getFloat((long) rowIndex * Float4Vector.TYPE_WIDTH);
+    statement.setFloat(parameterIndex, value);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Float8Binder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Float8Binder.java
new file mode 100644
index 0000000000..4710c3b598
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Float8Binder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.Float8Vector;
+
+/**
+ * A binder for 64-bit floats.
+ */
+public class Float8Binder extends BaseColumnBinder<Float8Vector> {
+  public Float8Binder(Float8Vector vector) {
+    this(vector, Types.DOUBLE);
+  }
+
+  public Float8Binder(Float8Vector vector, int jdbcType) {
+    super(vector, jdbcType);
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    final double value = vector.getDataBuffer().getDouble((long) rowIndex * Float8Vector.TYPE_WIDTH);
+    statement.setDouble(parameterIndex, value);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/IntBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/IntBinder.java
new file mode 100644
index 0000000000..7d47f585a3
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/IntBinder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.IntVector;
+
+/** A column binder for 32-bit integers. */
+public class IntBinder extends BaseColumnBinder<IntVector> {
+  public IntBinder(IntVector vector) {
+    this(vector, Types.INTEGER);
+  }
+
+  public IntBinder(IntVector vector, int jdbcType) {
+    super(vector, jdbcType);
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    final int value = vector.getDataBuffer().getInt((long) rowIndex * IntVector.TYPE_WIDTH);
+    statement.setInt(parameterIndex, value);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/NullableColumnBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/NullableColumnBinder.java
new file mode 100644
index 0000000000..123b587ca5
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/NullableColumnBinder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.FieldVector;
+
+/**
+ * A ColumnBinder that checks for nullability before deferring to a type-specific binder.
+ */
+public class NullableColumnBinder implements ColumnBinder {
+  private final ColumnBinder wrapped;
+
+  public NullableColumnBinder(ColumnBinder wrapped) {
+    this.wrapped = wrapped;
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    if (wrapped.getVector().isNull(rowIndex)) {
+      statement.setNull(parameterIndex, wrapped.getJdbcType());
+    } else {
+      wrapped.bind(statement, parameterIndex, rowIndex);
+    }
+  }
+
+  @Override
+  public int getJdbcType() {
+    return wrapped.getJdbcType();
+  }
+
+  @Override
+  public FieldVector getVector() {
+    return wrapped.getVector();
+  }
+}
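
The null-checking wrapper above can be illustrated without a database or Arrow on the classpath. The sketch below is an assumption-laden stand-in rather than the adapter's API: RowBinder, nullable, recordingStatement, and trace are hypothetical names, an Integer[] plays the role of the vector, and a java.lang.reflect.Proxy records the PreparedStatement calls instead of executing them.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class NullableBindingSketch {
  /** Hypothetical stand-in for ColumnBinder. */
  interface RowBinder {
    void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException;
  }

  /** Wraps a plain int binder so that null slots are bound with setNull and the given JDBC type. */
  static RowBinder nullable(Integer[] column, int jdbcType) {
    RowBinder wrapped = (statement, parameterIndex, rowIndex) ->
        statement.setInt(parameterIndex, column[rowIndex]);
    return (statement, parameterIndex, rowIndex) -> {
      if (column[rowIndex] == null) {
        statement.setNull(parameterIndex, jdbcType);
      } else {
        wrapped.bind(statement, parameterIndex, rowIndex);
      }
    };
  }

  /** Returns a PreparedStatement that only records the name and arguments of each call. */
  static PreparedStatement recordingStatement(List<String> calls) {
    return (PreparedStatement) Proxy.newProxyInstance(
        NullableBindingSketch.class.getClassLoader(),
        new Class<?>[] {PreparedStatement.class},
        (proxy, method, args) -> {
          calls.add(method.getName() + Arrays.toString(args));
          return null;
        });
  }

  /** Binds every row of the column to parameter 1 and returns the recorded calls. */
  static List<String> trace(Integer... column) {
    List<String> calls = new ArrayList<>();
    PreparedStatement statement = recordingStatement(calls);
    RowBinder binder = nullable(column, Types.INTEGER);
    try {
      for (int row = 0; row < column.length; row++) {
        binder.bind(statement, 1, row);
      }
    } catch (SQLException e) {
      throw new IllegalStateException(e);
    }
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(trace(7, null, 9));
  }
}
```

Keeping the null check in a wrapper means the type-specific binders can read the data buffer directly without consulting the validity bitmap themselves.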
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/SmallIntBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/SmallIntBinder.java
new file mode 100644
index 0000000000..f9d744b9f5
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/SmallIntBinder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.SmallIntVector;
+
+/** A column binder for 16-bit integers. */
+public class SmallIntBinder extends BaseColumnBinder<SmallIntVector> {
+  public SmallIntBinder(SmallIntVector vector) {
+    this(vector, Types.SMALLINT);
+  }
+
+  public SmallIntBinder(SmallIntVector vector, int jdbcType) {
+    super(vector, jdbcType);
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    final short value = vector.getDataBuffer().getShort((long) rowIndex * SmallIntVector.TYPE_WIDTH);
+    statement.setShort(parameterIndex, value);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Time32Binder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Time32Binder.java
new file mode 100644
index 0000000000..5dc7e3f513
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Time32Binder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Types;
+
+import org.apache.arrow.vector.BaseFixedWidthVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeSecVector;
+
+/**
+ * A binder for 32-bit time types.
+ */
+public class Time32Binder extends BaseColumnBinder<BaseFixedWidthVector> {
+  private static final long TYPE_WIDTH = 4;
+
+  private final long factor;
+
+  public Time32Binder(TimeSecVector vector) {
+    this(vector, Types.TIME);
+  }
+
+  public Time32Binder(TimeMilliVector vector) {
+    this(vector, Types.TIME);
+  }
+
+  public Time32Binder(TimeSecVector vector, int jdbcType) {
+    this(vector, /*factor*/1_000, jdbcType);
+  }
+
+  public Time32Binder(TimeMilliVector vector, int jdbcType) {
+    this(vector, /*factor*/1, jdbcType);
+  }
+
+  Time32Binder(BaseFixedWidthVector vector, long factor, int jdbcType) {
+    super(vector, jdbcType);
+    this.factor = factor;
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    // The int value is promoted to long by the multiplication below, so the product cannot overflow.
+    // TODO: take in a Calendar as well?
+    final Time value = new Time(vector.getDataBuffer().getInt(rowIndex * TYPE_WIDTH) * factor);
+    statement.setTime(parameterIndex, value);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Time64Binder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Time64Binder.java
new file mode 100644
index 0000000000..8d62ae0eb3
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Time64Binder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Types;
+
+import org.apache.arrow.vector.BaseFixedWidthVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeNanoVector;
+
+/**
+ * A binder for 64-bit time types.
+ */
+public class Time64Binder extends BaseColumnBinder<BaseFixedWidthVector> {
+  private static final long TYPE_WIDTH = 8;
+
+  private final long factor;
+
+  public Time64Binder(TimeMicroVector vector) {
+    this(vector, Types.TIME);
+  }
+
+  public Time64Binder(TimeNanoVector vector) {
+    this(vector, Types.TIME);
+  }
+
+  public Time64Binder(TimeMicroVector vector, int jdbcType) {
+    this(vector, /*factor*/1_000, jdbcType);
+  }
+
+  public Time64Binder(TimeNanoVector vector, int jdbcType) {
+    this(vector, /*factor*/1_000_000, jdbcType);
+  }
+
+  Time64Binder(BaseFixedWidthVector vector, long factor, int jdbcType) {
+    super(vector, jdbcType);
+    this.factor = factor;
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    // TODO: option to throw on truncation (vendor Guava IntMath#multiply)
+    final Time value = new Time(vector.getDataBuffer().getLong(rowIndex * TYPE_WIDTH) / factor);
+    statement.setTime(parameterIndex, value);
+  }
+}
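
The two time binders differ in direction: Time32Binder scales up to milliseconds, while Time64Binder divides down and therefore drops sub-millisecond digits. The sketch below is a rough illustration only. TimeUnitScalingSketch and its methods are hypothetical, and losesPrecision shows one possible way to detect the truncation mentioned in the TODO, not something the commit implements.

```java
/** Hypothetical helpers mirroring the unit scaling in Time32Binder and Time64Binder. */
final class TimeUnitScalingSketch {
  /** Seconds to milliseconds, as for TimeSecVector (factor 1_000, multiplied). */
  static long secondsToMillis(int seconds) {
    return seconds * 1_000L;
  }

  /** Microseconds to milliseconds, as for TimeMicroVector (factor 1_000, divided). */
  static long microsToMillis(long micros) {
    return micros / 1_000L;
  }

  /** Nanoseconds to milliseconds, as for TimeNanoVector (factor 1_000_000, divided). */
  static long nanosToMillis(long nanos) {
    return nanos / 1_000_000L;
  }

  /** Reports whether dividing by the factor would discard a non-zero remainder. */
  static boolean losesPrecision(long value, long factor) {
    return value % factor != 0;
  }

  public static void main(String[] args) {
    System.out.println(secondsToMillis(86_399));
    System.out.println(microsToMillis(1_999L));
    System.out.println(nanosToMillis(1_999_999L));
    System.out.println(losesPrecision(1_999L, 1_000L));
  }
}
```

A caller that cannot tolerate silent truncation could run a check like losesPrecision before binding and reject the row, at the cost of one extra remainder operation per value.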
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/TimeStampBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/TimeStampBinder.java
new file mode 100644
index 0000000000..6677e59099
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/TimeStampBinder.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.Calendar;
+
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+
+/** A column binder for timestamps. */
+public class TimeStampBinder extends BaseColumnBinder<TimeStampVector> {
+  private final Calendar calendar;
+  private final long unitsPerSecond;
+  private final long nanosPerUnit;
+
+  /**
+   * Create a binder for a timestamp vector using the default JDBC type code.
+   */
+  public TimeStampBinder(TimeStampVector vector, Calendar calendar) {
+    this(vector, calendar, isZoned(vector.getField().getType()) ? Types.TIMESTAMP_WITH_TIMEZONE : Types.TIMESTAMP);
+  }
+
+  /**
+   * Create a binder for a timestamp vector.
+   * @param vector The vector to pull values from.
+   * @param calendar The calendar to pass to JDBC, or null to call setTimestamp without one.
+   * @param jdbcType The JDBC type code to use for null values.
+   */
+  public TimeStampBinder(TimeStampVector vector, Calendar calendar, int jdbcType) {
+    super(vector, jdbcType);
+    this.calendar = calendar;
+
+    final ArrowType.Timestamp type = (ArrowType.Timestamp) vector.getField().getType();
+    switch (type.getUnit()) {
+      case SECOND:
+        this.unitsPerSecond = 1;
+        this.nanosPerUnit = 1_000_000_000;
+        break;
+      case MILLISECOND:
+        this.unitsPerSecond = 1_000;
+        this.nanosPerUnit = 1_000_000;
+        break;
+      case MICROSECOND:
+        this.unitsPerSecond = 1_000_000;
+        this.nanosPerUnit = 1_000;
+        break;
+      case NANOSECOND:
+        this.unitsPerSecond = 1_000_000_000;
+        this.nanosPerUnit = 1;
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid time unit in " + type);
+    }
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    // TODO: option to throw on truncation (vendor Guava IntMath#multiply) or overflow
+    final long rawValue = vector.getDataBuffer().getLong((long) rowIndex * TimeStampVector.TYPE_WIDTH);
+    final long seconds = Math.floorDiv(rawValue, unitsPerSecond);
+    final int nanos = (int) (Math.floorMod(rawValue, unitsPerSecond) * nanosPerUnit);
+    final Timestamp value = new Timestamp(seconds * 1_000);
+    value.setNanos(nanos);
+    if (calendar != null) {
+      // Timestamp == Date == UTC timestamp (confusingly). Arrow's timestamp with timezone is a UTC value with a
+      // zone offset, so we don't need to do any conversion.
+      statement.setTimestamp(parameterIndex, value, calendar);
+    } else {
+      // Arrow timestamp without timezone isn't strictly convertible to any timezone. So this is technically wrong,
+      // but there is no 'correct' interpretation here. The application should provide a calendar.
+      statement.setTimestamp(parameterIndex, value);
+    }
+  }
+
+  private static boolean isZoned(ArrowType type) {
+    final String timezone = ((ArrowType.Timestamp) type).getTimezone();
+    return timezone != null && !timezone.isEmpty();
+  }
+}
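Splitting a raw epoch value into whole seconds plus a nanosecond remainder, as TimeStampBinder#bind does, needs care for values before the epoch, because `Timestamp#setNanos` rejects negative arguments. The sketch below shows the split with floor-based arithmetic. The class `TimestampSplit` and its method are hypothetical names used only for illustration. The parameters mirror the `unitsPerSecond` and `nanosPerUnit` fields above.

```java
import java.sql.Timestamp;

/** Hypothetical helper illustrating the seconds/nanos split; not part of the patch. */
public final class TimestampSplit {
  private TimestampSplit() {}

  /**
   * Convert a raw epoch value to a java.sql.Timestamp.
   *
   * @param rawValue The value since the epoch, in the vector's time unit.
   * @param unitsPerSecond How many raw units make up one second.
   * @param nanosPerUnit How many nanoseconds make up one raw unit.
   */
  public static Timestamp toTimestamp(long rawValue, long unitsPerSecond, long nanosPerUnit) {
    // floorDiv rounds toward negative infinity, so the remainder is always in [0, unitsPerSecond).
    final long seconds = Math.floorDiv(rawValue, unitsPerSecond);
    final int nanos = (int) (Math.floorMod(rawValue, unitsPerSecond) * nanosPerUnit);
    final Timestamp value = new Timestamp(seconds * 1_000);
    value.setNanos(nanos);
    return value;
  }

  public static void main(String[] args) {
    // -1_500 ms is 1.5 s before the epoch: 2 whole seconds back, then 500_000_000 ns forward.
    final Timestamp ts = toTimestamp(-1_500L, 1_000L, 1_000_000L);
    System.out.println(ts.getTime() + " ms, " + ts.getNanos() + " ns");
  }
}
```

Plain `/` and `%` round toward zero instead, which yields a negative remainder for such inputs.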
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/TinyIntBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/TinyIntBinder.java
new file mode 100644
index 0000000000..f51d139be8
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/TinyIntBinder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.TinyIntVector;
+
+/** A column binder for 8-bit integers. */
+public class TinyIntBinder extends BaseColumnBinder<TinyIntVector> {
+  public TinyIntBinder(TinyIntVector vector) {
+    this(vector, Types.TINYINT);
+  }
+
+  public TinyIntBinder(TinyIntVector vector, int jdbcType) {
+    super(vector, jdbcType);
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    final byte value = vector.getDataBuffer().getByte((long) rowIndex * TinyIntVector.TYPE_WIDTH);
+    statement.setByte(parameterIndex, value);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/VarBinaryBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/VarBinaryBinder.java
new file mode 100644
index 0000000000..a94cff6a00
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/VarBinaryBinder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.arrow.memory.util.ArrowBufPointer;
+import org.apache.arrow.vector.ElementAddressableVector;
+import org.apache.arrow.vector.FieldVector;
+
+/**
+ * A binder for variable-width binary types.
+ *
+ * @param <T> The binary vector.
+ */
+public class VarBinaryBinder<T extends FieldVector & ElementAddressableVector> extends BaseColumnBinder<T> {
+  private final ArrowBufPointer element;
+
+  /**
+   * Create a binder for the given vector using the given JDBC type for null values.
+   *
+   * @param vector   The vector to draw values from.
+   * @param jdbcType The JDBC type code.
+   */
+  public VarBinaryBinder(T vector, int jdbcType) {
+    super(vector, jdbcType);
+    this.element = new ArrowBufPointer();
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    vector.getDataPointer(rowIndex, element);
+    if (element.getBuf() == null) {
+      statement.setNull(parameterIndex, jdbcType);
+      return;
+    }
+    if (element.getLength() > (long) Integer.MAX_VALUE) {
+      final String message = String.format("Length of value at index %d (%d) exceeds Integer.MAX_VALUE",
+          rowIndex, element.getLength());
+      throw new RuntimeException(message);
+    }
+    byte[] binaryData = new byte[(int) element.getLength()];
+    element.getBuf().getBytes(element.getOffset(), binaryData);
+    statement.setBytes(parameterIndex, binaryData);
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/VarCharBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/VarCharBinder.java
new file mode 100644
index 0000000000..73bd559814
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/VarCharBinder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.arrow.memory.util.ArrowBufPointer;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VariableWidthVector;
+
+/**
+ * A binder for variable-width string types.
+ *
+ * @param <T> The text vector.
+ */
+public class VarCharBinder<T extends FieldVector & VariableWidthVector> extends BaseColumnBinder<T> {
+  private final ArrowBufPointer element;
+
+  /**
+   * Create a binder for the given vector using the given JDBC type for null values.
+   *
+   * @param vector   The vector to draw values from.
+   * @param jdbcType The JDBC type code.
+   */
+  public VarCharBinder(T vector, int jdbcType) {
+    super(vector, jdbcType);
+    this.element = new ArrowBufPointer();
+  }
+
+  @Override
+  public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+    vector.getDataPointer(rowIndex, element);
+    if (element.getBuf() == null) {
+      statement.setNull(parameterIndex, jdbcType);
+      return;
+    }
+    if (element.getLength() > (long) Integer.MAX_VALUE) {
+      final String message = String.format("Length of value at index %d (%d) exceeds Integer.MAX_VALUE",
+          rowIndex, element.getLength());
+      throw new RuntimeException(message);
+    }
+    byte[] utf8Bytes = new byte[(int) element.getLength()];
+    element.getBuf().getBytes(element.getOffset(), utf8Bytes);
+    statement.setString(parameterIndex, new String(utf8Bytes, StandardCharsets.UTF_8));
+  }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/package-info.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/package-info.java
new file mode 100644
index 0000000000..4f8936e0c2
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities to bind Arrow data as JDBC prepared statement parameters.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
diff --git a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java
new file mode 100644
index 0000000000..c8c043f2f0
--- /dev/null
+++ b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+import org.apache.arrow.adapter.jdbc.binder.ColumnBinder;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BaseLargeVariableWidthVector;
+import org.apache.arrow.vector.BaseVariableWidthVector;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.Decimal256Vector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.LargeVarBinaryVector;
+import org.apache.arrow.vector.LargeVarCharVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampNanoTZVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TimeStampSecTZVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class JdbcParameterBinderTest {
+  private static final long MILLIS_PER_DAY = 86_400_000;
+  BufferAllocator allocator;
+
+  @BeforeEach
+  void beforeEach() {
+    allocator = new RootAllocator();
+  }
+
+  @AfterEach
+  void afterEach() {
+    allocator.close();
+  }
+
+  @Test
+  void bindOrder() throws SQLException {
+    final Schema schema =
+        new Schema(
+            Arrays.asList(
+                Field.nullable("ints0", new ArrowType.Int(32, true)),
+                Field.nullable("ints1", new ArrowType.Int(32, true)),
+                Field.nullable("ints2", new ArrowType.Int(32, true))));
+    try (final MockPreparedStatement statement = new MockPreparedStatement();
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      final JdbcParameterBinder binder =
+          JdbcParameterBinder.builder(statement, root)
+              .bind(/*paramIndex=*/ 1, /*colIndex=*/ 2)
+              .bind(/*paramIndex=*/ 2, /*colIndex=*/ 0)
+              .build();
+      assertThat(binder.next()).isFalse();
+
+      final IntVector ints0 = (IntVector) root.getVector(0);
+      final IntVector ints1 = (IntVector) root.getVector(1);
+      final IntVector ints2 = (IntVector) root.getVector(2);
+      ints0.setSafe(0, 4);
+      ints0.setNull(1);
+      ints1.setNull(0);
+      ints1.setSafe(1, -8);
+      ints2.setNull(0);
+      ints2.setSafe(1, 12);
+      root.setRowCount(2);
+
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isNull();
+      assertThat(statement.getParamType(1)).isEqualTo(Types.INTEGER);
+      assertThat(statement.getParamValue(2)).isEqualTo(4);
+      assertThat(statement.getParam(3)).isNull();
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(12);
+      assertThat(statement.getParamValue(2)).isNull();
+      assertThat(statement.getParamType(2)).isEqualTo(Types.INTEGER);
+      assertThat(statement.getParam(3)).isNull();
+      assertThat(binder.next()).isFalse();
+
+      binder.reset();
+
+      ints0.setNull(0);
+      ints0.setSafe(1, -2);
+      ints2.setNull(0);
+      ints2.setSafe(1, 6);
+      root.setRowCount(2);
+
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isNull();
+      assertThat(statement.getParamType(1)).isEqualTo(Types.INTEGER);
+      assertThat(statement.getParamValue(2)).isNull();
+      assertThat(statement.getParamType(2)).isEqualTo(Types.INTEGER);
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(6);
+      assertThat(statement.getParamValue(2)).isEqualTo(-2);
+      assertThat(statement.getParam(3)).isNull();
+      assertThat(binder.next()).isFalse();
+    }
+  }
+
+  @Test
+  void customBinder() throws SQLException {
+    final Schema schema =
+        new Schema(Collections.singletonList(
+            Field.nullable("ints0", new ArrowType.Int(32, true))));
+
+    try (final MockPreparedStatement statement = new MockPreparedStatement();
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      final JdbcParameterBinder binder =
+          JdbcParameterBinder.builder(statement, root)
+              .bind(
+                  /*paramIndex=*/ 1,
+                  new ColumnBinder() {
+                    private final IntVector vector = (IntVector) root.getVector(0);
+                    @Override
+                    public void bind(PreparedStatement statement, int parameterIndex, int rowIndex)
+                        throws SQLException {
+                      Integer value = vector.getObject(rowIndex);
+                      if (value == null) {
+                        statement.setString(parameterIndex, "null");
+                      } else {
+                        statement.setString(parameterIndex, Integer.toString(value));
+                      }
+                    }
+
+                    @Override
+                    public int getJdbcType() {
+                      return Types.INTEGER;
+                    }
+
+                    @Override
+                    public FieldVector getVector() {
+                      return vector;
+                    }
+                  })
+              .build();
+      assertThat(binder.next()).isFalse();
+
+      final IntVector ints = (IntVector) root.getVector(0);
+      ints.setSafe(0, 4);
+      ints.setNull(1);
+
+      root.setRowCount(2);
+
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo("4");
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo("null");
+      assertThat(binder.next()).isFalse();
+    }
+  }
+
+  @Test
+  void bool() throws SQLException {
+    testSimpleType(ArrowType.Bool.INSTANCE, Types.BOOLEAN,
+        (BitVector vector, Integer index, Boolean value) -> vector.setSafe(index, value ? 1 : 0),
+        BitVector::setNull,
+        Arrays.asList(true, false, true));
+  }
+
+  @Test
+  void int8() throws SQLException {
+    testSimpleType(new ArrowType.Int(8, true), Types.TINYINT,
+        TinyIntVector::setSafe, TinyIntVector::setNull,
+        Arrays.asList(Byte.MAX_VALUE, Byte.MIN_VALUE, (byte) 42));
+  }
+
+  @Test
+  void int16() throws SQLException {
+    testSimpleType(new ArrowType.Int(16, true), Types.SMALLINT,
+        SmallIntVector::setSafe, SmallIntVector::setNull,
+        Arrays.asList(Short.MAX_VALUE, Short.MIN_VALUE, (short) 42));
+  }
+
+  @Test
+  void int32() throws SQLException {
+    testSimpleType(new ArrowType.Int(32, true), Types.INTEGER,
+        IntVector::setSafe, IntVector::setNull,
+        Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE, 42));
+  }
+
+  @Test
+  void int64() throws SQLException {
+    testSimpleType(new ArrowType.Int(64, true), Types.BIGINT,
+        BigIntVector::setSafe, BigIntVector::setNull,
+        Arrays.asList(Long.MAX_VALUE, Long.MIN_VALUE, 42L));
+  }
+
+  @Test
+  void float32() throws SQLException {
+    testSimpleType(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE), Types.REAL,
+        Float4Vector::setSafe, Float4Vector::setNull,
+        Arrays.asList(Float.MIN_VALUE, Float.MAX_VALUE, Float.POSITIVE_INFINITY));
+  }
+
+  @Test
+  void float64() throws SQLException {
+    testSimpleType(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), Types.DOUBLE,
+        Float8Vector::setSafe, Float8Vector::setNull,
+        Arrays.asList(Double.MIN_VALUE, Double.MAX_VALUE, Double.POSITIVE_INFINITY));
+  }
+
+  @Test
+  void time32() throws SQLException {
+    testSimpleType(new ArrowType.Time(TimeUnit.SECOND, 32), Types.TIME,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, (int) (value.getTime() / 1_000)),
+        TimeSecVector::setNull,
+        Arrays.asList(new Time(-128_000), new Time(104_000), new Time(-42_000)));
+    testSimpleType(new ArrowType.Time(TimeUnit.MILLISECOND, 32), Types.TIME,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, (int) value.getTime()),
+        TimeMilliVector::setNull,
+        Arrays.asList(new Time(-128_000), new Time(104_000), new Time(-42_000)));
+  }
+
+  @Test
+  void time64() throws SQLException {
+    testSimpleType(new ArrowType.Time(TimeUnit.MICROSECOND, 64), Types.TIME,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() * 1_000),
+        TimeMicroVector::setNull,
+        Arrays.asList(new Time(-128_000), new Time(104_000), new Time(-42_000)));
+    testSimpleType(new ArrowType.Time(TimeUnit.NANOSECOND, 64), Types.TIME,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() * 1_000_000),
+        TimeNanoVector::setNull,
+        Arrays.asList(new Time(-128), new Time(104), new Time(-42)));
+  }
+
+  @Test
+  void date32() throws SQLException {
+    testSimpleType(new ArrowType.Date(DateUnit.DAY), Types.DATE,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, (int) (value.getTime() / MILLIS_PER_DAY)),
+        DateDayVector::setNull,
+        Arrays.asList(new Date(-5 * MILLIS_PER_DAY), new Date(2 * MILLIS_PER_DAY), new Date(MILLIS_PER_DAY)));
+  }
+
+  @Test
+  void date64() throws SQLException {
+    testSimpleType(new ArrowType.Date(DateUnit.MILLISECOND), Types.DATE,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime()),
+        DateMilliVector::setNull,
+        Arrays.asList(new Date(-5 * MILLIS_PER_DAY), new Date(2 * MILLIS_PER_DAY), new Date(MILLIS_PER_DAY)));
+  }
+
+  @Test
+  void timestamp() throws SQLException {
+    List<Timestamp> values = Arrays.asList(new Timestamp(-128_000), new Timestamp(104_000), new Timestamp(-42_000));
+    testSimpleType(new ArrowType.Timestamp(TimeUnit.SECOND, null), Types.TIMESTAMP,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() / 1_000),
+        TimeStampSecVector::setNull, values);
+    testSimpleType(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null), Types.TIMESTAMP,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime()),
+        TimeStampMilliVector::setNull, values);
+    testSimpleType(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null), Types.TIMESTAMP,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() * 1_000),
+        TimeStampMicroVector::setNull, values);
+    testSimpleType(new ArrowType.Timestamp(TimeUnit.NANOSECOND, null), Types.TIMESTAMP,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() * 1_000_000),
+        TimeStampNanoVector::setNull, values);
+  }
+
+  @Test
+  void timestampTz() throws SQLException {
+    List<Timestamp> values = Arrays.asList(new Timestamp(-128_000), new Timestamp(104_000), new Timestamp(-42_000));
+    testSimpleType(new ArrowType.Timestamp(TimeUnit.SECOND, "UTC"), Types.TIMESTAMP_WITH_TIMEZONE,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() / 1_000),
+        TimeStampSecTZVector::setNull, values);
+    testSimpleType(new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"), Types.TIMESTAMP_WITH_TIMEZONE,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime()),
+        TimeStampMilliTZVector::setNull, values);
+    testSimpleType(new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"), Types.TIMESTAMP_WITH_TIMEZONE,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() * 1_000),
+        TimeStampMicroTZVector::setNull, values);
+    testSimpleType(new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"), Types.TIMESTAMP_WITH_TIMEZONE,
+        (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() * 1_000_000),
+        TimeStampNanoTZVector::setNull, values);
+  }
+
+  @Test
+  void utf8() throws SQLException {
+    testSimpleType(ArrowType.Utf8.INSTANCE, Types.VARCHAR,
+        (VarCharVector vector, Integer index, String value) ->
+            vector.setSafe(index, value.getBytes(StandardCharsets.UTF_8)),
+        BaseVariableWidthVector::setNull,
+        Arrays.asList("", "foobar", "abc"));
+  }
+
+  @Test
+  void largeUtf8() throws SQLException {
+    testSimpleType(ArrowType.LargeUtf8.INSTANCE, Types.LONGVARCHAR,
+        (LargeVarCharVector vector, Integer index, String value) ->
+            vector.setSafe(index, value.getBytes(StandardCharsets.UTF_8)),
+        BaseLargeVariableWidthVector::setNull,
+        Arrays.asList("", "foobar", "abc"));
+  }
+
+  @Test
+  void binary() throws SQLException {
+    testSimpleType(ArrowType.Binary.INSTANCE, Types.VARBINARY,
+        (VarBinaryVector vector, Integer index, byte[] value) ->
+            vector.setSafe(index, value),
+        BaseVariableWidthVector::setNull,
+        Arrays.asList(new byte[0], new byte[] {2, -4}, new byte[] {0, -1, 127, -128}));
+  }
+
+  @Test
+  void largeBinary() throws SQLException {
+    testSimpleType(ArrowType.LargeBinary.INSTANCE, Types.LONGVARBINARY,
+        (LargeVarBinaryVector vector, Integer index, byte[] value) ->
+            vector.setSafe(index, value),
+        BaseLargeVariableWidthVector::setNull,
+        Arrays.asList(new byte[0], new byte[] {2, -4}, new byte[] {0, -1, 127, -128}));
+  }
+
+  @Test
+  void fixedSizeBinary() throws SQLException {
+    testSimpleType(new ArrowType.FixedSizeBinary(3), Types.BINARY,
+        FixedSizeBinaryVector::setSafe, FixedSizeBinaryVector::setNull,
+        Arrays.asList(new byte[3], new byte[] {1, 2, -4}, new byte[] {-1, 127, -128}));
+  }
+
+  @Test
+  void decimal128() throws SQLException {
+    testSimpleType(new ArrowType.Decimal(/*precision*/ 12, /*scale*/3, 128), Types.DECIMAL,
+        DecimalVector::setSafe, DecimalVector::setNull,
+        Arrays.asList(new BigDecimal("120.429"), new BigDecimal("-10590.123"), new BigDecimal("0.000")));
+  }
+
+  @Test
+  void decimal256() throws SQLException {
+    testSimpleType(new ArrowType.Decimal(/*precision*/ 12, /*scale*/3, 256), Types.DECIMAL,
+        Decimal256Vector::setSafe, Decimal256Vector::setNull,
+        Arrays.asList(new BigDecimal("120.429"), new BigDecimal("-10590.123"), new BigDecimal("0.000")));
+  }
+
+  @FunctionalInterface
+  interface TriConsumer<T, U, V> {
+    void accept(T value1, U value2, V value3);
+  }
+
+  <T, V extends FieldVector> void testSimpleType(ArrowType arrowType, int jdbcType, TriConsumer<V, Integer, T> setValue,
+                          BiConsumer<V, Integer> setNull, List<T> values) throws SQLException {
+    Schema schema = new Schema(Collections.singletonList(Field.nullable("field", arrowType)));
+    try (final MockPreparedStatement statement = new MockPreparedStatement();
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      final JdbcParameterBinder binder =
+          JdbcParameterBinder.builder(statement, root).bindAll().build();
+      assertThat(binder.next()).isFalse();
+
+      @SuppressWarnings("unchecked")
+      final V vector = (V) root.getVector(0);
+      final ColumnBinder columnBinder = ColumnBinder.forVector(vector);
+      assertThat(columnBinder.getJdbcType()).isEqualTo(jdbcType);
+
+      setValue.accept(vector, 0, values.get(0));
+      setValue.accept(vector, 1, values.get(1));
+      setNull.accept(vector, 2);
+      root.setRowCount(3);
+
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(0));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(1));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isNull();
+      assertThat(statement.getParamType(1)).isEqualTo(jdbcType);
+      assertThat(binder.next()).isFalse();
+
+      binder.reset();
+
+      setNull.accept(vector, 0);
+      setValue.accept(vector, 1, values.get(2));
+      setValue.accept(vector, 2, values.get(0));
+      setValue.accept(vector, 3, values.get(2));
+      setValue.accept(vector, 4, values.get(1));
+      root.setRowCount(5);
+
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isNull();
+      assertThat(statement.getParamType(1)).isEqualTo(jdbcType);
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(2));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(0));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(2));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(1));
+      assertThat(binder.next()).isFalse();
+    }
+
+    // Non-nullable (since some types have a specialized binder)
+    schema = new Schema(Collections.singletonList(Field.notNullable("field", arrowType)));
+    try (final MockPreparedStatement statement = new MockPreparedStatement();
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      final JdbcParameterBinder binder =
+          JdbcParameterBinder.builder(statement, root).bindAll().build();
+      assertThat(binder.next()).isFalse();
+
+      @SuppressWarnings("unchecked")
+      final V vector = (V) root.getVector(0);
+      setValue.accept(vector, 0, values.get(0));
+      setValue.accept(vector, 1, values.get(1));
+      root.setRowCount(2);
+
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(0));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(1));
+      assertThat(binder.next()).isFalse();
+
+      binder.reset();
+
+      setValue.accept(vector, 0, values.get(0));
+      setValue.accept(vector, 1, values.get(2));
+      setValue.accept(vector, 2, values.get(0));
+      setValue.accept(vector, 3, values.get(2));
+      setValue.accept(vector, 4, values.get(1));
+      root.setRowCount(5);
+
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(0));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(2));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(0));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(2));
+      assertThat(binder.next()).isTrue();
+      assertThat(statement.getParamValue(1)).isEqualTo(values.get(1));
+      assertThat(binder.next()).isFalse();
+    }
+  }
+}
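
Note on the assertions above: they exercise a simple cursor contract. next() returns false on an empty root, binds one row per call (null rows included), and returns false once rows run out; reset() rewinds so that refilled data can be bound again. The following is a minimal sketch of that contract in plain Java. It is a toy model, not the JdbcParameterBinder implementation: RowCursorSketch and its fields are hypothetical names, a List stands in for the VectorSchemaRoot, and a Consumer stands in for the PreparedStatement setters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a next()/reset() row cursor; every name here is hypothetical. */
final class RowCursorSketch {
  private final List<Object> rows;      // stands in for the VectorSchemaRoot
  private final Consumer<Object> bind;  // stands in for the PreparedStatement setters
  private int nextRow = 0;

  RowCursorSketch(List<Object> rows, Consumer<Object> bind) {
    this.rows = rows;
    this.bind = bind;
  }

  /** Binds the next row if there is one; the row count is re-read on every call. */
  boolean next() {
    if (nextRow >= rows.size()) {
      return false;
    }
    bind.accept(rows.get(nextRow++));
    return true;
  }

  /** Rewinds to the first row so that refilled data can be bound again. */
  void reset() {
    nextRow = 0;
  }

  public static void main(String[] args) {
    List<Object> rows = new ArrayList<>();
    List<Object> bound = new ArrayList<>();
    RowCursorSketch cursor = new RowCursorSketch(rows, bound::add);

    System.out.println(cursor.next()); // empty source, so nothing is bound

    rows.add(1L);
    rows.add(null); // a null row is still a row
    while (cursor.next()) {
      // A real caller might call statement.addBatch() at this point.
    }
    System.out.println(bound);

    cursor.reset();
    rows.clear();
    rows.add(2L);
    System.out.println(cursor.next());
    System.out.println(bound);
  }
}
```

Because the row count is read lazily in next(), the source can be populated after the cursor is built, which mirrors how the test calls root.setRowCount(...) only after the binder has been constructed.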
diff --git a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/MockPreparedStatement.java b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/MockPreparedStatement.java
new file mode 100644
index 0000000000..438a949b73
--- /dev/null
+++ b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/MockPreparedStatement.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A PreparedStatement that stores parameters set on it; some rarely used setters are no-ops. */
+public final class MockPreparedStatement implements PreparedStatement {
+  static class ParameterHolder {
+    final Object value;
+    final Integer sqlType;
+    public Calendar calendar;
+
+    ParameterHolder(Object value, Integer sqlType) {
+      this.value = value;
+      this.sqlType = sqlType;
+    }
+  }
+
+  private final Map<Integer, ParameterHolder> parameters;
+
+  MockPreparedStatement() {
+    parameters = new HashMap<>();
+  }
+
+  ParameterHolder getParam(int index) {
+    return parameters.get(index);
+  }
+
+  Object getParamValue(int index) {
+    return parameters.get(index).value;
+  }
+
+  Integer getParamType(int index) {
+    return parameters.get(index).sqlType;
+  }
+
+  @Override
+  public ResultSet executeQuery() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int executeUpdate() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setNull(int parameterIndex, int sqlType) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(null, sqlType));
+  }
+
+  @Override
+  public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setByte(int parameterIndex, byte x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setShort(int parameterIndex, short x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setInt(int parameterIndex, int x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setLong(int parameterIndex, long x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setFloat(int parameterIndex, float x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setDouble(int parameterIndex, double x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setString(int parameterIndex, String x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setDate(int parameterIndex, Date x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setTime(int parameterIndex, Time x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  @Deprecated
+  public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void clearParameters() throws SQLException {
+    parameters.clear();
+  }
+
+  @Override
+  public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, targetSqlType));
+  }
+
+  @Override
+  public void setObject(int parameterIndex, Object x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public boolean execute() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void addBatch() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setCharacterStream(int parameterIndex, Reader reader, int length)
+      throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(reader, null));
+  }
+
+  @Override
+  public void setRef(int parameterIndex, Ref x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setBlob(int parameterIndex, Blob x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setClob(int parameterIndex, Clob x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setArray(int parameterIndex, Array x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public ResultSetMetaData getMetaData() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {}
+
+  @Override
+  public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
+    ParameterHolder value = new ParameterHolder(x, null);
+    value.calendar = cal;
+    parameters.put(parameterIndex, value);
+  }
+
+  @Override
+  public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {}
+
+  @Override
+  public void setURL(int parameterIndex, URL x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public ParameterMetaData getParameterMetaData() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setRowId(int parameterIndex, RowId x) throws SQLException {
+    parameters.put(parameterIndex, new ParameterHolder(x, null));
+  }
+
+  @Override
+  public void setNString(int parameterIndex, String value) throws SQLException {}
+
+  @Override
+  public void setNCharacterStream(int parameterIndex, Reader value, long length)
+      throws SQLException {}
+
+  @Override
+  public void setNClob(int parameterIndex, NClob value) throws SQLException {}
+
+  @Override
+  public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {}
+
+  @Override
+  public void setBlob(int parameterIndex, InputStream inputStream, long length)
+      throws SQLException {}
+
+  @Override
+  public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {}
+
+  @Override
+  public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {}
+
+  @Override
+  public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength)
+      throws SQLException {}
+
+  @Override
+  public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {}
+
+  @Override
+  public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {}
+
+  @Override
+  public void setCharacterStream(int parameterIndex, Reader reader, long length)
+      throws SQLException {}
+
+  @Override
+  public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {}
+
+  @Override
+  public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {}
+
+  @Override
+  public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {}
+
+  @Override
+  public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {}
+
+  @Override
+  public void setClob(int parameterIndex, Reader reader) throws SQLException {}
+
+  @Override
+  public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {}
+
+  @Override
+  public void setNClob(int parameterIndex, Reader reader) throws SQLException {}
+
+  @Override
+  public ResultSet executeQuery(String sql) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int executeUpdate(String sql) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() throws SQLException {}
+
+  @Override
+  public int getMaxFieldSize() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setMaxFieldSize(int max) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getMaxRows() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setMaxRows(int max) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setEscapeProcessing(boolean enable) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getQueryTimeout() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setQueryTimeout(int seconds) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void cancel() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SQLWarning getWarnings() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setCursorName(String name) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean execute(String sql) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ResultSet getResultSet() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getUpdateCount() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean getMoreResults() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setFetchDirection(int direction) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getFetchDirection() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setFetchSize(int rows) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getFetchSize() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getResultSetConcurrency() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getResultSetType() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void addBatch(String sql) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clearBatch() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int[] executeBatch() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Connection getConnection() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean getMoreResults(int current) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ResultSet getGeneratedKeys() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean execute(String sql, String[] columnNames) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getResultSetHoldability() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isClosed() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setPoolable(boolean poolable) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isPoolable() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void closeOnCompletion() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isCloseOnCompletion() throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    throw new UnsupportedOperationException();
+  }
+}
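
Note on the mock above: most of its 500-odd lines are interface boilerplate. As an alternative design, and not what this commit does, a java.lang.reflect.Proxy could record the same setter calls in a few lines. The sketch below assumes that every parameter setter has the shape setXxx(int index, value, ...), which holds for the java.sql.PreparedStatement parameter setters; RecordingStatementSketch and recording(...) are hypothetical names. Unlike MockPreparedStatement it keeps only the value, not the SQL type or Calendar.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.Types;
import java.util.HashMap;
import java.util.Map;

/** Sketch of a proxy-based recording statement; every name here is hypothetical. */
final class RecordingStatementSketch {
  /** Returns a PreparedStatement that stores setXxx(index, value, ...) calls in sink. */
  static PreparedStatement recording(Map<Integer, Object> sink) {
    return (PreparedStatement) Proxy.newProxyInstance(
        PreparedStatement.class.getClassLoader(),
        new Class<?>[] {PreparedStatement.class},
        (proxy, method, args) -> {
          String name = method.getName();
          if (name.equals("setNull")) {
            // Simplification: the SQL type passed to setNull is dropped.
            sink.put((Integer) args[0], null);
            return null;
          }
          if (name.startsWith("set") && args != null && args.length >= 2
              && args[0] instanceof Integer) {
            // Parameter setters take the index first and the value second.
            sink.put((Integer) args[0], args[1]);
            return null;
          }
          if (name.equals("clearParameters")) {
            sink.clear();
            return null;
          }
          if (name.equals("close")) {
            return null;
          }
          // Everything else (execute, metadata, statement options) is unsupported.
          throw new UnsupportedOperationException(name);
        });
  }

  public static void main(String[] args) throws Exception {
    Map<Integer, Object> sink = new HashMap<>();
    PreparedStatement statement = recording(sink);
    statement.setLong(1, 42L);
    statement.setString(2, "arrow");
    statement.setNull(3, Types.BIGINT);
    System.out.println(sink);
  }
}
```

The trade-off is that the proxy loses compile-time checking of which setters are handled, whereas the hand-written mock makes each method's behavior explicit.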