Posted to commits@arrow.apache.org by li...@apache.org on 2022/07/26 16:59:51 UTC
[arrow] branch master updated: ARROW-17004: [Java] Add utility to bind Arrow data to JDBC parameters (#13589)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new a5a28377e4 ARROW-17004: [Java] Add utility to bind Arrow data to JDBC parameters (#13589)
a5a28377e4 is described below
commit a5a28377e43b6435eefa5cc684d80d5fb26f387a
Author: David Li <li...@gmail.com>
AuthorDate: Tue Jul 26 12:59:44 2022 -0400
ARROW-17004: [Java] Add utility to bind Arrow data to JDBC parameters (#13589)
This extends the arrow-jdbc adapter to also allow taking Arrow data and using it to bind JDBC PreparedStatement parameters, allowing you to "round trip" data to a certain extent. This was factored out of arrow-adbc since it's not strictly tied to ADBC.
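As a rough illustration of the bind loop described above (a sketch, not the adapter's actual code): the example below models two nullable Int64 columns with plain Java arrays instead of Arrow vectors, and records the JDBC calls through a java.lang.reflect.Proxy instead of a real driver. The names BindLoopSketch, LongColumn, bindRow, recordingStatement and bindAllRows are hypothetical and exist only for this example. Only setLong, setNull and Types.BIGINT are real JDBC API.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: models the bind loop with plain arrays instead of Arrow vectors. */
final class BindLoopSketch {
  /** Hypothetical stand-in for a nullable Int64 column. */
  static final class LongColumn {
    final long[] values;
    final boolean[] isNull;

    LongColumn(long[] values, boolean[] isNull) {
      this.values = values;
      this.isNull = isNull;
    }
  }

  /** Binds one row: column i goes to parameter i + 1, nulls become setNull calls. */
  static void bindRow(PreparedStatement statement, LongColumn[] columns, int rowIndex)
      throws SQLException {
    for (int i = 0; i < columns.length; i++) {
      final int parameterIndex = i + 1; // JDBC parameters are 1-indexed
      if (columns[i].isNull[rowIndex]) {
        statement.setNull(parameterIndex, Types.BIGINT);
      } else {
        statement.setLong(parameterIndex, columns[i].values[rowIndex]);
      }
    }
  }

  /** Creates a PreparedStatement proxy that records each call as "method[args]". */
  static PreparedStatement recordingStatement(List<String> calls) {
    return (PreparedStatement) Proxy.newProxyInstance(
        BindLoopSketch.class.getClassLoader(),
        new Class<?>[] {PreparedStatement.class},
        (proxy, method, args) -> {
          calls.add(method.getName() + Arrays.toString(args));
          return null; // only void setters are invoked in this sketch
        });
  }

  /** Binds every row in order and returns the recorded JDBC calls. */
  static List<String> bindAllRows(LongColumn[] columns, int rowCount) {
    final List<String> calls = new ArrayList<>();
    final PreparedStatement statement = recordingStatement(calls);
    try {
      for (int row = 0; row < rowCount; row++) {
        bindRow(statement, columns, row);
        // A real application would call executeUpdate() or addBatch() here.
      }
    } catch (SQLException e) {
      throw new IllegalStateException(e);
    }
    return calls;
  }

  public static void main(String[] args) {
    final LongColumn[] columns = {
      new LongColumn(new long[] {1, 2}, new boolean[] {false, false}),
      new LongColumn(new long[] {0, 20}, new boolean[] {true, false}),
    };
    bindAllRows(columns, 2).forEach(System.out::println);
  }
}
```

The real adapter reads values from a VectorSchemaRoot and picks the setter per Arrow type. The sketch only shows the per-row shape of the loop and the null handling.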
Authored-by: David Li <li...@gmail.com>
Signed-off-by: David Li <li...@gmail.com>
---
docs/source/java/jdbc.rst | 112 ++++-
java/adapter/jdbc/pom.xml | 6 +
.../arrow/adapter/jdbc/JdbcParameterBinder.java | 152 ++++++
.../adapter/jdbc/binder/BaseColumnBinder.java | 44 ++
.../arrow/adapter/jdbc/binder/BigIntBinder.java | 41 ++
.../arrow/adapter/jdbc/binder/BitBinder.java | 45 ++
.../arrow/adapter/jdbc/binder/ColumnBinder.java | 71 +++
.../jdbc/binder/ColumnBinderArrowTypeVisitor.java | 247 ++++++++++
.../arrow/adapter/jdbc/binder/DateDayBinder.java | 59 +++
.../arrow/adapter/jdbc/binder/DateMilliBinder.java | 58 +++
.../adapter/jdbc/binder/Decimal128Binder.java | 46 ++
.../adapter/jdbc/binder/Decimal256Binder.java | 46 ++
.../adapter/jdbc/binder/FixedSizeBinaryBinder.java | 45 ++
.../arrow/adapter/jdbc/binder/Float4Binder.java | 43 ++
.../arrow/adapter/jdbc/binder/Float8Binder.java | 43 ++
.../arrow/adapter/jdbc/binder/IntBinder.java | 41 ++
.../adapter/jdbc/binder/NullableColumnBinder.java | 53 ++
.../arrow/adapter/jdbc/binder/SmallIntBinder.java | 41 ++
.../arrow/adapter/jdbc/binder/Time32Binder.java | 65 +++
.../arrow/adapter/jdbc/binder/Time64Binder.java | 64 +++
.../arrow/adapter/jdbc/binder/TimeStampBinder.java | 98 ++++
.../arrow/adapter/jdbc/binder/TinyIntBinder.java | 41 ++
.../arrow/adapter/jdbc/binder/VarBinaryBinder.java | 62 +++
.../arrow/adapter/jdbc/binder/VarCharBinder.java | 63 +++
.../arrow/adapter/jdbc/binder/package-info.java | 22 +
.../adapter/jdbc/JdbcParameterBinderTest.java | 486 +++++++++++++++++++
.../arrow/adapter/jdbc/MockPreparedStatement.java | 536 +++++++++++++++++++++
27 files changed, 2626 insertions(+), 4 deletions(-)
diff --git a/docs/source/java/jdbc.rst b/docs/source/java/jdbc.rst
index da63351601..c0477cb06d 100644
--- a/docs/source/java/jdbc.rst
+++ b/docs/source/java/jdbc.rst
@@ -84,8 +84,10 @@ inconsistent scale. A RoundingMode can be set to handle these cases:
}
}
-Currently, it is not possible to define a custom type conversion for a
-supported or unsupported type.
+The mapping from JDBC type to Arrow type can be overridden via the
+``JdbcToArrowConfig``, but it is not possible to customize the
+conversion from JDBC value to Arrow value itself, nor is it possible
+to define a conversion for an unsupported type.
Type Mapping
------------
@@ -120,7 +122,7 @@ The JDBC to Arrow type mapping can be obtained at runtime from
+--------------------+--------------------+-------+
| DOUBLE | Double | |
+--------------------+--------------------+-------+
-| FLOAT | Float | |
+| FLOAT | Float32 | |
+--------------------+--------------------+-------+
| INTEGER | Int32 | |
+--------------------+--------------------+-------+
@@ -138,7 +140,7 @@ The JDBC to Arrow type mapping can be obtained at runtime from
+--------------------+--------------------+-------+
| NVARCHAR | Utf8 | |
+--------------------+--------------------+-------+
-| REAL | Float | |
+| REAL | Float32 | |
+--------------------+--------------------+-------+
| SMALLINT | Int16 | |
+--------------------+--------------------+-------+
@@ -172,3 +174,105 @@ The JDBC to Arrow type mapping can be obtained at runtime from
.. _setArraySubTypeByColumnIndexMap: https://arrow.apache.org/docs/java/reference/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigBuilder.html#setArraySubTypeByColumnIndexMap-java.util.Map-
.. _setArraySubTypeByColumnNameMap: https://arrow.apache.org/docs/java/reference/org/apache/arrow/adapter/jdbc/JdbcToArrowConfigBuilder.html#setArraySubTypeByColumnNameMap-java.util.Map-
.. _ARROW-17006: https://issues.apache.org/jira/browse/ARROW-17006
+
+VectorSchemaRoot to PreparedStatement Parameter Conversion
+==========================================================
+
+The adapter can bind rows of Arrow data from a VectorSchemaRoot to
+parameters of a JDBC PreparedStatement. This can be accessed via the
+JdbcParameterBinder class. Each call to next() will bind parameters
+from the next row of data, and then the application can execute the
+statement, call addBatch(), etc. as desired. Null values will lead to
+a setNull call with an appropriate JDBC type code (listed below).
+
+.. code-block:: java
+
+ final JdbcParameterBinder binder =
+ JdbcParameterBinder.builder(statement, root).bindAll().build();
+ while (binder.next()) {
+ statement.executeUpdate();
+ }
+ // Use a VectorLoader to update the root
+ binder.reset();
+ while (binder.next()) {
+ statement.executeUpdate();
+ }
+
+The mapping of vectors to parameters, the JDBC type code used by the
+converters, and the type conversions themselves can all be customized:
+
+.. code-block:: java
+
+ final JdbcParameterBinder binder =
+ JdbcParameterBinder.builder(statement, root)
+ .bind(/*parameterIndex*/2, /*columnIndex*/0)
+ .bind(/*parameterIndex*/1, customColumnBinderInstance)
+ .build();
+
+Type Mapping
+------------
+
+The JDBC type code used for a given vector can be obtained at runtime
+by calling ``getJdbcType()`` on its ``ColumnBinder``.
+
++----------------------------+----------------------------+-------+
+| Arrow Type | JDBC Type | Notes |
++============================+============================+=======+
+| Binary | VARBINARY (setBytes) | |
++----------------------------+----------------------------+-------+
+| Bool | BOOLEAN (setBoolean) | |
++----------------------------+----------------------------+-------+
+| Date32 | DATE (setDate) | |
++----------------------------+----------------------------+-------+
+| Date64 | DATE (setDate) | |
++----------------------------+----------------------------+-------+
+| Decimal128 | DECIMAL (setBigDecimal) | |
++----------------------------+----------------------------+-------+
+| Decimal256 | DECIMAL (setBigDecimal) | |
++----------------------------+----------------------------+-------+
+| FixedSizeBinary | BINARY (setBytes) | |
++----------------------------+----------------------------+-------+
+| Float32 | REAL (setFloat) | |
++----------------------------+----------------------------+-------+
+| Int8 | TINYINT (setByte) | |
++----------------------------+----------------------------+-------+
+| Int16 | SMALLINT (setShort) | |
++----------------------------+----------------------------+-------+
+| Int32 | INTEGER (setInt) | |
++----------------------------+----------------------------+-------+
+| Int64 | BIGINT (setLong) | |
++----------------------------+----------------------------+-------+
+| LargeBinary | LONGVARBINARY (setBytes) | |
++----------------------------+----------------------------+-------+
+| LargeUtf8 | LONGVARCHAR (setString) | \(1) |
++----------------------------+----------------------------+-------+
+| Time[s] | TIME (setTime) | |
++----------------------------+----------------------------+-------+
+| Time[ms] | TIME (setTime) | |
++----------------------------+----------------------------+-------+
+| Time[us] | TIME (setTime) | |
++----------------------------+----------------------------+-------+
+| Time[ns] | TIME (setTime) | |
++----------------------------+----------------------------+-------+
+| Timestamp[s] | TIMESTAMP (setTimestamp) | \(2) |
++----------------------------+----------------------------+-------+
+| Timestamp[ms] | TIMESTAMP (setTimestamp) | \(2) |
++----------------------------+----------------------------+-------+
+| Timestamp[us] | TIMESTAMP (setTimestamp) | \(2) |
++----------------------------+----------------------------+-------+
+| Timestamp[ns] | TIMESTAMP (setTimestamp) | \(2) |
++----------------------------+----------------------------+-------+
+| Utf8 | VARCHAR (setString) | |
++----------------------------+----------------------------+-------+
+
+* \(1) Strings longer than Integer.MAX_VALUE bytes (the maximum length
+ of a Java ``byte[]``) will cause a runtime exception.
+* \(2) If the timestamp has a timezone, the JDBC type defaults to
+ TIMESTAMP_WITH_TIMEZONE. If the timestamp has no timezone,
+ technically there is not a correct conversion from Arrow value to
+ JDBC value, because a JDBC Timestamp is in UTC, and we have no
+ timezone information. In this case, the default binder will call
+ `setTimestamp(int, Timestamp)
+ <https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/PreparedStatement.html#setTimestamp(int,java.sql.Timestamp)>`_,
+ which will lead to the driver using the "default timezone" (that of
+ the Java VM).
diff --git a/java/adapter/jdbc/pom.xml b/java/adapter/jdbc/pom.xml
index 393ed0dddc..8e5829a284 100644
--- a/java/adapter/jdbc/pom.xml
+++ b/java/adapter/jdbc/pom.xml
@@ -82,6 +82,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinder.java
new file mode 100644
index 0000000000..2dfc0658cb
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinder.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.adapter.jdbc.binder.ColumnBinder;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+/**
+ * A binder binds JDBC prepared statement parameters to rows of Arrow data from a VectorSchemaRoot.
+ *
+ * Each row of the VectorSchemaRoot will be bound to the configured parameters of the PreparedStatement.
+ * One row of data is bound at a time.
+ */
+public class JdbcParameterBinder {
+ private final PreparedStatement statement;
+ private final VectorSchemaRoot root;
+ private final ColumnBinder[] binders;
+ private final int[] parameterIndices;
+ private int nextRowIndex;
+
+ /**
+ * Create a new parameter binder.
+ *
+ * @param statement The statement to bind parameters to.
+ * @param root The VectorSchemaRoot to pull data from.
+ * @param binders Column binders to translate from Arrow data to JDBC parameters, one per parameter.
+ * @param parameterIndices For each binder in {@code binders}, the index of the parameter to bind to.
+ */
+ private JdbcParameterBinder(
+ final PreparedStatement statement,
+ final VectorSchemaRoot root,
+ final ColumnBinder[] binders,
+ int[] parameterIndices) {
+ Preconditions.checkArgument(
+ binders.length == parameterIndices.length,
+ "Number of column binders (%s) must equal number of parameter indices (%s)",
+ binders.length, parameterIndices.length);
+ this.statement = statement;
+ this.root = root;
+ this.binders = binders;
+ this.parameterIndices = parameterIndices;
+ this.nextRowIndex = 0;
+ }
+
+ /**
+ * Initialize a binder with a builder.
+ *
+ * @param statement The statement to bind to. The binder does not maintain ownership of the statement.
+ * @param root The {@link VectorSchemaRoot} to pull data from. The binder does not maintain ownership
+ * of the vector schema root.
+ */
+ public static Builder builder(final PreparedStatement statement, final VectorSchemaRoot root) {
+ return new Builder(statement, root);
+ }
+
+ /** Reset the binder (so the root can be updated with new data). */
+ public void reset() {
+ nextRowIndex = 0;
+ }
+
+ /**
+ * Bind the next row of data to the parameters of the statement.
+ *
+ * After this, the application should call the desired method on the prepared statement,
+ * such as {@link PreparedStatement#executeUpdate()}, or {@link PreparedStatement#addBatch()}.
+ *
+ * @return true if a row was bound, false if rows were exhausted
+ */
+ public boolean next() throws SQLException {
+ if (nextRowIndex >= root.getRowCount()) {
+ return false;
+ }
+ for (int i = 0; i < parameterIndices.length; i++) {
+ final int parameterIndex = parameterIndices[i];
+ binders[i].bind(statement, parameterIndex, nextRowIndex);
+ }
+ nextRowIndex++;
+ return true;
+ }
+
+ /**
+ * A builder for a {@link JdbcParameterBinder}.
+ */
+ public static class Builder {
+ private final PreparedStatement statement;
+ private final VectorSchemaRoot root;
+ private final Map<Integer, ColumnBinder> bindings;
+
+ Builder(PreparedStatement statement, VectorSchemaRoot root) {
+ this.statement = statement;
+ this.root = root;
+ this.bindings = new HashMap<>();
+ }
+
+ /** Bind each column to the corresponding parameter in order. */
+ public Builder bindAll() {
+ for (int i = 0; i < root.getFieldVectors().size(); i++) {
+ bind(/*parameterIndex=*/ i + 1, /*columnIndex=*/ i);
+ }
+ return this;
+ }
+
+ /** Bind the given parameter to the given column using the default binder. */
+ public Builder bind(int parameterIndex, int columnIndex) {
+ return bind(
+ parameterIndex,
+ ColumnBinder.forVector(root.getVector(columnIndex)));
+ }
+
+ /** Bind the given parameter using the given binder. */
+ public Builder bind(int parameterIndex, ColumnBinder binder) {
+ Preconditions.checkArgument(
+ parameterIndex > 0, "parameterIndex %s must be positive", parameterIndex);
+ bindings.put(parameterIndex, binder);
+ return this;
+ }
+
+ /** Build the binder. */
+ public JdbcParameterBinder build() {
+ ColumnBinder[] binders = new ColumnBinder[bindings.size()];
+ int[] parameterIndices = new int[bindings.size()];
+ int index = 0;
+ for (Map.Entry<Integer, ColumnBinder> entry : bindings.entrySet()) {
+ binders[index] = entry.getValue();
+ parameterIndices[index] = entry.getKey();
+ index++;
+ }
+ return new JdbcParameterBinder(statement, root, binders, parameterIndices);
+ }
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BaseColumnBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BaseColumnBinder.java
new file mode 100644
index 0000000000..f24f409072
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BaseColumnBinder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import org.apache.arrow.vector.FieldVector;
+
+/**
+ * Base class for ColumnBinder implementations.
+ * @param <V> The concrete FieldVector subtype.
+ */
+public abstract class BaseColumnBinder<V extends FieldVector> implements ColumnBinder {
+ protected final V vector;
+ protected final int jdbcType;
+
+ public BaseColumnBinder(V vector, int jdbcType) {
+ this.vector = vector;
+ this.jdbcType = jdbcType;
+ }
+
+ @Override
+ public int getJdbcType() {
+ return jdbcType;
+ }
+
+ @Override
+ public V getVector() {
+ return vector;
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BigIntBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BigIntBinder.java
new file mode 100644
index 0000000000..fde4642ef9
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BigIntBinder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.BigIntVector;
+
+/** A column binder for 64-bit integers. */
+public class BigIntBinder extends BaseColumnBinder<BigIntVector> {
+ public BigIntBinder(BigIntVector vector) {
+ this(vector, Types.BIGINT);
+ }
+
+ public BigIntBinder(BigIntVector vector, int jdbcType) {
+ super(vector, jdbcType);
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ final long value = vector.getDataBuffer().getLong((long) rowIndex * BigIntVector.TYPE_WIDTH);
+ statement.setLong(parameterIndex, value);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BitBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BitBinder.java
new file mode 100644
index 0000000000..adae513e99
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/BitBinder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.BitVector;
+
+/** A column binder for booleans. */
+public class BitBinder extends BaseColumnBinder<BitVector> {
+ public BitBinder(BitVector vector) {
+ this(vector, Types.BOOLEAN);
+ }
+
+ public BitBinder(BitVector vector, int jdbcType) {
+ super(vector, jdbcType);
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ // See BitVector#getBit
+ final int byteIndex = rowIndex >> 3;
+ final byte b = vector.getDataBuffer().getByte(byteIndex);
+ final int bitIndex = rowIndex & 7;
+ final int value = (b >> bitIndex) & 0x01;
+ statement.setBoolean(parameterIndex, value != 0);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinder.java
new file mode 100644
index 0000000000..c2b1259e14
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinder.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.FieldVector;
+
+/**
+ * A helper to bind values from a wrapped Arrow vector to a JDBC PreparedStatement.
+ */
+public interface ColumnBinder {
+ /**
+ * Bind the given row to the given parameter.
+ *
+ * @param statement The statement to bind to.
+ * @param parameterIndex The parameter to bind to (1-indexed)
+ * @param rowIndex The row to bind values from (0-indexed)
+ * @throws SQLException if an error occurs
+ */
+ void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException;
+
+ /**
+ * Get the JDBC type code used by this binder.
+ *
+ * @return A type code from {@link java.sql.Types}.
+ */
+ int getJdbcType();
+
+ /**
+ * Get the vector used by this binder.
+ */
+ FieldVector getVector();
+
+ /**
+ * Create a column binder for a vector, using the default JDBC type code for null values.
+ */
+ static ColumnBinder forVector(FieldVector vector) {
+ return forVector(vector, /*jdbcType*/ null);
+ }
+
+ /**
+ * Create a column binder for a vector, overriding the JDBC type code used for null values.
+ *
+ * @param vector The vector that the column binder will wrap.
+ * @param jdbcType The JDBC type code to use (or null to use the default).
+ */
+ static ColumnBinder forVector(FieldVector vector, Integer jdbcType) {
+ final ColumnBinder binder = vector.getField().getType().accept(new ColumnBinderArrowTypeVisitor(vector, jdbcType));
+ if (vector.getField().isNullable()) {
+ return new NullableColumnBinder(binder);
+ }
+ return binder;
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java
new file mode 100644
index 0000000000..6496ca5a31
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/ColumnBinderArrowTypeVisitor.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.Types;
+import java.time.ZoneId;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.Decimal256Vector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.LargeVarBinaryVector;
+import org.apache.arrow.vector.LargeVarCharVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+
+/**
+ * Visitor to create the base ColumnBinder for a vector.
+ * <p>
+ * To handle null values, wrap the returned binder in a {@link NullableColumnBinder}.
+ */
+public class ColumnBinderArrowTypeVisitor implements ArrowType.ArrowTypeVisitor<ColumnBinder> {
+ private final FieldVector vector;
+ private final Integer jdbcType;
+
+ /**
+ * Create a binder using a custom JDBC type code.
+ *
+ * @param vector The vector that the binder will wrap.
+ * @param jdbcType The JDBC type code (or null to use the default).
+ */
+ public ColumnBinderArrowTypeVisitor(FieldVector vector, Integer jdbcType) {
+ this.vector = vector;
+ this.jdbcType = jdbcType;
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Null type) {
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Struct type) {
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.List type) {
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.LargeList type) {
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.FixedSizeList type) {
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Union type) {
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Map type) {
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Int type) {
+ if (!type.getIsSigned()) {
+ throw new UnsupportedOperationException(
+ "No column binder implemented for unsigned type " + type);
+ }
+ switch (type.getBitWidth()) {
+ case 8:
+ return jdbcType == null ? new TinyIntBinder((TinyIntVector) vector) :
+ new TinyIntBinder((TinyIntVector) vector, jdbcType);
+ case 16:
+ return jdbcType == null ? new SmallIntBinder((SmallIntVector) vector) :
+ new SmallIntBinder((SmallIntVector) vector, jdbcType);
+ case 32:
+ return jdbcType == null ? new IntBinder((IntVector) vector) :
+ new IntBinder((IntVector) vector, jdbcType);
+ case 64:
+ return jdbcType == null ? new BigIntBinder((BigIntVector) vector) :
+ new BigIntBinder((BigIntVector) vector, jdbcType);
+ default:
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.FloatingPoint type) {
+ switch (type.getPrecision()) {
+ case SINGLE:
+ return jdbcType == null ? new Float4Binder((Float4Vector) vector) :
+ new Float4Binder((Float4Vector) vector, jdbcType);
+ case DOUBLE:
+ return jdbcType == null ? new Float8Binder((Float8Vector) vector) :
+ new Float8Binder((Float8Vector) vector, jdbcType);
+ default:
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Utf8 type) {
+ VarCharVector varChar = (VarCharVector) vector;
+ return jdbcType == null ? new VarCharBinder<>(varChar, Types.VARCHAR) :
+ new VarCharBinder<>(varChar, jdbcType);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.LargeUtf8 type) {
+ LargeVarCharVector varChar = (LargeVarCharVector) vector;
+ return jdbcType == null ? new VarCharBinder<>(varChar, Types.LONGVARCHAR) :
+ new VarCharBinder<>(varChar, jdbcType);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Binary type) {
+ VarBinaryVector varBinary = (VarBinaryVector) vector;
+ return jdbcType == null ? new VarBinaryBinder<>(varBinary, Types.VARBINARY) :
+ new VarBinaryBinder<>(varBinary, jdbcType);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.LargeBinary type) {
+ LargeVarBinaryVector varBinary = (LargeVarBinaryVector) vector;
+ return jdbcType == null ? new VarBinaryBinder<>(varBinary, Types.LONGVARBINARY) :
+ new VarBinaryBinder<>(varBinary, jdbcType);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.FixedSizeBinary type) {
+ FixedSizeBinaryVector binary = (FixedSizeBinaryVector) vector;
+ return jdbcType == null ? new FixedSizeBinaryBinder(binary, Types.BINARY) :
+ new FixedSizeBinaryBinder(binary, jdbcType);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Bool type) {
+ return jdbcType == null ? new BitBinder((BitVector) vector) : new BitBinder((BitVector) vector, jdbcType);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Decimal type) {
+ if (type.getBitWidth() == 128) {
+ DecimalVector decimalVector = (DecimalVector) vector;
+ return jdbcType == null ? new Decimal128Binder(decimalVector) : new Decimal128Binder(decimalVector, jdbcType);
+ } else if (type.getBitWidth() == 256) {
+ Decimal256Vector decimalVector = (Decimal256Vector) vector;
+ return jdbcType == null ? new Decimal256Binder(decimalVector) : new Decimal256Binder(decimalVector, jdbcType);
+ }
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Date type) {
+ switch (type.getUnit()) {
+ case DAY:
+ return jdbcType == null ? new DateDayBinder((DateDayVector) vector) :
+ new DateDayBinder((DateDayVector) vector, /*calendar*/null, jdbcType);
+ case MILLISECOND:
+ return jdbcType == null ? new DateMilliBinder((DateMilliVector) vector) :
+ new DateMilliBinder((DateMilliVector) vector, /*calendar*/null, jdbcType);
+ default:
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Time type) {
+ switch (type.getUnit()) {
+ case SECOND:
+ return jdbcType == null ? new Time32Binder((TimeSecVector) vector) :
+ new Time32Binder((TimeSecVector) vector, jdbcType);
+ case MILLISECOND:
+ return jdbcType == null ? new Time32Binder((TimeMilliVector) vector) :
+ new Time32Binder((TimeMilliVector) vector, jdbcType);
+ case MICROSECOND:
+ return jdbcType == null ? new Time64Binder((TimeMicroVector) vector) :
+ new Time64Binder((TimeMicroVector) vector, jdbcType);
+ case NANOSECOND:
+ return jdbcType == null ? new Time64Binder((TimeNanoVector) vector) :
+ new Time64Binder((TimeNanoVector) vector, jdbcType);
+ default:
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Timestamp type) {
+ Calendar calendar = null;
+ final String timezone = type.getTimezone();
+ if (timezone != null && !timezone.isEmpty()) {
+ calendar = Calendar.getInstance(TimeZone.getTimeZone(ZoneId.of(timezone)));
+ }
+ return new TimeStampBinder((TimeStampVector) vector, calendar);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Interval type) {
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+
+ @Override
+ public ColumnBinder visit(ArrowType.Duration type) {
+ throw new UnsupportedOperationException("No column binder implemented for type " + type);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/DateDayBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/DateDayBinder.java
new file mode 100644
index 0000000000..bc16790c8f
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/DateDayBinder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Calendar;
+
+import org.apache.arrow.vector.DateDayVector;
+
+/**
+ * A column binder for 32-bit dates.
+ */
+public class DateDayBinder extends BaseColumnBinder<DateDayVector> {
+ private static final long MILLIS_PER_DAY = 86_400_000;
+ private final Calendar calendar;
+
+ public DateDayBinder(DateDayVector vector) {
+ this(vector, null, Types.DATE);
+ }
+
+ public DateDayBinder(DateDayVector vector, Calendar calendar) {
+ this(vector, calendar, Types.DATE);
+ }
+
+ public DateDayBinder(DateDayVector vector, Calendar calendar, int jdbcType) {
+ super(vector, jdbcType);
+ this.calendar = calendar;
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ // TODO: multiply with overflow
+ final long index = (long) rowIndex * DateDayVector.TYPE_WIDTH;
+ final Date value = new Date(vector.getDataBuffer().getInt(index) * MILLIS_PER_DAY);
+ if (calendar == null) {
+ statement.setDate(parameterIndex, value);
+ } else {
+ statement.setDate(parameterIndex, value, calendar);
+ }
+ }
+}
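
Note on the "multiply with overflow" TODO in DateDayBinder above: the sketch below isolates the day-to-millisecond conversion. It is not part of the commit; the class and method names (DateDaySketch, daysToMillis, toSqlDate) are hypothetical. The point it illustrates is that the day count must be widened to long before the multiply. Once widened, a 32-bit day count times 86,400,000 always fits in a long, so Math.multiplyExact acts only as a defensive guard here.

```java
import java.sql.Date;

/** Hypothetical helper, not part of the commit: converts Arrow DateDay values. */
final class DateDaySketch {
  private static final long MILLIS_PER_DAY = 86_400_000L;

  /**
   * Convert days since the UNIX epoch to epoch milliseconds.
   * The cast widens to long before multiplying; multiplyExact would throw
   * ArithmeticException on overflow instead of silently wrapping.
   */
  static long daysToMillis(int daysSinceEpoch) {
    return Math.multiplyExact((long) daysSinceEpoch, MILLIS_PER_DAY);
  }

  /** Build the java.sql.Date that would be handed to PreparedStatement#setDate. */
  static Date toSqlDate(int daysSinceEpoch) {
    return new Date(daysToMillis(daysSinceEpoch));
  }

  public static void main(String[] args) {
    System.out.println(daysToMillis(1));
    System.out.println(toSqlDate(19_199).getTime());
  }
}
```
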
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/DateMilliBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/DateMilliBinder.java
new file mode 100644
index 0000000000..5cb91b46ac
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/DateMilliBinder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Calendar;
+
+import org.apache.arrow.vector.DateMilliVector;
+
+/**
+ * A column binder for 64-bit dates.
+ */
+public class DateMilliBinder extends BaseColumnBinder<DateMilliVector> {
+ private final Calendar calendar;
+
+ public DateMilliBinder(DateMilliVector vector) {
+ this(vector, null, Types.DATE);
+ }
+
+ public DateMilliBinder(DateMilliVector vector, Calendar calendar) {
+ this(vector, calendar, Types.DATE);
+ }
+
+
+ public DateMilliBinder(DateMilliVector vector, Calendar calendar, int jdbcType) {
+ super(vector, jdbcType);
+ this.calendar = calendar;
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ final long index = (long) rowIndex * DateMilliVector.TYPE_WIDTH;
+ final Date value = new Date(vector.getDataBuffer().getLong(index));
+ if (calendar == null) {
+ statement.setDate(parameterIndex, value);
+ } else {
+ statement.setDate(parameterIndex, value, calendar);
+ }
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Decimal128Binder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Decimal128Binder.java
new file mode 100644
index 0000000000..9e9d0e4fdb
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Decimal128Binder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.util.DecimalUtility;
+
+/**
+ * A binder for 128-bit decimals.
+ */
+public class Decimal128Binder extends BaseColumnBinder<DecimalVector> {
+ public Decimal128Binder(DecimalVector vector) {
+ this(vector, Types.DECIMAL);
+ }
+
+ public Decimal128Binder(DecimalVector vector, int jdbcType) {
+ super(vector, jdbcType);
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ final BigDecimal value = DecimalUtility.getBigDecimalFromArrowBuf(
+ vector.getDataBuffer(), rowIndex, vector.getScale(), DecimalVector.TYPE_WIDTH);
+ statement.setBigDecimal(parameterIndex, value);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Decimal256Binder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Decimal256Binder.java
new file mode 100644
index 0000000000..bd29e083b4
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Decimal256Binder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.Decimal256Vector;
+import org.apache.arrow.vector.util.DecimalUtility;
+
+/**
+ * A binder for 256-bit decimals.
+ */
+public class Decimal256Binder extends BaseColumnBinder<Decimal256Vector> {
+ public Decimal256Binder(Decimal256Vector vector) {
+ this(vector, Types.DECIMAL);
+ }
+
+ public Decimal256Binder(Decimal256Vector vector, int jdbcType) {
+ super(vector, jdbcType);
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ final BigDecimal value = DecimalUtility.getBigDecimalFromArrowBuf(
+ vector.getDataBuffer(), rowIndex, vector.getScale(), Decimal256Vector.TYPE_WIDTH);
+ statement.setBigDecimal(parameterIndex, value);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/FixedSizeBinaryBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/FixedSizeBinaryBinder.java
new file mode 100644
index 0000000000..7edc5e4532
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/FixedSizeBinaryBinder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+
+/**
+ * A binder for fixed-width binary types.
+ */
+public class FixedSizeBinaryBinder extends BaseColumnBinder<FixedSizeBinaryVector> {
+ /**
+ * Create a binder for the given vector using the given JDBC type for null values.
+ *
+ * @param vector The vector to draw values from.
+ * @param jdbcType The JDBC type code.
+ */
+ public FixedSizeBinaryBinder(FixedSizeBinaryVector vector, int jdbcType) {
+ super(vector, jdbcType);
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ byte[] binaryData = new byte[vector.getByteWidth()];
+ vector.getDataBuffer().getBytes((long) rowIndex * binaryData.length, binaryData, 0, binaryData.length);
+ statement.setBytes(parameterIndex, binaryData);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Float4Binder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Float4Binder.java
new file mode 100644
index 0000000000..a471c1ebad
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Float4Binder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.Float4Vector;
+
+/**
+ * A binder for 32-bit floats.
+ */
+public class Float4Binder extends BaseColumnBinder<Float4Vector> {
+ public Float4Binder(Float4Vector vector) {
+ this(vector, Types.REAL);
+ }
+
+ public Float4Binder(Float4Vector vector, int jdbcType) {
+ super(vector, jdbcType);
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ final float value = vector.getDataBuffer().getFloat((long) rowIndex * Float4Vector.TYPE_WIDTH);
+ statement.setFloat(parameterIndex, value);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Float8Binder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Float8Binder.java
new file mode 100644
index 0000000000..4710c3b598
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Float8Binder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.Float8Vector;
+
+/**
+ * A binder for 64-bit floats.
+ */
+public class Float8Binder extends BaseColumnBinder<Float8Vector> {
+ public Float8Binder(Float8Vector vector) {
+ this(vector, Types.DOUBLE);
+ }
+
+ public Float8Binder(Float8Vector vector, int jdbcType) {
+ super(vector, jdbcType);
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ final double value = vector.getDataBuffer().getDouble((long) rowIndex * Float8Vector.TYPE_WIDTH);
+ statement.setDouble(parameterIndex, value);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/IntBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/IntBinder.java
new file mode 100644
index 0000000000..7d47f585a3
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/IntBinder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.IntVector;
+
+/** A column binder for 32-bit integers. */
+public class IntBinder extends BaseColumnBinder<IntVector> {
+ public IntBinder(IntVector vector) {
+ this(vector, Types.INTEGER);
+ }
+
+ public IntBinder(IntVector vector, int jdbcType) {
+ super(vector, jdbcType);
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ final int value = vector.getDataBuffer().getInt((long) rowIndex * IntVector.TYPE_WIDTH);
+ statement.setInt(parameterIndex, value);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/NullableColumnBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/NullableColumnBinder.java
new file mode 100644
index 0000000000..123b587ca5
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/NullableColumnBinder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.arrow.vector.FieldVector;
+
+/**
+ * A ColumnBinder that checks for nullability before deferring to a type-specific binder.
+ */
+public class NullableColumnBinder implements ColumnBinder {
+ private final ColumnBinder wrapped;
+
+ public NullableColumnBinder(ColumnBinder wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ if (wrapped.getVector().isNull(rowIndex)) {
+ statement.setNull(parameterIndex, wrapped.getJdbcType());
+ } else {
+ wrapped.bind(statement, parameterIndex, rowIndex);
+ }
+ }
+
+ @Override
+ public int getJdbcType() {
+ return wrapped.getJdbcType();
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return wrapped.getVector();
+ }
+}
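
Note on NullableColumnBinder above: the wrapper is a plain decorator that checks for null before delegating. The sketch below reproduces that shape without depending on Arrow. Everything in it is an assumption made for illustration: the Binder interface is a reduced stand-in for ColumnBinder, a boxed Integer array stands in for a vector, and the recording PreparedStatement is a dynamic proxy that only logs method calls.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class NullableBinderSketch {
  /** Hypothetical stand-in for ColumnBinder, reduced to what the sketch needs. */
  interface Binder {
    void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException;
    boolean isNull(int rowIndex);
    int jdbcType();
  }

  /** Type-specific binder: assumes the row is non-null, like the binders in the commit. */
  static Binder forInts(final Integer[] values) {
    return new Binder() {
      @Override
      public void bind(PreparedStatement s, int p, int r) throws SQLException {
        s.setInt(p, values[r]);
      }
      @Override public boolean isNull(int r) { return values[r] == null; }
      @Override public int jdbcType() { return Types.INTEGER; }
    };
  }

  /** Decorator: emit setNull for null rows, otherwise defer to the wrapped binder. */
  static Binder nullable(final Binder wrapped) {
    return new Binder() {
      @Override
      public void bind(PreparedStatement s, int p, int r) throws SQLException {
        if (wrapped.isNull(r)) {
          s.setNull(p, wrapped.jdbcType());
        } else {
          wrapped.bind(s, p, r);
        }
      }
      @Override public boolean isNull(int r) { return wrapped.isNull(r); }
      @Override public int jdbcType() { return wrapped.jdbcType(); }
    };
  }

  /** A PreparedStatement that records "methodName[args]" for every call and does nothing else. */
  static PreparedStatement recording(final List<String> calls) {
    return (PreparedStatement) Proxy.newProxyInstance(
        NullableBinderSketch.class.getClassLoader(),
        new Class<?>[] {PreparedStatement.class},
        (proxy, method, args) -> {
          calls.add(method.getName() + Arrays.toString(args));
          return null;
        });
  }

  public static void main(String[] args) throws SQLException {
    List<String> calls = new ArrayList<>();
    Binder binder = nullable(forInts(new Integer[] {42, null}));
    PreparedStatement statement = recording(calls);
    binder.bind(statement, 1, 0);
    binder.bind(statement, 1, 1);
    System.out.println(calls);
  }
}
```

Keeping the null check in a wrapper lets each type-specific binder read straight from the data buffer without consulting the validity bitmap itself.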
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/SmallIntBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/SmallIntBinder.java
new file mode 100644
index 0000000000..f9d744b9f5
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/SmallIntBinder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.SmallIntVector;
+
+/** A column binder for 16-bit integers. */
+public class SmallIntBinder extends BaseColumnBinder<SmallIntVector> {
+ public SmallIntBinder(SmallIntVector vector) {
+ this(vector, Types.SMALLINT);
+ }
+
+ public SmallIntBinder(SmallIntVector vector, int jdbcType) {
+ super(vector, jdbcType);
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ final short value = vector.getDataBuffer().getShort((long) rowIndex * SmallIntVector.TYPE_WIDTH);
+ statement.setShort(parameterIndex, value);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Time32Binder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Time32Binder.java
new file mode 100644
index 0000000000..5dc7e3f513
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Time32Binder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Types;
+
+import org.apache.arrow.vector.BaseFixedWidthVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeSecVector;
+
+/**
+ * A binder for 32-bit time types.
+ */
+public class Time32Binder extends BaseColumnBinder<BaseFixedWidthVector> {
+ private static final long TYPE_WIDTH = 4;
+
+ private final long factor;
+
+ public Time32Binder(TimeSecVector vector) {
+ this(vector, Types.TIME);
+ }
+
+ public Time32Binder(TimeMilliVector vector) {
+ this(vector, Types.TIME);
+ }
+
+ public Time32Binder(TimeSecVector vector, int jdbcType) {
+ this(vector, /*factor*/1_000, jdbcType);
+ }
+
+ public Time32Binder(TimeMilliVector vector, int jdbcType) {
+ this(vector, /*factor*/1, jdbcType);
+ }
+
+ Time32Binder(BaseFixedWidthVector vector, long factor, int jdbcType) {
+ super(vector, jdbcType);
+ this.factor = factor;
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ // TODO: multiply with overflow
+ // TODO: take in a Calendar as well?
+ final Time value = new Time(vector.getDataBuffer().getInt(rowIndex * TYPE_WIDTH) * factor);
+ statement.setTime(parameterIndex, value);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Time64Binder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Time64Binder.java
new file mode 100644
index 0000000000..8d62ae0eb3
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/Time64Binder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Types;
+
+import org.apache.arrow.vector.BaseFixedWidthVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeNanoVector;
+
+/**
+ * A binder for 64-bit time types.
+ */
+public class Time64Binder extends BaseColumnBinder<BaseFixedWidthVector> {
+ private static final long TYPE_WIDTH = 8;
+
+ private final long factor;
+
+ public Time64Binder(TimeMicroVector vector) {
+ this(vector, Types.TIME);
+ }
+
+ public Time64Binder(TimeNanoVector vector) {
+ this(vector, Types.TIME);
+ }
+
+ public Time64Binder(TimeMicroVector vector, int jdbcType) {
+ this(vector, /*factor*/1_000, jdbcType);
+ }
+
+ public Time64Binder(TimeNanoVector vector, int jdbcType) {
+ this(vector, /*factor*/1_000_000, jdbcType);
+ }
+
+ Time64Binder(BaseFixedWidthVector vector, long factor, int jdbcType) {
+ super(vector, jdbcType);
+ this.factor = factor;
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ // TODO: option to throw on truncation (vendor Guava IntMath#multiply)
+ final Time value = new Time(vector.getDataBuffer().getLong(rowIndex * TYPE_WIDTH) / factor);
+ statement.setTime(parameterIndex, value);
+ }
+}
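
Note on the truncation TODO in Time64Binder above: java.sql.Time carries millisecond precision, so microsecond and nanosecond values lose their sub-millisecond part when divided by the factor. The sketch below contrasts the truncating conversion with a strict variant that rejects lossy values. It is an illustration only; Time64Sketch and both method names are hypothetical and do not exist in the adapter.

```java
/** Hypothetical helper, not part of the commit: sub-millisecond time values to milliseconds. */
final class Time64Sketch {
  /** Truncating conversion, matching the plain division used when binding. */
  static long toMillisTruncating(long value, long unitsPerMilli) {
    return value / unitsPerMilli;
  }

  /** Strict conversion: throws if the value is not a whole number of milliseconds. */
  static long toMillisExact(long value, long unitsPerMilli) {
    if (value % unitsPerMilli != 0) {
      throw new ArithmeticException(
          "Value " + value + " is not a whole number of milliseconds");
    }
    return value / unitsPerMilli;
  }

  public static void main(String[] args) {
    // 1,500 microseconds: the truncating path silently drops 500 microseconds.
    System.out.println(toMillisTruncating(1_500L, 1_000L));
    // 2,000,000 nanoseconds converts without loss.
    System.out.println(toMillisExact(2_000_000L, 1_000_000L));
  }
}
```
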
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/TimeStampBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/TimeStampBinder.java
new file mode 100644
index 0000000000..6677e59099
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/TimeStampBinder.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.Calendar;
+
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+
+/** A column binder for timestamps. */
+public class TimeStampBinder extends BaseColumnBinder<TimeStampVector> {
+ private final Calendar calendar;
+ private final long unitsPerSecond;
+ private final long nanosPerUnit;
+
+ /**
+ * Create a binder for a timestamp vector using the default JDBC type code.
+ */
+ public TimeStampBinder(TimeStampVector vector, Calendar calendar) {
+ this(vector, calendar, isZoned(vector.getField().getType()) ? Types.TIMESTAMP_WITH_TIMEZONE : Types.TIMESTAMP);
+ }
+
+ /**
+ * Create a binder for a timestamp vector.
+ * @param vector The vector to pull values from.
+ * @param calendar Optionally, the calendar to pass to JDBC.
+ * @param jdbcType The JDBC type code to use for null values.
+ */
+ public TimeStampBinder(TimeStampVector vector, Calendar calendar, int jdbcType) {
+ super(vector, jdbcType);
+ this.calendar = calendar;
+
+ final ArrowType.Timestamp type = (ArrowType.Timestamp) vector.getField().getType();
+ switch (type.getUnit()) {
+ case SECOND:
+ this.unitsPerSecond = 1;
+ this.nanosPerUnit = 1_000_000_000;
+ break;
+ case MILLISECOND:
+ this.unitsPerSecond = 1_000;
+ this.nanosPerUnit = 1_000_000;
+ break;
+ case MICROSECOND:
+ this.unitsPerSecond = 1_000_000;
+ this.nanosPerUnit = 1_000;
+ break;
+ case NANOSECOND:
+ this.unitsPerSecond = 1_000_000_000;
+ this.nanosPerUnit = 1;
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid time unit in " + type);
+ }
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ // TODO: option to throw on truncation (vendor Guava IntMath#multiply) or overflow
+ final long rawValue = vector.getDataBuffer().getLong((long) rowIndex * TimeStampVector.TYPE_WIDTH);
+ final long seconds = Math.floorDiv(rawValue, unitsPerSecond);
+ final int nanos = (int) (Math.floorMod(rawValue, unitsPerSecond) * nanosPerUnit);
+ final Timestamp value = new Timestamp(seconds * 1_000);
+ value.setNanos(nanos);
+ if (calendar != null) {
+ // Timestamp == Date == UTC timestamp (confusingly). Arrow's timestamp with timezone is a UTC value with a
+ // zone offset, so we don't need to do any conversion.
+ statement.setTimestamp(parameterIndex, value, calendar);
+ } else {
+ // Arrow timestamp without timezone isn't strictly convertible to any timezone. So this is technically wrong,
+ // but there is no 'correct' interpretation here. The application should provide a calendar.
+ statement.setTimestamp(parameterIndex, value);
+ }
+ }
+
+ private static boolean isZoned(ArrowType type) {
+ final String timezone = ((ArrowType.Timestamp) type).getTimezone();
+ return timezone != null && !timezone.isEmpty();
+ }
+}
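
Note on the seconds/nanoseconds split in TimeStampBinder above: the sketch below performs the same split standalone. It uses floor division so that values before the epoch produce a non-negative nanosecond field, which Timestamp#setNanos requires (it rejects negative values with IllegalArgumentException). This is a sketch under those assumptions, not the adapter's code; TimestampSplitSketch and toTimestamp are hypothetical names.

```java
import java.sql.Timestamp;

/** Hypothetical helper, not part of the commit: raw Arrow timestamp value to java.sql.Timestamp. */
final class TimestampSplitSketch {
  /**
   * @param rawValue the timestamp in the vector's unit, relative to the epoch
   * @param unitsPerSecond e.g. 1_000 for milliseconds
   * @param nanosPerUnit e.g. 1_000_000 for milliseconds
   */
  static Timestamp toTimestamp(long rawValue, long unitsPerSecond, long nanosPerUnit) {
    // floorDiv rounds toward negative infinity, so the remainder is always in [0, unitsPerSecond).
    final long seconds = Math.floorDiv(rawValue, unitsPerSecond);
    final int nanos = (int) (Math.floorMod(rawValue, unitsPerSecond) * nanosPerUnit);
    // multiplyExact surfaces overflow for timestamps too far from the epoch.
    final Timestamp value = new Timestamp(Math.multiplyExact(seconds, 1_000L));
    value.setNanos(nanos);
    return value;
  }

  public static void main(String[] args) {
    // One millisecond before the epoch: seconds = -1, nanos = 999,000,000.
    Timestamp beforeEpoch = toTimestamp(-1L, 1_000L, 1_000_000L);
    System.out.println(beforeEpoch.getTime() + " ms, " + beforeEpoch.getNanos() + " ns");
  }
}
```
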
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/TinyIntBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/TinyIntBinder.java
new file mode 100644
index 0000000000..f51d139be8
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/TinyIntBinder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+
+import org.apache.arrow.vector.TinyIntVector;
+
+/** A column binder for 8-bit integers. */
+public class TinyIntBinder extends BaseColumnBinder<TinyIntVector> {
+ public TinyIntBinder(TinyIntVector vector) {
+ this(vector, Types.TINYINT);
+ }
+
+ public TinyIntBinder(TinyIntVector vector, int jdbcType) {
+ super(vector, jdbcType);
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ final byte value = vector.getDataBuffer().getByte((long) rowIndex * TinyIntVector.TYPE_WIDTH);
+ statement.setByte(parameterIndex, value);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/VarBinaryBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/VarBinaryBinder.java
new file mode 100644
index 0000000000..a94cff6a00
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/VarBinaryBinder.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.arrow.memory.util.ArrowBufPointer;
+import org.apache.arrow.vector.ElementAddressableVector;
+import org.apache.arrow.vector.FieldVector;
+
+/**
+ * A binder for variable-width binary types.
+ *
+ * @param <T> The binary vector.
+ */
+public class VarBinaryBinder<T extends FieldVector & ElementAddressableVector> extends BaseColumnBinder<T> {
+ private final ArrowBufPointer element;
+
+ /**
+ * Create a binder for the given vector using the given JDBC type for null values.
+ *
+ * @param vector The vector to draw values from.
+ * @param jdbcType The JDBC type code.
+ */
+ public VarBinaryBinder(T vector, int jdbcType) {
+ super(vector, jdbcType);
+ this.element = new ArrowBufPointer();
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ vector.getDataPointer(rowIndex, element);
+ if (element.getBuf() == null) {
+ statement.setNull(parameterIndex, jdbcType);
+ return;
+ }
+ if (element.getLength() > (long) Integer.MAX_VALUE) {
+ final String message = String.format("Length of value at index %d (%d) exceeds Integer.MAX_VALUE",
+ rowIndex, element.getLength());
+ throw new RuntimeException(message);
+ }
+ byte[] binaryData = new byte[(int) element.getLength()];
+ element.getBuf().getBytes(element.getOffset(), binaryData);
+ statement.setBytes(parameterIndex, binaryData);
+ }
+}
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/VarCharBinder.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/VarCharBinder.java
new file mode 100644
index 0000000000..73bd559814
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/VarCharBinder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.arrow.memory.util.ArrowBufPointer;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VariableWidthVector;
+
+/**
+ * A binder for variable-width string types.
+ *
+ * @param <T> The text vector.
+ */
+public class VarCharBinder<T extends FieldVector & VariableWidthVector> extends BaseColumnBinder<T> {
+ private final ArrowBufPointer element;
+
+ /**
+ * Create a binder for the given vector using the given JDBC type for null values.
+ *
+ * @param vector The vector to draw values from.
+ * @param jdbcType The JDBC type code.
+ */
+ public VarCharBinder(T vector, int jdbcType) {
+ super(vector, jdbcType);
+ this.element = new ArrowBufPointer();
+ }
+
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex) throws SQLException {
+ vector.getDataPointer(rowIndex, element);
+ if (element.getBuf() == null) {
+ statement.setNull(parameterIndex, jdbcType);
+ return;
+ }
+ if (element.getLength() > (long) Integer.MAX_VALUE) {
+ final String message = String.format("Length of value at index %d (%d) exceeds Integer.MAX_VALUE",
+ rowIndex, element.getLength());
+ throw new RuntimeException(message);
+ }
+ byte[] utf8Bytes = new byte[(int) element.getLength()];
+ element.getBuf().getBytes(element.getOffset(), utf8Bytes);
+ statement.setString(parameterIndex, new String(utf8Bytes, StandardCharsets.UTF_8));
+ }
+}
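Reviewer note on the two variable-width binders: both check the element length against `Integer.MAX_VALUE`, copy the bytes into a fresh array, and then hand the result to JDBC. The sketch below is a JDK-only analogue of that copy-then-decode step. It assumes a `java.nio.ByteBuffer` stands in for the Arrow buffer and that `offset` and `length` play the role of the `ArrowBufPointer` fields; `Utf8SliceSketch` and `decode` are hypothetical names, not part of the patch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical, JDK-only analogue of the length check and UTF-8 decode; not part of the patch. */
final class Utf8SliceSketch {
  private Utf8SliceSketch() {}

  /** Copies {@code length} bytes starting at {@code offset} and decodes them as UTF-8. */
  static String decode(ByteBuffer buffer, long offset, long length) {
    // Java arrays are indexed by int, so longer values cannot be materialized as byte[].
    if (length > (long) Integer.MAX_VALUE) {
      throw new IllegalArgumentException(
          String.format("Length of value (%d) exceeds Integer.MAX_VALUE", length));
    }
    final byte[] utf8Bytes = new byte[(int) length];
    // Work on a duplicate so the caller's position and limit are left untouched.
    final ByteBuffer view = buffer.duplicate();
    view.position(Math.toIntExact(offset));
    view.get(utf8Bytes);
    return new String(utf8Bytes, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    final ByteBuffer buffer = ByteBuffer.wrap("xxfoobar".getBytes(StandardCharsets.UTF_8));
    System.out.println(decode(buffer, 2, 6));
  }
}
```

The copy costs one allocation per bound value; whether that matters depends on the driver and workload, which this sketch does not measure.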
diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/package-info.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/package-info.java
new file mode 100644
index 0000000000..4f8936e0c2
--- /dev/null
+++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/binder/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utilities to bind Arrow data as JDBC prepared statement parameters.
+ */
+
+package org.apache.arrow.adapter.jdbc.binder;
diff --git a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java
new file mode 100644
index 0000000000..c8c043f2f0
--- /dev/null
+++ b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/JdbcParameterBinderTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+import org.apache.arrow.adapter.jdbc.binder.ColumnBinder;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BaseLargeVariableWidthVector;
+import org.apache.arrow.vector.BaseVariableWidthVector;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.Decimal256Vector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.LargeVarBinaryVector;
+import org.apache.arrow.vector.LargeVarCharVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeNanoVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampNanoTZVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TimeStampSecTZVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class JdbcParameterBinderTest {
+ private static final long MILLIS_PER_DAY = 86_400_000;
+ BufferAllocator allocator;
+
+ @BeforeEach
+ void beforeEach() {
+ allocator = new RootAllocator();
+ }
+
+ @AfterEach
+ void afterEach() {
+ allocator.close();
+ }
+
+ @Test
+ void bindOrder() throws SQLException {
+ final Schema schema =
+ new Schema(
+ Arrays.asList(
+ Field.nullable("ints0", new ArrowType.Int(32, true)),
+ Field.nullable("ints1", new ArrowType.Int(32, true)),
+ Field.nullable("ints2", new ArrowType.Int(32, true))));
+ try (final MockPreparedStatement statement = new MockPreparedStatement();
+ final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+ final JdbcParameterBinder binder =
+ JdbcParameterBinder.builder(statement, root)
+ .bind(/*paramIndex=*/ 1, /*colIndex=*/ 2)
+ .bind(/*paramIndex=*/ 2, /*colIndex=*/ 0)
+ .build();
+ assertThat(binder.next()).isFalse();
+
+ final IntVector ints0 = (IntVector) root.getVector(0);
+ final IntVector ints1 = (IntVector) root.getVector(1);
+ final IntVector ints2 = (IntVector) root.getVector(2);
+ ints0.setSafe(0, 4);
+ ints0.setNull(1);
+ ints1.setNull(0);
+ ints1.setSafe(1, -8);
+ ints2.setNull(0);
+ ints2.setSafe(1, 12);
+ root.setRowCount(2);
+
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isNull();
+ assertThat(statement.getParamType(1)).isEqualTo(Types.INTEGER);
+ assertThat(statement.getParamValue(2)).isEqualTo(4);
+ assertThat(statement.getParam(3)).isNull();
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(12);
+ assertThat(statement.getParamValue(2)).isNull();
+ assertThat(statement.getParamType(2)).isEqualTo(Types.INTEGER);
+ assertThat(statement.getParam(3)).isNull();
+ assertThat(binder.next()).isFalse();
+
+ binder.reset();
+
+ ints0.setNull(0);
+ ints0.setSafe(1, -2);
+ ints2.setNull(0);
+ ints2.setSafe(1, 6);
+ root.setRowCount(2);
+
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isNull();
+ assertThat(statement.getParamType(1)).isEqualTo(Types.INTEGER);
+ assertThat(statement.getParamValue(2)).isNull();
+ assertThat(statement.getParamType(2)).isEqualTo(Types.INTEGER);
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(6);
+ assertThat(statement.getParamValue(2)).isEqualTo(-2);
+ assertThat(statement.getParam(3)).isNull();
+ assertThat(binder.next()).isFalse();
+ }
+ }
+
+ @Test
+ void customBinder() throws SQLException {
+ final Schema schema =
+ new Schema(Collections.singletonList(
+ Field.nullable("ints0", new ArrowType.Int(32, true))));
+
+ try (final MockPreparedStatement statement = new MockPreparedStatement();
+ final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+ final JdbcParameterBinder binder =
+ JdbcParameterBinder.builder(statement, root)
+ .bind(
+ /*paramIndex=*/ 1,
+ new ColumnBinder() {
+ private final IntVector vector = (IntVector) root.getVector(0);
+ @Override
+ public void bind(PreparedStatement statement, int parameterIndex, int rowIndex)
+ throws SQLException {
+ Integer value = vector.getObject(rowIndex);
+ if (value == null) {
+ statement.setString(parameterIndex, "null");
+ } else {
+ statement.setString(parameterIndex, Integer.toString(value));
+ }
+ }
+
+ @Override
+ public int getJdbcType() {
+ return Types.INTEGER;
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return vector;
+ }
+ })
+ .build();
+ assertThat(binder.next()).isFalse();
+
+ final IntVector ints = (IntVector) root.getVector(0);
+ ints.setSafe(0, 4);
+ ints.setNull(1);
+
+ root.setRowCount(2);
+
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo("4");
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo("null");
+ assertThat(binder.next()).isFalse();
+ }
+ }
+
+ @Test
+ void bool() throws SQLException {
+ testSimpleType(ArrowType.Bool.INSTANCE, Types.BOOLEAN,
+ (BitVector vector, Integer index, Boolean value) -> vector.setSafe(index, value ? 1 : 0),
+ BitVector::setNull,
+ Arrays.asList(true, false, true));
+ }
+
+ @Test
+ void int8() throws SQLException {
+ testSimpleType(new ArrowType.Int(8, true), Types.TINYINT,
+ TinyIntVector::setSafe, TinyIntVector::setNull,
+ Arrays.asList(Byte.MAX_VALUE, Byte.MIN_VALUE, (byte) 42));
+ }
+
+ @Test
+ void int16() throws SQLException {
+ testSimpleType(new ArrowType.Int(16, true), Types.SMALLINT,
+ SmallIntVector::setSafe, SmallIntVector::setNull,
+ Arrays.asList(Short.MAX_VALUE, Short.MIN_VALUE, (short) 42));
+ }
+
+ @Test
+ void int32() throws SQLException {
+ testSimpleType(new ArrowType.Int(32, true), Types.INTEGER,
+ IntVector::setSafe, IntVector::setNull,
+ Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE, 42));
+ }
+
+ @Test
+ void int64() throws SQLException {
+ testSimpleType(new ArrowType.Int(64, true), Types.BIGINT,
+ BigIntVector::setSafe, BigIntVector::setNull,
+ Arrays.asList(Long.MAX_VALUE, Long.MIN_VALUE, 42L));
+ }
+
+ @Test
+ void float32() throws SQLException {
+ testSimpleType(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE), Types.REAL,
+ Float4Vector::setSafe, Float4Vector::setNull,
+ Arrays.asList(Float.MIN_VALUE, Float.MAX_VALUE, Float.POSITIVE_INFINITY));
+ }
+
+ @Test
+ void float64() throws SQLException {
+ testSimpleType(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), Types.DOUBLE,
+ Float8Vector::setSafe, Float8Vector::setNull,
+ Arrays.asList(Double.MIN_VALUE, Double.MAX_VALUE, Double.POSITIVE_INFINITY));
+ }
+
+ @Test
+ void time32() throws SQLException {
+ testSimpleType(new ArrowType.Time(TimeUnit.SECOND, 32), Types.TIME,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, (int) (value.getTime() / 1_000)),
+ TimeSecVector::setNull,
+ Arrays.asList(new Time(-128_000), new Time(104_000), new Time(-42_000)));
+ testSimpleType(new ArrowType.Time(TimeUnit.MILLISECOND, 32), Types.TIME,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, (int) value.getTime()),
+ TimeMilliVector::setNull,
+ Arrays.asList(new Time(-128_000), new Time(104_000), new Time(-42_000)));
+ }
+
+ @Test
+ void time64() throws SQLException {
+ testSimpleType(new ArrowType.Time(TimeUnit.MICROSECOND, 64), Types.TIME,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() * 1_000),
+ TimeMicroVector::setNull,
+ Arrays.asList(new Time(-128_000), new Time(104_000), new Time(-42_000)));
+ testSimpleType(new ArrowType.Time(TimeUnit.NANOSECOND, 64), Types.TIME,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() * 1_000_000),
+ TimeNanoVector::setNull,
+ Arrays.asList(new Time(-128), new Time(104), new Time(-42)));
+ }
+
+ @Test
+ void date32() throws SQLException {
+ testSimpleType(new ArrowType.Date(DateUnit.DAY), Types.DATE,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, (int) (value.getTime() / MILLIS_PER_DAY)),
+ DateDayVector::setNull,
+ Arrays.asList(new Date(-5 * MILLIS_PER_DAY), new Date(2 * MILLIS_PER_DAY), new Date(MILLIS_PER_DAY)));
+ }
+
+ @Test
+ void date64() throws SQLException {
+ testSimpleType(new ArrowType.Date(DateUnit.MILLISECOND), Types.DATE,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime()),
+ DateMilliVector::setNull,
+ Arrays.asList(new Date(-5 * MILLIS_PER_DAY), new Date(2 * MILLIS_PER_DAY), new Date(MILLIS_PER_DAY)));
+ }
+
+ @Test
+ void timestamp() throws SQLException {
+ List<Timestamp> values = Arrays.asList(new Timestamp(-128_000), new Timestamp(104_000), new Timestamp(-42_000));
+ testSimpleType(new ArrowType.Timestamp(TimeUnit.SECOND, null), Types.TIMESTAMP,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() / 1_000),
+ TimeStampSecVector::setNull, values);
+ testSimpleType(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null), Types.TIMESTAMP,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime()),
+ TimeStampMilliVector::setNull, values);
+ testSimpleType(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null), Types.TIMESTAMP,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() * 1_000),
+ TimeStampMicroVector::setNull, values);
+ testSimpleType(new ArrowType.Timestamp(TimeUnit.NANOSECOND, null), Types.TIMESTAMP,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() * 1_000_000),
+ TimeStampNanoVector::setNull, values);
+ }
+
+ @Test
+ void timestampTz() throws SQLException {
+ List<Timestamp> values = Arrays.asList(new Timestamp(-128_000), new Timestamp(104_000), new Timestamp(-42_000));
+ testSimpleType(new ArrowType.Timestamp(TimeUnit.SECOND, "UTC"), Types.TIMESTAMP_WITH_TIMEZONE,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() / 1_000),
+ TimeStampSecTZVector::setNull, values);
+ testSimpleType(new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"), Types.TIMESTAMP_WITH_TIMEZONE,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime()),
+ TimeStampMilliTZVector::setNull, values);
+ testSimpleType(new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"), Types.TIMESTAMP_WITH_TIMEZONE,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() * 1_000),
+ TimeStampMicroTZVector::setNull, values);
+ testSimpleType(new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"), Types.TIMESTAMP_WITH_TIMEZONE,
+ (valueVectors, index, value) -> valueVectors.setSafe(index, value.getTime() * 1_000_000),
+ TimeStampNanoTZVector::setNull, values);
+ }
+
+ @Test
+ void utf8() throws SQLException {
+ testSimpleType(ArrowType.Utf8.INSTANCE, Types.VARCHAR,
+ (VarCharVector vector, Integer index, String value) ->
+ vector.setSafe(index, value.getBytes(StandardCharsets.UTF_8)),
+ BaseVariableWidthVector::setNull,
+ Arrays.asList("", "foobar", "abc"));
+ }
+
+ @Test
+ void largeUtf8() throws SQLException {
+ testSimpleType(ArrowType.LargeUtf8.INSTANCE, Types.LONGVARCHAR,
+ (LargeVarCharVector vector, Integer index, String value) ->
+ vector.setSafe(index, value.getBytes(StandardCharsets.UTF_8)),
+ BaseLargeVariableWidthVector::setNull,
+ Arrays.asList("", "foobar", "abc"));
+ }
+
+ @Test
+ void binary() throws SQLException {
+ testSimpleType(ArrowType.Binary.INSTANCE, Types.VARBINARY,
+ (VarBinaryVector vector, Integer index, byte[] value) ->
+ vector.setSafe(index, value),
+ BaseVariableWidthVector::setNull,
+ Arrays.asList(new byte[0], new byte[] {2, -4}, new byte[] {0, -1, 127, -128}));
+ }
+
+ @Test
+ void largeBinary() throws SQLException {
+ testSimpleType(ArrowType.LargeBinary.INSTANCE, Types.LONGVARBINARY,
+ (LargeVarBinaryVector vector, Integer index, byte[] value) ->
+ vector.setSafe(index, value),
+ BaseLargeVariableWidthVector::setNull,
+ Arrays.asList(new byte[0], new byte[] {2, -4}, new byte[] {0, -1, 127, -128}));
+ }
+
+ @Test
+ void fixedSizeBinary() throws SQLException {
+ testSimpleType(new ArrowType.FixedSizeBinary(3), Types.BINARY,
+ FixedSizeBinaryVector::setSafe, FixedSizeBinaryVector::setNull,
+ Arrays.asList(new byte[3], new byte[] {1, 2, -4}, new byte[] {-1, 127, -128}));
+ }
+
+ @Test
+ void decimal128() throws SQLException {
+ testSimpleType(new ArrowType.Decimal(/*precision*/ 12, /*scale*/ 3, 128), Types.DECIMAL,
+ DecimalVector::setSafe, DecimalVector::setNull,
+ Arrays.asList(new BigDecimal("120.429"), new BigDecimal("-10590.123"), new BigDecimal("0.000")));
+ }
+
+ @Test
+ void decimal256() throws SQLException {
+ testSimpleType(new ArrowType.Decimal(/*precision*/ 12, /*scale*/ 3, 256), Types.DECIMAL,
+ Decimal256Vector::setSafe, Decimal256Vector::setNull,
+ Arrays.asList(new BigDecimal("120.429"), new BigDecimal("-10590.123"), new BigDecimal("0.000")));
+ }
+
+ @FunctionalInterface
+ interface TriConsumer<T, U, V> {
+ void accept(T value1, U value2, V value3);
+ }
+
+ <T, V extends FieldVector> void testSimpleType(ArrowType arrowType, int jdbcType, TriConsumer<V, Integer, T> setValue,
+ BiConsumer<V, Integer> setNull, List<T> values) throws SQLException {
+ Schema schema = new Schema(Collections.singletonList(Field.nullable("field", arrowType)));
+ try (final MockPreparedStatement statement = new MockPreparedStatement();
+ final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+ final JdbcParameterBinder binder =
+ JdbcParameterBinder.builder(statement, root).bindAll().build();
+ assertThat(binder.next()).isFalse();
+
+ @SuppressWarnings("unchecked")
+ final V vector = (V) root.getVector(0);
+ final ColumnBinder columnBinder = ColumnBinder.forVector(vector);
+ assertThat(columnBinder.getJdbcType()).isEqualTo(jdbcType);
+
+ setValue.accept(vector, 0, values.get(0));
+ setValue.accept(vector, 1, values.get(1));
+ setNull.accept(vector, 2);
+ root.setRowCount(3);
+
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(values.get(0));
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(values.get(1));
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isNull();
+ assertThat(statement.getParamType(1)).isEqualTo(jdbcType);
+ assertThat(binder.next()).isFalse();
+
+ binder.reset();
+
+ setNull.accept(vector, 0);
+ setValue.accept(vector, 1, values.get(2));
+ setValue.accept(vector, 2, values.get(0));
+ setValue.accept(vector, 3, values.get(2));
+ setValue.accept(vector, 4, values.get(1));
+ root.setRowCount(5);
+
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isNull();
+ assertThat(statement.getParamType(1)).isEqualTo(jdbcType);
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(values.get(2));
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(values.get(0));
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(values.get(2));
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(values.get(1));
+ assertThat(binder.next()).isFalse();
+ }
+
+ // Non-nullable (since some types have a specialized binder)
+ schema = new Schema(Collections.singletonList(Field.notNullable("field", arrowType)));
+ try (final MockPreparedStatement statement = new MockPreparedStatement();
+ final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+ final JdbcParameterBinder binder =
+ JdbcParameterBinder.builder(statement, root).bindAll().build();
+ assertThat(binder.next()).isFalse();
+
+ @SuppressWarnings("unchecked")
+ final V vector = (V) root.getVector(0);
+ setValue.accept(vector, 0, values.get(0));
+ setValue.accept(vector, 1, values.get(1));
+ root.setRowCount(2);
+
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(values.get(0));
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(values.get(1));
+ assertThat(binder.next()).isFalse();
+
+ binder.reset();
+
+ setValue.accept(vector, 0, values.get(0));
+ setValue.accept(vector, 1, values.get(2));
+ setValue.accept(vector, 2, values.get(0));
+ setValue.accept(vector, 3, values.get(2));
+ setValue.accept(vector, 4, values.get(1));
+ root.setRowCount(5);
+
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(values.get(0));
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(values.get(2));
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(values.get(0));
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(values.get(2));
+ assertThat(binder.next()).isTrue();
+ assertThat(statement.getParamValue(1)).isEqualTo(values.get(1));
+ assertThat(binder.next()).isFalse();
+ }
+ }
+}
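Reviewer note on the contract these tests exercise: `next()` returns false while the root has no rows, returns true once per row after `setRowCount`, and `reset()` restarts from row 0 so the same binder can be reused for the next batch. The sketch below is a JDK-only model of that behavior as inferred from the assertions above. It is not the `JdbcParameterBinder` implementation, and `RowCursorSketch` with its two callbacks is a hypothetical stand-in for the root and the column binders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;

/** Hypothetical model of the next()/reset() contract; not the JdbcParameterBinder implementation. */
final class RowCursorSketch {
  private final IntSupplier rowCount; // stands in for the root's current row count
  private final IntConsumer bindRow;  // stands in for binding every column of one row
  private int nextRowIndex;

  RowCursorSketch(IntSupplier rowCount, IntConsumer bindRow) {
    this.rowCount = rowCount;
    this.bindRow = bindRow;
  }

  /** Binds the next row if one exists. The row count is re-read on every call. */
  boolean next() {
    if (nextRowIndex >= rowCount.getAsInt()) {
      return false;
    }
    bindRow.accept(nextRowIndex);
    nextRowIndex++;
    return true;
  }

  /** Restarts from row 0, for example after the underlying data has been replaced. */
  void reset() {
    nextRowIndex = 0;
  }

  public static void main(String[] args) {
    final int[] rows = {2};
    final List<Integer> bound = new ArrayList<>();
    final RowCursorSketch cursor = new RowCursorSketch(() -> rows[0], i -> bound.add(i));
    while (cursor.next()) {
      // A real caller would typically execute the statement or add it to a batch here.
    }
    System.out.println("bound rows: " + bound);
  }
}
```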
diff --git a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/MockPreparedStatement.java b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/MockPreparedStatement.java
new file mode 100644
index 0000000000..438a949b73
--- /dev/null
+++ b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/MockPreparedStatement.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.jdbc;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.math.BigDecimal;
+import java.net.URL;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.NClob;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Ref;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.RowId;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A PreparedStatement that just stores parameters set on it. */
+public final class MockPreparedStatement implements PreparedStatement {
+ static class ParameterHolder {
+ final Object value;
+ final Integer sqlType;
+ public Calendar calendar;
+
+ ParameterHolder(Object value, Integer sqlType) {
+ this.value = value;
+ this.sqlType = sqlType;
+ }
+ }
+
+ private final Map<Integer, ParameterHolder> parameters;
+
+ MockPreparedStatement() {
+ parameters = new HashMap<>();
+ }
+
+ ParameterHolder getParam(int index) {
+ return parameters.get(index);
+ }
+
+ Object getParamValue(int index) {
+ return parameters.get(index).value;
+ }
+
+ Integer getParamType(int index) {
+ return parameters.get(index).sqlType;
+ }
+
+ @Override
+ public ResultSet executeQuery() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setNull(int parameterIndex, int sqlType) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(null, sqlType));
+ }
+
+ @Override
+ public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setByte(int parameterIndex, byte x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setShort(int parameterIndex, short x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setInt(int parameterIndex, int x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setLong(int parameterIndex, long x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setFloat(int parameterIndex, float x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setDouble(int parameterIndex, double x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setString(int parameterIndex, String x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setDate(int parameterIndex, Date x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setTime(int parameterIndex, Time x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ @Deprecated
+ public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void clearParameters() throws SQLException {
+ parameters.clear();
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, targetSqlType));
+ }
+
+ @Override
+ public void setObject(int parameterIndex, Object x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addBatch() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader, int length)
+ throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(reader, null));
+ }
+
+ @Override
+ public void setRef(int parameterIndex, Ref x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setBlob(int parameterIndex, Blob x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setClob(int parameterIndex, Clob x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setArray(int parameterIndex, Array x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public ResultSetMetaData getMetaData() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
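+ public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+ ParameterHolder value = new ParameterHolder(x, null);
+ value.calendar = cal;
+ parameters.put(parameterIndex, value);
+ }
+
+ @Override
+ public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
+ ParameterHolder value = new ParameterHolder(x, null);
+ value.calendar = cal;
+ parameters.put(parameterIndex, value);
+ }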
+
+ @Override
+ public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
+ ParameterHolder value = new ParameterHolder(x, null);
+ value.calendar = cal;
+ parameters.put(parameterIndex, value);
+ }
+
+ @Override
+ public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(null, sqlType));
+ }
+
+ @Override
+ public void setURL(int parameterIndex, URL x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setRowId(int parameterIndex, RowId x) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, null));
+ }
+
+ @Override
+ public void setNString(int parameterIndex, String value) throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(value, null));
+ }
+
+ @Override
+ public void setNCharacterStream(int parameterIndex, Reader value, long length)
+ throws SQLException {}
+
+ @Override
+ public void setNClob(int parameterIndex, NClob value) throws SQLException {}
+
+ @Override
+ public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {}
+
+ @Override
+ public void setBlob(int parameterIndex, InputStream inputStream, long length)
+ throws SQLException {}
+
+ @Override
+ public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {}
+
+ @Override
+ public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {}
+
+ @Override
+ public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength)
+ throws SQLException {
+ parameters.put(parameterIndex, new ParameterHolder(x, targetSqlType));
+ }
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {}
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {}
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader, long length)
+ throws SQLException {}
+
+ @Override
+ public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {}
+
+ @Override
+ public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {}
+
+ @Override
+ public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {}
+
+ @Override
+ public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {}
+
+ @Override
+ public void setClob(int parameterIndex, Reader reader) throws SQLException {}
+
+ @Override
+ public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {}
+
+ @Override
+ public void setNClob(int parameterIndex, Reader reader) throws SQLException {}
+
+ @Override
+ public ResultSet executeQuery(String sql) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int executeUpdate(String sql) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws SQLException {}
+
+ @Override
+ public int getMaxFieldSize() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setMaxFieldSize(int max) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getMaxRows() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setMaxRows(int max) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setEscapeProcessing(boolean enable) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getQueryTimeout() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setQueryTimeout(int seconds) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void cancel() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCursorName(String name) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean execute(String sql) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResultSet getResultSet() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getUpdateCount() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean getMoreResults() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setFetchDirection(int direction) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getFetchDirection() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setFetchSize(int rows) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getFetchSize() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getResultSetConcurrency() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getResultSetType() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addBatch(String sql) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clearBatch() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int[] executeBatch() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean getMoreResults(int current) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResultSet getGeneratedKeys() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean execute(String sql, String[] columnNames) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getResultSetHoldability() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setPoolable(boolean poolable) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isPoolable() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void closeOnCompletion() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isCloseOnCompletion() throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ throw new UnsupportedOperationException();
+ }
+}