You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/07/12 15:53:50 UTC
[arrow-adbc] branch main updated: Implement metadata methods for JDBC (#36)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new b5691cb Implement metadata methods for JDBC (#36)
b5691cb is described below
commit b5691cb3ca4091c93e11181f5a03d0e164a30322
Author: David Li <li...@gmail.com>
AuthorDate: Tue Jul 12 11:53:46 2022 -0400
Implement metadata methods for JDBC (#36)
Fixes #33.
---
.../org/apache/arrow/adbc/core/AdbcConnection.java | 169 +++++++++-
.../apache/arrow/adbc/core/StandardSchemas.java | 133 ++++++++
.../arrow/adbc/driver/jdbc/FixedJdbcStatement.java | 80 +++++
.../arrow/adbc/driver/jdbc/JdbcArrowReader.java | 27 +-
.../arrow/adbc/driver/jdbc/JdbcConnection.java | 84 +++++
.../arrow/adbc/driver/jdbc/JdbcDriverUtil.java | 8 +-
.../adbc/driver/jdbc/JdbcMetadataBuilder.java | 341 +++++++++++++++++++++
.../arrow/adbc/driver/jdbc/JdbcStatement.java | 3 +-
.../arrow/adbc/driver/jdbc/RootArrowReader.java | 68 ++++
.../driver/jdbc/JdbcConnectionMetadataTest.java | 36 +++
.../testsuite/AbstractConnectionMetadataTest.java | 229 ++++++++++++++
java/pom.xml | 11 +-
12 files changed, 1170 insertions(+), 19 deletions(-)
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
index b92a086..2a1e2b8 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
@@ -17,6 +17,7 @@
package org.apache.arrow.adbc.core;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
/** A connection to a {@link AdbcDatabase}. */
public interface AdbcConnection extends AutoCloseable {
@@ -43,7 +44,173 @@ public interface AdbcConnection extends AutoCloseable {
* does not match the table schema.
*/
default AdbcStatement bulkIngest(String targetTableName) throws AdbcException {
- throw new UnsupportedOperationException("Connection does not support bulk ingestion");
+ throw new UnsupportedOperationException("Connection does not support bulkIngest(String)");
+ }
+
+ /**
+ * Get a hierarchical view of all catalogs, database schemas, tables, and columns.
+ *
+ * <p>The result is an Arrow dataset with the following schema:
+ *
+ * <table border="1">
+ * <tr><th>Field Name</th> <th>Field Type</th> </tr>
+ * <tr><td>catalog_name</td> <td>utf8</td> </tr>
+ * <tr><td>catalog_db_schemas</td> <td>list[DB_SCHEMA_SCHEMA]</td> </tr>
+ * </table>
+ *
+ * DB_SCHEMA_SCHEMA is a Struct with fields:
+ *
+ * <table border="1">
+ * <tr><th>Field Name</th> <th>Field Type</th> </tr>
+ * <tr><td>db_schema_name</td> <td>utf8</td> </tr>
+ * <tr><td>db_schema_tables</td> <td>list[TABLE_SCHEMA]</td> </tr>
+ * </table>
+ *
+ * TABLE_SCHEMA is a Struct with fields:
+ *
+ * <table border="1">
+ * <tr><th>Field Name</th> <th>Field Type</th> </tr>
+ * <tr><td>table_name</td> <td>utf8 not null</td> </tr>
+ * <tr><td>table_type</td> <td>utf8 not null</td> </tr>
+ * <tr><td>table_columns</td> <td>list[COLUMN_SCHEMA]</td> </tr>
+ * <tr><td>table_constraints</td> <td>list[CONSTRAINT_SCHEMA]</td></tr>
+ * </table>
+ *
+ * COLUMN_SCHEMA is a Struct with fields:
+ *
+ * <table border="1">
+ * <tr><th>Field Name</th> <th>Field Type</th> <th>Comments</th></tr>
+ * <tr><td>column_name</td> <td>utf8 not null</td> <td></td></tr>
+ * <tr><td>ordinal_position</td> <td>int32</td> <td>(1)</td></tr>
+ * <tr><td>remarks</td> <td>utf8</td> <td>(2)</td></tr>
+ * <tr><td>xdbc_data_type</td> <td>int16</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_type_name</td> <td>utf8 </td> <td>(3)</td></tr>
+ * <tr><td>xdbc_column_size</td> <td>int32</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_decimal_digits</td> <td>int16</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_num_prec_radix</td> <td>int16</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_nullable</td> <td>int16</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_column_def</td> <td>utf8</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_sql_data_type</td> <td>int16</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_datetime_sub</td> <td>int16</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_char_octet_length</td> <td>int32</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_is_nullable</td> <td>utf8</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_scope_catalog</td> <td>utf8</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_scope_schema</td> <td>utf8</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_scope_table</td> <td>utf8</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_is_autoincrement</td> <td>bool</td> <td>(3)</td></tr>
+ * <tr><td>xdbc_is_generatedcolumn</td> <td>bool</td> <td>(3)</td></tr>
+ * </table>
+ *
+ * Notes:
+ *
+ * <ol>
+ * <li>The column's ordinal position in the table (starting from 1).
+ * <li>Database-specific description of the column.
+ * <li>Optional value. Should be null if not supported by the driver. xdbc_ values are meant to
+ * provide JDBC/ODBC-compatible metadata in an agnostic manner.
+ * </ol>
+ *
+ * CONSTRAINT_SCHEMA is a Struct with fields:
+ *
+ * <table border="1">
+ * <tr><th>Field Name</th> <th>Field Type</th> <th>Comments</th></tr>
+ * <tr><td>constraint_name</td> <td>utf8 not null</td> <td></td></tr>
+ * <tr><td>constraint_type</td> <td>utf8 not null</td> <td>(1)</td></tr>
+ * <tr><td>constraint_column_names</td> <td>list[utf8] not null</td> <td>(2)</td></tr>
+ * <tr><td>constraint_column_usage</td> <td>list[USAGE_SCHEMA]</td> <td>(3)</td></tr>
+ * </table>
+ *
+ * <ol>
+ * <li>One of 'CHECK', 'FOREIGN KEY', 'PRIMARY KEY', or 'UNIQUE'.
+ * <li>The columns on the current table that are constrained, in order.
+ * <li>For FOREIGN KEY only, the referenced table and columns.
+ * </ol>
+ *
+ * USAGE_SCHEMA is a Struct with fields:
+ *
+ * <table border="1">
+ * <tr><th>Field Name</th> <th>Field Type</th> </tr>
+ * <tr><td>fk_catalog</td> <td>utf8</td> </tr>
+ * <tr><td>fk_db_schema</td> <td>utf8</td> </tr>
+ * <tr><td>fk_table</td> <td>utf8 not null</td> </tr>
+ * <tr><td>fk_column_name</td> <td>utf8 not null</td> </tr>
+ * </table>
+ *
+   * @param depth The level of nesting to display. If ALL, display all levels (up through columns).
+   *     If CATALOGS, display only catalogs (i.e. catalog_schemas will be null), and so on. This is
+   *     an enum value, not a search pattern.
+ * @param catalogPattern Only show tables in the given catalog. If null, do not filter by catalog.
+ * If an empty string, only show tables without a catalog. May be a search pattern (see class
+ * documentation).
+ * @param dbSchemaPattern Only show tables in the given database schema. If null, do not filter by
+ * database schema. If an empty string, only show tables without a database schema. May be a
+ * search pattern (see class documentation).
+   * @param tableNamePattern Only show tables with the given name. If null, do not filter by table
+   *     name. May be a search pattern (see class documentation).
+ * @param tableTypes Only show tables matching one of the given table types. If null, show tables
+ * of any type. Valid table types can be fetched from {@link #getTableTypes()}.
+ * @param columnNamePattern Only show columns with the given name. If null, do not filter by name.
+ * May be a search pattern (see class documentation).
+ */
+ default AdbcStatement getObjects(
+ GetObjectsDepth depth,
+ String catalogPattern,
+ String dbSchemaPattern,
+ String tableNamePattern,
+ String[] tableTypes,
+ String columnNamePattern)
+ throws AdbcException {
+    throw new UnsupportedOperationException(
+        "Connection does not support getObjects(GetObjectsDepth, String, String, String, String[], String)");
+ }
+
+ /**
+ * The level of nesting to retrieve for {@link #getObjects(GetObjectsDepth, String, String,
+ * String, String[], String)}.
+ */
+ enum GetObjectsDepth {
+ /** Display ALL objects (catalog, database schemas, tables, and columns). */
+ ALL,
+ /** Display only catalogs. */
+ CATALOGS,
+ /** Display catalogs and database schemas. */
+ DB_SCHEMAS,
+ /** Display catalogs, database schemas, and tables. */
+ TABLES,
+ }
+
+ /**
+ * Get the Arrow schema of a database table.
+ *
+ * @param catalog The catalog of the table (or null).
+ * @param dbSchema The database schema of the table (or null).
+ * @param tableName The table name.
+ * @return The table schema.
+ */
+ default Schema getTableSchema(String catalog, String dbSchema, String tableName)
+ throws AdbcException {
+ throw new UnsupportedOperationException(
+ "Connection does not support getTableSchema(String, String, String)");
+ }
+
+ /**
+ * Get a list of table types supported by the database.
+ *
+ * <p>The result is an Arrow dataset with the following schema:
+ *
+ * <table border="1">
+ * <tr>
+ * <th>Field Name</th>
+ * <th>Field Type</th>
+ * </tr>
+ * <tr>
+ * <td>table_type</td>
+ * <td>utf8 not null</td>
+ * </tr>
+ * </table>
+ */
+ default AdbcStatement getTableTypes() throws AdbcException {
+ throw new UnsupportedOperationException("Connection does not support getTableTypes()");
}
/**
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/StandardSchemas.java b/java/core/src/main/java/org/apache/arrow/adbc/core/StandardSchemas.java
new file mode 100644
index 0000000..af35cbb
--- /dev/null
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/StandardSchemas.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.core;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.arrow.vector.complex.BaseRepeatedValueVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+public final class StandardSchemas {
+ private StandardSchemas() {
+ throw new AssertionError("Do not instantiate this class");
+ }
+
+ private static final ArrowType INT16 = new ArrowType.Int(16, true);
+ private static final ArrowType INT32 = new ArrowType.Int(32, true);
+
+ /** The schema of the result set of {@link AdbcConnection#getTableTypes()}. */
+ public static final Schema TABLE_TYPES_SCHEMA =
+ new Schema(
+ Collections.singletonList(Field.notNullable("table_type", ArrowType.Utf8.INSTANCE)));
+
+ public static final List<Field> USAGE_SCHEMA =
+ Arrays.asList(
+ Field.nullable("fk_catalog", ArrowType.Utf8.INSTANCE),
+ Field.nullable("fk_db_schema", ArrowType.Utf8.INSTANCE),
+ Field.notNullable("fk_table", ArrowType.Utf8.INSTANCE),
+ Field.notNullable("fk_column_name", ArrowType.Utf8.INSTANCE));
+
+ public static final List<Field> CONSTRAINT_SCHEMA =
+ Arrays.asList(
+ Field.notNullable("constraint_name", ArrowType.Utf8.INSTANCE),
+ Field.notNullable("constraint_type", ArrowType.Utf8.INSTANCE),
+ new Field(
+ "constraint_column_names",
+ FieldType.notNullable(ArrowType.List.INSTANCE),
+ Collections.singletonList(
+ Field.nullable(BaseRepeatedValueVector.DATA_VECTOR_NAME, new ArrowType.Utf8()))),
+ new Field(
+ "constraint_column_usage",
+ FieldType.notNullable(ArrowType.List.INSTANCE),
+ Collections.singletonList(
+ new Field(
+ BaseRepeatedValueVector.DATA_VECTOR_NAME,
+ FieldType.nullable(ArrowType.Struct.INSTANCE),
+ USAGE_SCHEMA))));
+
+ public static final List<Field> COLUMN_SCHEMA =
+ Arrays.asList(
+ new Field("column_name", FieldType.notNullable(ArrowType.Utf8.INSTANCE), null),
+ new Field("ordinal_position", FieldType.nullable(INT32), null),
+ new Field("remarks", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+ new Field("xdbc_data_type", FieldType.nullable(INT16), null),
+ new Field("xdbc_type_name", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+ new Field("xdbc_column_size", FieldType.nullable(INT32), null),
+ new Field("xdbc_decimal_digits", FieldType.nullable(INT16), null),
+ new Field("xdbc_num_prec_radix", FieldType.nullable(INT16), null),
+ new Field("xdbc_nullable", FieldType.nullable(INT16), null),
+ new Field("xdbc_column_def", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+ new Field("xdbc_sql_data_type", FieldType.nullable(INT16), null),
+ new Field("xdbc_datetime_sub", FieldType.nullable(INT16), null),
+ new Field("xdbc_char_octet_length", FieldType.nullable(INT32), null),
+ new Field("xdbc_is_nullable", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+ new Field("xdbc_scope_catalog", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+ new Field("xdbc_scope_schema", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+ new Field("xdbc_scope_table", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+ new Field("xdbc_is_autoincrement", FieldType.nullable(ArrowType.Bool.INSTANCE), null),
+ new Field("xdbc_is_generatedcolumn", FieldType.nullable(ArrowType.Bool.INSTANCE), null));
+
+ public static final List<Field> TABLE_SCHEMA =
+ Arrays.asList(
+ new Field("table_name", FieldType.notNullable(ArrowType.Utf8.INSTANCE), null),
+ new Field("table_type", FieldType.notNullable(ArrowType.Utf8.INSTANCE), null),
+ new Field(
+ "table_columns",
+ FieldType.notNullable(ArrowType.List.INSTANCE),
+ Collections.singletonList(
+ new Field(
+ BaseRepeatedValueVector.DATA_VECTOR_NAME,
+ FieldType.nullable(ArrowType.Struct.INSTANCE),
+ COLUMN_SCHEMA))),
+ new Field(
+ "table_constraints",
+ FieldType.notNullable(ArrowType.List.INSTANCE),
+ Collections.singletonList(
+ new Field(
+ BaseRepeatedValueVector.DATA_VECTOR_NAME,
+ FieldType.nullable(ArrowType.Struct.INSTANCE),
+ CONSTRAINT_SCHEMA))));
+
+ public static final List<Field> DB_SCHEMA_SCHEMA =
+ Arrays.asList(
+ new Field("db_schema_name", FieldType.notNullable(ArrowType.Utf8.INSTANCE), null),
+ new Field(
+ "db_schema_tables",
+ FieldType.notNullable(ArrowType.List.INSTANCE),
+ Collections.singletonList(
+ new Field(
+ BaseRepeatedValueVector.DATA_VECTOR_NAME,
+ FieldType.nullable(ArrowType.Struct.INSTANCE),
+ TABLE_SCHEMA))));
+
+ public static final Schema GET_OBJECTS_SCHEMA =
+ new Schema(
+ Arrays.asList(
+ new Field("catalog_name", FieldType.notNullable(ArrowType.Utf8.INSTANCE), null),
+ new Field(
+ "catalog_db_schemas",
+ FieldType.notNullable(ArrowType.List.INSTANCE),
+ Collections.singletonList(
+ new Field(
+ BaseRepeatedValueVector.DATA_VECTOR_NAME,
+ FieldType.nullable(ArrowType.Struct.INSTANCE),
+ DB_SCHEMA_SCHEMA)))));
+}
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/FixedJdbcStatement.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/FixedJdbcStatement.java
new file mode 100644
index 0000000..feae265
--- /dev/null
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/FixedJdbcStatement.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adbc.driver.jdbc;
+
+import java.sql.ResultSet;
+import java.util.Collections;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/** An AdbcStatement implementation that returns a fixed ResultSet. */
+class FixedJdbcStatement implements AdbcStatement {
+ private final BufferAllocator allocator;
+ private final Schema overrideSchema;
+ private ResultSet resultSet;
+ private ArrowRecordBatch recordBatch;
+
+ public FixedJdbcStatement(BufferAllocator allocator, ResultSet resultSet, Schema overrideSchema) {
+ this.allocator = allocator;
+ this.resultSet = resultSet;
+ this.overrideSchema = overrideSchema;
+ }
+
+ public FixedJdbcStatement(BufferAllocator allocator, VectorSchemaRoot root) {
+ this.allocator = allocator;
+ this.overrideSchema = root.getSchema();
+ // Unload the root to preserve the data
+ recordBatch = new VectorUnloader(root).getRecordBatch();
+ }
+
+ @Override
+ public void execute() throws AdbcException {
+ throw new UnsupportedOperationException("Cannot execute() this statement.");
+ }
+
+ @Override
+ public ArrowReader getArrowReader() throws AdbcException {
+ final ArrowReader reader;
+ if (resultSet != null) {
+ reader = new JdbcArrowReader(allocator, resultSet, overrideSchema);
+ resultSet = null;
+ } else {
+ reader =
+ new RootArrowReader(allocator, overrideSchema, Collections.singletonList(recordBatch));
+ recordBatch = null;
+ }
+ return reader;
+ }
+
+ @Override
+ public void prepare() throws AdbcException {
+ throw new UnsupportedOperationException("Cannot prepare() this statement.");
+ }
+
+ @Override
+ public void close() throws Exception {
+ AutoCloseables.close(resultSet, recordBatch);
+ }
+}
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcArrowReader.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcArrowReader.java
index f4b8e73..4445729 100644
--- a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcArrowReader.java
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcArrowReader.java
@@ -33,30 +33,37 @@ import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
+/** An ArrowReader that wraps a JDBC ResultSet. */
public class JdbcArrowReader extends ArrowReader {
private final ArrowVectorIterator delegate;
private final Schema schema;
private long bytesRead;
- JdbcArrowReader(BufferAllocator allocator, ResultSet resultSet) throws AdbcException {
+ JdbcArrowReader(BufferAllocator allocator, ResultSet resultSet, Schema overrideSchema)
+ throws AdbcException {
super(allocator);
- try {
- this.delegate = JdbcToArrow.sqlToArrowVectorIterator(resultSet, allocator);
- } catch (SQLException e) {
- throw JdbcDriverUtil.fromSqlException(e);
- } catch (IOException e) {
- throw new AdbcException(
- JdbcDriverUtil.prefixExceptionMessage(e.getMessage()), e, AdbcStatusCode.IO, null, -1);
- }
final JdbcToArrowConfig config =
new JdbcToArrowConfigBuilder()
.setAllocator(allocator)
.setCalendar(JdbcToArrowUtils.getUtcCalendar())
+ .setTargetBatchSize(1024)
.build();
try {
- this.schema = JdbcToArrowUtils.jdbcToArrowSchema(resultSet.getMetaData(), config);
+ this.delegate = JdbcToArrow.sqlToArrowVectorIterator(resultSet, config);
} catch (SQLException e) {
throw JdbcDriverUtil.fromSqlException(e);
+ } catch (IOException e) {
+ throw new AdbcException(
+ JdbcDriverUtil.prefixExceptionMessage(e.getMessage()), e, AdbcStatusCode.IO, null, -1);
+ }
+ if (overrideSchema != null) {
+ this.schema = overrideSchema;
+ } else {
+ try {
+ this.schema = JdbcToArrowUtils.jdbcToArrowSchema(resultSet.getMetaData(), config);
+ } catch (SQLException e) {
+ throw JdbcDriverUtil.fromSqlException("Failed to convert JDBC schema to Arrow schema:", e);
+ }
}
this.bytesRead = 0;
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
index 71554f9..9559c30 100644
--- a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
@@ -17,11 +17,23 @@
package org.apache.arrow.adbc.driver.jdbc;
import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
+import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.adbc.core.AdbcConnection;
import org.apache.arrow.adbc.core.AdbcException;
import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.core.StandardSchemas;
import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
public class JdbcConnection implements AdbcConnection {
private final BufferAllocator allocator;
@@ -52,6 +64,78 @@ public class JdbcConnection implements AdbcConnection {
return JdbcStatement.ingestRoot(allocator, connection, targetTableName);
}
+ @Override
+ public AdbcStatement getObjects(
+ final GetObjectsDepth depth,
+ final String catalogPattern,
+ final String dbSchemaPattern,
+ final String tableNamePattern,
+ final String[] tableTypes,
+ final String columnNamePattern)
+ throws AdbcException {
+ // Build up the metadata in-memory and then return a constant reader.
+ try {
+ final VectorSchemaRoot root =
+ new JdbcMetadataBuilder(
+ allocator,
+ connection,
+ depth,
+ catalogPattern,
+ dbSchemaPattern,
+ tableNamePattern,
+ tableTypes,
+ columnNamePattern)
+ .build();
+ return new FixedJdbcStatement(allocator, root);
+ } catch (SQLException e) {
+ throw JdbcDriverUtil.fromSqlException(e);
+ }
+ }
+
+ @Override
+ public Schema getTableSchema(String catalog, String dbSchema, String tableName)
+ throws AdbcException {
+ // Reconstruct the schema from the metadata
+ // XXX: this may be inconsistent with reading the table
+ final List<Field> fields = new ArrayList<>();
+ try (final ResultSet rs =
+ connection
+ .getMetaData()
+ .getColumns(catalog, dbSchema, tableName, /*columnNamePattern*/ null)) {
+ while (rs.next()) {
+ final String fieldName = rs.getString("COLUMN_NAME");
+ final boolean nullable = rs.getInt("NULLABLE") != DatabaseMetaData.columnNoNulls;
+ final int jdbcType = rs.getInt("DATA_TYPE");
+ final int precision = rs.getInt("COLUMN_SIZE");
+ final int scale = rs.getInt("DECIMAL_DIGITS");
+ final ArrowType arrowType =
+ JdbcToArrowUtils.getArrowTypeFromJdbcType(
+ new JdbcFieldInfo(jdbcType, precision, scale), /*calendar*/ null);
+ final Field field =
+ new Field(
+ fieldName,
+ new FieldType(
+ nullable, arrowType, /*dictionary*/ null, /*metadata*/ null), /*children*/
+ null);
+ fields.add(field);
+ }
+ } catch (SQLException e) {
+ throw JdbcDriverUtil.fromSqlException(e);
+ }
+ return new Schema(fields);
+ }
+
+ @Override
+ public AdbcStatement getTableTypes() throws AdbcException {
+ // XXX: this is nonconforming since we don't currently have a way to rename the columns
+ try {
+ return new FixedJdbcStatement(
+ allocator, connection.getMetaData().getTableTypes(), StandardSchemas.TABLE_TYPES_SCHEMA);
+ } catch (SQLException e) {
+ throw JdbcDriverUtil.fromSqlException(e);
+ }
+ }
+
@Override
public void rollback() throws AdbcException {
try {
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverUtil.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverUtil.java
index 1a9a6a9..4926877 100644
--- a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverUtil.java
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverUtil.java
@@ -44,11 +44,7 @@ final class JdbcDriverUtil {
static AdbcException fromSqlException(
AdbcStatusCode status, String format, SQLException e, Object... values) {
- return new AdbcException(
- String.format(format, values) + prefixExceptionMessage(e.getMessage()),
- e.getCause(),
- status,
- e.getSQLState(),
- e.getErrorCode());
+ final String message = "[JDBC] " + String.format(format, values) + e.getMessage();
+ return new AdbcException(message, e.getCause(), status, e.getSQLState(), e.getErrorCode());
}
}
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcMetadataBuilder.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcMetadataBuilder.java
new file mode 100644
index 0000000..1d2984b
--- /dev/null
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcMetadataBuilder.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.driver.jdbc;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.arrow.adbc.core.AdbcConnection;
+import org.apache.arrow.adbc.core.StandardSchemas;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+
+/** Helper class to track state needed to build up the metadata structure. */
+/**
+ * Helper class to track state needed to build up the metadata structure returned by getObjects:
+ * it walks JDBC {@link DatabaseMetaData} and fills the nested catalog/schema/table/column/
+ * constraint vectors of {@link StandardSchemas#GET_OBJECTS_SCHEMA}.
+ *
+ * <p>Single-use and not thread-safe. {@link #build()} transfers ownership of the root to the
+ * caller; after a successful build, {@link #close()} is a no-op.
+ */
+final class JdbcMetadataBuilder implements AutoCloseable {
+  private final AdbcConnection.GetObjectsDepth depth;
+  private final String catalogPattern;
+  private final String dbSchemaPattern;
+  private final String tableNamePattern;
+  private final String[] tableTypesFilter;
+  private final String columnNamePattern;
+  private final DatabaseMetaData dbmd;
+  // Null after build() hands ownership of the root to the caller.
+  private VectorSchemaRoot root;
+
+  // Cached views into the nested children of GET_OBJECTS_SCHEMA, in declaration order.
+  final VarCharVector catalogNames;
+  final ListVector catalogDbSchemas;
+  final StructVector dbSchemas;
+  final VarCharVector dbSchemaNames;
+  final ListVector dbSchemaTables;
+  final StructVector tables;
+  final VarCharVector tableNames;
+  final VarCharVector tableTypes;
+  final ListVector tableColumns;
+  final StructVector columns;
+  final VarCharVector columnNames;
+  final IntVector columnOrdinalPositions;
+  final VarCharVector columnRemarks;
+  final SmallIntVector columnXdbcDataTypes;
+  final ListVector tableConstraints;
+  final StructVector constraints;
+  final VarCharVector constraintNames;
+  final VarCharVector constraintTypes;
+  final ListVector constraintColumnNames;
+  final VarCharVector constraintColumnNameItems;
+  final ListVector constraintColumnUsage;
+  final StructVector columnUsages;
+  final VarCharVector columnUsageFkCatalogs;
+  final VarCharVector columnUsageFkDbSchemas;
+  final VarCharVector columnUsageFkTables;
+  final VarCharVector columnUsageFkColumns;
+
+  JdbcMetadataBuilder(
+      BufferAllocator allocator,
+      Connection connection,
+      final AdbcConnection.GetObjectsDepth depth,
+      final String catalogPattern,
+      final String dbSchemaPattern,
+      final String tableNamePattern,
+      final String[] tableTypesFilter,
+      final String columnNamePattern)
+      throws SQLException {
+    this.depth = depth;
+    this.catalogPattern = catalogPattern;
+    this.dbSchemaPattern = dbSchemaPattern;
+    this.tableNamePattern = tableNamePattern;
+    this.tableTypesFilter = tableTypesFilter;
+    this.columnNamePattern = columnNamePattern;
+    this.root = VectorSchemaRoot.create(StandardSchemas.GET_OBJECTS_SCHEMA, allocator);
+    this.dbmd = connection.getMetaData();
+    this.catalogNames = (VarCharVector) root.getVector(0);
+    this.catalogDbSchemas = (ListVector) root.getVector(1);
+    this.dbSchemas = (StructVector) catalogDbSchemas.getDataVector();
+    this.dbSchemaNames = (VarCharVector) dbSchemas.getVectorById(0);
+    this.dbSchemaTables = (ListVector) dbSchemas.getVectorById(1);
+    this.tables = (StructVector) dbSchemaTables.getDataVector();
+    this.tableNames = (VarCharVector) tables.getVectorById(0);
+    this.tableTypes = (VarCharVector) tables.getVectorById(1);
+    this.tableColumns = (ListVector) tables.getVectorById(2);
+    this.columns = (StructVector) tableColumns.getDataVector();
+    this.columnNames = (VarCharVector) columns.getVectorById(0);
+    this.columnOrdinalPositions = (IntVector) columns.getVectorById(1);
+    this.columnRemarks = (VarCharVector) columns.getVectorById(2);
+    this.columnXdbcDataTypes = (SmallIntVector) columns.getVectorById(3);
+    this.tableConstraints = (ListVector) tables.getVectorById(3);
+    this.constraints = (StructVector) tableConstraints.getDataVector();
+    this.constraintNames = (VarCharVector) constraints.getVectorById(0);
+    this.constraintTypes = (VarCharVector) constraints.getVectorById(1);
+    this.constraintColumnNames = (ListVector) constraints.getVectorById(2);
+    this.constraintColumnNameItems = (VarCharVector) constraintColumnNames.getDataVector();
+    this.constraintColumnUsage = (ListVector) constraints.getVectorById(3);
+    this.columnUsages = (StructVector) constraintColumnUsage.getDataVector();
+    this.columnUsageFkCatalogs = (VarCharVector) columnUsages.getVectorById(0);
+    this.columnUsageFkDbSchemas = (VarCharVector) columnUsages.getVectorById(1);
+    this.columnUsageFkTables = (VarCharVector) columnUsages.getVectorById(2);
+    this.columnUsageFkColumns = (VarCharVector) columnUsages.getVectorById(3);
+  }
+
+  /** Drive the JDBC metadata calls and return the filled root; the caller takes ownership. */
+  VectorSchemaRoot build() throws SQLException {
+    // TODO: need to turn catalogPattern into a catalog filter since JDBC doesn't support this
+    try (final ResultSet rs = dbmd.getCatalogs()) {
+      int catalogCount = 0;
+      while (rs.next()) {
+        final String catalogName = rs.getString(1);
+        addCatalogRow(catalogCount, catalogName);
+        catalogCount++;
+      }
+      // Also emit the "no catalog" pseudo-catalog (empty string) for objects outside any catalog.
+      // TODO: only include this if matches filter
+      addCatalogRow(catalogCount, /*catalogName*/ "");
+      catalogCount++;
+      root.setRowCount(catalogCount);
+    }
+    // Transfer ownership: once returned, close() must not free the root.
+    VectorSchemaRoot result = root;
+    root = null;
+    return result;
+  }
+
+  /** Write one catalog row, recursing into schemas unless depth stops at CATALOGS. */
+  private void addCatalogRow(int rowIndex, String catalogName) throws SQLException {
+    catalogNames.setSafe(rowIndex, catalogName.getBytes(StandardCharsets.UTF_8));
+    if (depth == AdbcConnection.GetObjectsDepth.CATALOGS) {
+      catalogDbSchemas.setNull(rowIndex);
+    } else {
+      int dbSchemasBaseIndex = catalogDbSchemas.startNewValue(rowIndex);
+      final int dbSchemaCount = buildDbSchemas(dbSchemasBaseIndex, catalogName);
+      catalogDbSchemas.endValue(rowIndex, dbSchemaCount);
+    }
+  }
+
+  /**
+   * Fill schema rows for one catalog starting at the given child index; returns the row count.
+   */
+  private int buildDbSchemas(int rowIndex, String catalogName) throws SQLException {
+    int dbSchemaCount = 0;
+    // TODO: get tables with no schema
+    try (final ResultSet rs = dbmd.getSchemas(catalogName, dbSchemaPattern)) {
+      while (rs.next()) {
+        final String dbSchemaName = rs.getString(1);
+        addDbSchemaRow(rowIndex + dbSchemaCount, catalogName, dbSchemaName);
+        dbSchemaCount++;
+      }
+    }
+    return dbSchemaCount;
+  }
+
+  /** Write one schema row, recursing into tables unless depth stops at DB_SCHEMAS. */
+  private void addDbSchemaRow(int rowIndex, String catalogName, String dbSchemaName)
+      throws SQLException {
+    dbSchemas.setIndexDefined(rowIndex);
+    dbSchemaNames.setSafe(rowIndex, dbSchemaName.getBytes(StandardCharsets.UTF_8));
+    if (depth == AdbcConnection.GetObjectsDepth.DB_SCHEMAS) {
+      dbSchemaTables.setNull(rowIndex);
+    } else {
+      int tableBaseIndex = dbSchemaTables.startNewValue(rowIndex);
+      final int tableCount = buildTables(tableBaseIndex, catalogName, dbSchemaName);
+      dbSchemaTables.endValue(rowIndex, tableCount);
+    }
+  }
+
+  /** Fill table rows (with constraints and, depth permitting, columns) for one schema. */
+  private int buildTables(int rowIndex, String catalogName, String dbSchemaName)
+      throws SQLException {
+    int tableCount = 0;
+    try (final ResultSet rs =
+        dbmd.getTables(catalogName, dbSchemaName, tableNamePattern, tableTypesFilter)) {
+      while (rs.next()) {
+        // JDBC getTables columns: 3 = TABLE_NAME, 4 = TABLE_TYPE
+        final String tableName = rs.getString(3);
+        final String tableType = rs.getString(4);
+        final int tableIndex = rowIndex + tableCount;
+        tables.setIndexDefined(tableIndex);
+        tableNames.setSafe(tableIndex, tableName.getBytes(StandardCharsets.UTF_8));
+        tableTypes.setSafe(tableIndex, tableType.getBytes(StandardCharsets.UTF_8));
+        final int constraintOffset = tableConstraints.startNewValue(tableIndex);
+        int constraintCount = 0;
+        // JDBC doesn't directly expose constraints. Merge various info methods:
+        // 1. Primary keys (getPrimaryKeys columns: 4 = COLUMN_NAME, 5 = KEY_SEQ, 6 = PK_NAME)
+        try (final ResultSet pk = dbmd.getPrimaryKeys(catalogName, dbSchemaName, tableName)) {
+          String constraintName = null;
+          List<String> constraintColumns = new ArrayList<>();
+          if (pk.next()) {
+            // do/while: the row already consumed by the if-check must be processed too,
+            // otherwise the first (or only) primary key column is silently dropped.
+            do {
+              constraintName = pk.getString(6);
+              String columnName = pk.getString(4);
+              int columnIndex = pk.getInt(5);
+              // Rows may arrive out of KEY_SEQ order; pad, then place by 1-based KEY_SEQ.
+              while (constraintColumns.size() < columnIndex) constraintColumns.add(null);
+              constraintColumns.set(columnIndex - 1, columnName);
+            } while (pk.next());
+            addConstraint(
+                constraintOffset + constraintCount,
+                constraintName,
+                "PRIMARY KEY",
+                constraintColumns,
+                Collections.emptyList());
+            constraintCount++;
+          }
+        }
+        // 2. Foreign keys ("imported" keys). getImportedKeys columns: 1-4 = referenced
+        //    catalog/schema/table/column, 8 = FKCOLUMN_NAME, 9 = KEY_SEQ, 12 = FK_NAME.
+        try (final ResultSet fk = dbmd.getImportedKeys(catalogName, dbSchemaName, tableName)) {
+          List<String> names = new ArrayList<>();
+          List<List<String>> columns = new ArrayList<>();
+          List<List<ReferencedColumn>> references = new ArrayList<>();
+          while (fk.next()) {
+            String keyName = fk.getString(12);
+            String keyColumn = fk.getString(8);
+            int keySeq = fk.getInt(9);
+            // KEY_SEQ restarts at 1 for each distinct foreign-key constraint.
+            if (keySeq == 1) {
+              names.add(keyName);
+              columns.add(new ArrayList<>());
+              references.add(new ArrayList<>());
+            }
+            columns.get(columns.size() - 1).add(keyColumn);
+            final ReferencedColumn reference = new ReferencedColumn();
+            reference.catalog = fk.getString(1);
+            reference.dbSchema = fk.getString(2);
+            reference.table = fk.getString(3);
+            reference.column = fk.getString(4);
+            references.get(references.size() - 1).add(reference);
+          }
+
+          for (int i = 0; i < names.size(); i++) {
+            addConstraint(
+                constraintOffset + constraintCount,
+                names.get(i),
+                "FOREIGN KEY",
+                columns.get(i),
+                references.get(i));
+            constraintCount++;
+          }
+        }
+
+        // TODO: UNIQUE constraints are exposed under indices
+        // TODO: how to get CHECK constraints?
+
+        tableConstraints.endValue(tableIndex, constraintCount);
+        if (depth == AdbcConnection.GetObjectsDepth.TABLES) {
+          tableColumns.setNull(tableIndex);
+        } else {
+          // NOTE: the column list must be indexed at tableIndex (not the base rowIndex),
+          // matching the constraint list above; otherwise every table in the schema would
+          // write its column list into the first table's slot.
+          int columnBaseIndex = tableColumns.startNewValue(tableIndex);
+          final int columnCount =
+              buildColumns(columnBaseIndex, catalogName, dbSchemaName, tableName);
+          tableColumns.endValue(tableIndex, columnCount);
+        }
+        tableCount++;
+      }
+    }
+    return tableCount;
+  }
+
+  /** Fill column rows for one table starting at the given child index; returns the row count. */
+  private int buildColumns(int rowIndex, String catalogName, String dbSchemaName, String tableName)
+      throws SQLException {
+    int columnCount = 0;
+    try (final ResultSet rs =
+        dbmd.getColumns(catalogName, dbSchemaName, tableName, columnNamePattern)) {
+      while (rs.next()) {
+        // JDBC getColumns columns: 4 = COLUMN_NAME, 5 = DATA_TYPE, 12 = REMARKS,
+        // 17 = ORDINAL_POSITION
+        final String columnName = rs.getString(4);
+        final int ordinalPosition = rs.getInt(17);
+        final String remarks = rs.getString(12);
+        final int xdbcDataType = rs.getInt(5);
+        // TODO: other JDBC metadata
+
+        columns.setIndexDefined(rowIndex + columnCount);
+        columnNames.setSafe(rowIndex + columnCount, columnName.getBytes(StandardCharsets.UTF_8));
+        columnOrdinalPositions.setSafe(rowIndex + columnCount, ordinalPosition);
+        if (remarks != null) {
+          columnRemarks.setSafe(rowIndex + columnCount, remarks.getBytes(StandardCharsets.UTF_8));
+        }
+        columnXdbcDataTypes.setSafe(rowIndex + columnCount, xdbcDataType);
+
+        columnCount++;
+      }
+    }
+    return columnCount;
+  }
+
+  /**
+   * Write a single constraint row (name, type, column list, and referenced columns for
+   * foreign keys) at the given child index of the constraints vector.
+   */
+  private void addConstraint(
+      int index,
+      String constraintName,
+      String constraintType,
+      List<String> constraintColumns,
+      List<ReferencedColumn> referencedColumns) {
+    if (constraintName == null) {
+      constraintNames.setNull(index);
+    } else {
+      constraintNames.setSafe(index, constraintName.getBytes(StandardCharsets.UTF_8));
+    }
+    constraintTypes.setSafe(index, constraintType.getBytes(StandardCharsets.UTF_8));
+
+    int namesOffset = constraintColumnNames.startNewValue(index);
+    for (final String column : constraintColumns) {
+      if (column == null) {
+        // KEY_SEQ gaps leave padding nulls in the list; record them as null entries
+        // instead of throwing NPE on getBytes.
+        constraintColumnNameItems.setNull(namesOffset++);
+      } else {
+        constraintColumnNameItems.setSafe(namesOffset++, column.getBytes(StandardCharsets.UTF_8));
+      }
+    }
+    constraintColumnNames.endValue(index, constraintColumns.size());
+    int usageOffset = constraintColumnUsage.startNewValue(index);
+    for (final ReferencedColumn column : referencedColumns) {
+      columnUsages.setIndexDefined(usageOffset);
+      if (column.catalog == null) {
+        columnUsageFkCatalogs.setNull(usageOffset);
+      } else {
+        columnUsageFkCatalogs.setSafe(usageOffset, column.catalog.getBytes(StandardCharsets.UTF_8));
+      }
+      if (column.dbSchema == null) {
+        columnUsageFkDbSchemas.setNull(usageOffset);
+      } else {
+        columnUsageFkDbSchemas.setSafe(
+            usageOffset, column.dbSchema.getBytes(StandardCharsets.UTF_8));
+      }
+      columnUsageFkTables.setSafe(usageOffset, column.table.getBytes(StandardCharsets.UTF_8));
+      columnUsageFkColumns.setSafe(usageOffset, column.column.getBytes(StandardCharsets.UTF_8));
+      usageOffset++;
+    }
+    constraintColumnUsage.endValue(index, referencedColumns.size());
+  }
+
+  @Override
+  public void close() throws Exception {
+    // No-op after a successful build(): root is null once ownership has been transferred.
+    AutoCloseables.close(root);
+  }
+
+  /** Plain holder for the referenced (primary-key side) column of a foreign key. */
+  static class ReferencedColumn {
+    String catalog;
+    String dbSchema;
+    String table;
+    String column;
+  }
+}
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
index b95f9b8..bf5c0db 100644
--- a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
@@ -212,7 +212,8 @@ public class JdbcStatement implements AdbcStatement {
if (resultSet == null) {
throw new IllegalStateException("Must call execute() before getArrowIterator()");
}
- final JdbcArrowReader reader = new JdbcArrowReader(allocator, resultSet);
+ final JdbcArrowReader reader =
+ new JdbcArrowReader(allocator, resultSet, /*overrideSchema*/ null);
resultSet = null;
return reader;
}
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/RootArrowReader.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/RootArrowReader.java
new file mode 100644
index 0000000..aadba47
--- /dev/null
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/RootArrowReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.driver.jdbc;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/** An ArrowReader that wraps a list of ArrowRecordBatches. */
+/** An ArrowReader that replays a pre-materialized list of ArrowRecordBatches in order. */
+class RootArrowReader extends ArrowReader {
+  private final Schema schema;
+  private final List<ArrowRecordBatch> batches;
+  int nextIndex;
+
+  public RootArrowReader(BufferAllocator allocator, Schema schema, List<ArrowRecordBatch> batches) {
+    super(allocator);
+    this.schema = schema;
+    this.batches = batches;
+    this.nextIndex = 0;
+  }
+
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    // Guard clause: signal end-of-stream once every batch has been replayed.
+    if (nextIndex >= batches.size()) {
+      return false;
+    }
+    final ArrowRecordBatch batch = batches.get(nextIndex);
+    nextIndex++;
+    new VectorLoader(getVectorSchemaRoot()).load(batch);
+    return true;
+  }
+
+  @Override
+  public long bytesRead() {
+    // Batches are already in memory; nothing is read from an external source.
+    return 0;
+  }
+
+  @Override
+  protected void closeReadSource() throws IOException {
+    // Release the buffers backing every batch, wrapping any failure per the ArrowReader contract.
+    try {
+      AutoCloseables.close(batches);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  protected Schema readSchema() {
+    return schema;
+  }
+}
diff --git a/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionMetadataTest.java b/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionMetadataTest.java
new file mode 100644
index 0000000..024223d
--- /dev/null
+++ b/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionMetadataTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.driver.jdbc;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.driver.testsuite.AbstractConnectionMetadataTest;
+import org.junit.jupiter.api.io.TempDir;
+
+/** Runs the shared connection-metadata test suite against the JDBC driver. */
+public class JdbcConnectionMetadataTest extends AbstractConnectionMetadataTest {
+  @TempDir Path tempDir;
+
+  @Override
+  protected AdbcDatabase init() throws AdbcException {
+    // Point the driver at an embedded database created under the per-test temporary directory.
+    final String path = tempDir.toString() + "/db;create=true";
+    final Map<String, String> parameters = new HashMap<>();
+    parameters.put("path", path);
+    return JdbcDriver.INSTANCE.connect(parameters);
+  }
+}
diff --git a/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java b/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java
new file mode 100644
index 0000000..bba9aac
--- /dev/null
+++ b/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adbc.driver.testsuite;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.arrow.adbc.core.AdbcConnection;
+import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.core.StandardSchemas;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Common tests for {@link AdbcConnection} metadata methods, parameterized over the concrete
+ * driver via {@link #init()}. Subclasses supply a database; the tests here ingest a sample
+ * table and verify the metadata structures against the ADBC standard schemas.
+ */
+public abstract class AbstractConnectionMetadataTest {
+  protected AdbcDatabase database;
+  protected AdbcConnection connection;
+  protected BufferAllocator allocator;
+
+  /** Create the driver-specific database under test. */
+  protected abstract AdbcDatabase init() throws AdbcException;
+
+  @BeforeEach
+  public void beforeEach() throws Exception {
+    database = init();
+    connection = database.connect();
+    allocator = new RootAllocator();
+  }
+
+  @AfterEach
+  public void afterEach() throws Exception {
+    AutoCloseables.close(connection, database, allocator);
+  }
+
+  // Drill from the top-level root down to the per-table column list and check FOO's columns.
+  @Test
+  void getObjectsColumns() throws Exception {
+    loadTable();
+    boolean tableFound = false;
+    try (final AdbcStatement stmt =
+        connection.getObjects(AdbcConnection.GetObjectsDepth.ALL, null, null, null, null, null)) {
+      try (final ArrowReader reader = stmt.getArrowReader()) {
+        assertThat(reader.getVectorSchemaRoot().getSchema())
+            .isEqualTo(StandardSchemas.GET_OBJECTS_SCHEMA);
+        assertThat(reader.loadNextBatch()).isTrue();
+
+        // Navigate the nested structure: catalog -> db_schemas (list) -> tables (list of
+        // structs); vector ids follow the field order of GET_OBJECTS_SCHEMA.
+        final ListVector dbSchemas = (ListVector) reader.getVectorSchemaRoot().getVector(1);
+        final ListVector dbSchemaTables =
+            (ListVector) ((StructVector) dbSchemas.getDataVector()).getVectorById(1);
+        final StructVector tables = (StructVector) dbSchemaTables.getDataVector();
+        final VarCharVector tableNames = (VarCharVector) tables.getVectorById(0);
+        final ListVector tableColumns = (ListVector) tables.getVectorById(2);
+
+        for (int i = 0; i < tables.getValueCount(); i++) {
+          if (tables.isNull(i)) {
+            continue;
+          }
+          final Text tableName = tableNames.getObject(i);
+          if (tableName != null && tableName.toString().equals("FOO")) {
+            tableFound = true;
+            // getObject on a list-of-struct materializes each column row as a Map.
+            @SuppressWarnings("unchecked")
+            final List<Map<String, ?>> columns = (List<Map<String, ?>>) tableColumns.getObject(i);
+            assertThat(columns)
+                .extracting("column_name")
+                .containsExactlyInAnyOrder(new Text("INTS"), new Text("STRS"));
+            assertThat(columns).extracting("ordinal_position").containsExactlyInAnyOrder(1, 2);
+          }
+        }
+      }
+    }
+    assertThat(tableFound).describedAs("Table FOO exists in metadata").isTrue();
+  }
+
+  @Test
+  void getObjectsCatalogs() throws Exception {
+    loadTable();
+    try (final AdbcStatement stmt =
+        connection.getObjects(
+            AdbcConnection.GetObjectsDepth.CATALOGS, null, null, null, null, null)) {
+      try (final ArrowReader reader = stmt.getArrowReader()) {
+        assertThat(reader.getVectorSchemaRoot().getSchema())
+            .isEqualTo(StandardSchemas.GET_OBJECTS_SCHEMA);
+        assertThat(reader.loadNextBatch()).isTrue();
+        assertThat(reader.getVectorSchemaRoot().getRowCount()).isGreaterThan(0);
+        final FieldVector dbSchemas = reader.getVectorSchemaRoot().getVector(1);
+        // We requested depth == CATALOGS, so the db_schemas field should be all null
+        assertThat(dbSchemas.getNullCount()).isEqualTo(dbSchemas.getValueCount());
+      }
+    }
+  }
+
+  @Test
+  void getObjectsDbSchemas() throws Exception {
+    loadTable();
+    try (final AdbcStatement stmt =
+        connection.getObjects(
+            AdbcConnection.GetObjectsDepth.DB_SCHEMAS, null, null, null, null, null)) {
+      try (final ArrowReader reader = stmt.getArrowReader()) {
+        assertThat(reader.getVectorSchemaRoot().getSchema())
+            .isEqualTo(StandardSchemas.GET_OBJECTS_SCHEMA);
+        assertThat(reader.loadNextBatch()).isTrue();
+        assertThat(reader.getVectorSchemaRoot().getRowCount()).isGreaterThan(0);
+      }
+    }
+  }
+
+  // At depth == TABLES the table names must still be populated; look for the ingested FOO.
+  @Test
+  void getObjectsTables() throws Exception {
+    loadTable();
+    try (final AdbcStatement stmt =
+        connection.getObjects(
+            AdbcConnection.GetObjectsDepth.TABLES, null, null, null, null, null)) {
+      try (final ArrowReader reader = stmt.getArrowReader()) {
+        assertThat(reader.getVectorSchemaRoot().getSchema())
+            .isEqualTo(StandardSchemas.GET_OBJECTS_SCHEMA);
+        assertThat(reader.loadNextBatch()).isTrue();
+
+        final ListVector dbSchemas = (ListVector) reader.getVectorSchemaRoot().getVector(1);
+        final ListVector dbSchemaTables =
+            (ListVector) ((StructVector) dbSchemas.getDataVector()).getVectorById(1);
+        final StructVector tables = (StructVector) dbSchemaTables.getDataVector();
+        final VarCharVector tableNames = (VarCharVector) tables.getVectorById(0);
+        assertThat(IntStream.range(0, tableNames.getValueCount()).mapToObj(tableNames::getObject))
+            .contains(new Text("FOO"));
+      }
+    }
+  }
+
+  // Ingest an empty table with a known schema, then round-trip the schema through the driver.
+  @Test
+  void getTableSchema() throws Exception {
+    final Schema schema =
+        new Schema(
+            Arrays.asList(
+                Field.nullable("INTS", new ArrowType.Int(32, /*signed=*/ true)),
+                Field.nullable("STRS", new ArrowType.Utf8())));
+    try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      try (final AdbcStatement stmt = connection.bulkIngest("FOO")) {
+        stmt.bind(root);
+        stmt.execute();
+      }
+    }
+    assertThat(connection.getTableSchema(/*catalog*/ null, /*dbSchema*/ null, "FOO"))
+        .isEqualTo(schema);
+  }
+
+  @Test
+  void getTableTypes() throws Exception {
+    try (final AdbcStatement stmt = connection.getTableTypes()) {
+      try (final ArrowReader reader = stmt.getArrowReader()) {
+        assertThat(reader.getVectorSchemaRoot().getSchema())
+            .isEqualTo(StandardSchemas.TABLE_TYPES_SCHEMA);
+        List<String> tableTypes = new ArrayList<>();
+        while (reader.loadNextBatch()) {
+          final VarCharVector types = (VarCharVector) reader.getVectorSchemaRoot().getVector(0);
+          for (int i = 0; i < types.getValueCount(); i++) {
+            assertThat(types.isNull(i)).isFalse();
+            tableTypes.add(types.getObject(i).toString());
+          }
+        }
+        // Case-insensitive: drivers disagree on "TABLE" vs "table".
+        assertThat(tableTypes).anyMatch("table"::equalsIgnoreCase);
+      }
+    }
+  }
+
+  // Create and populate the sample table FOO(INTS INT, STRS VARCHAR) with 4 rows
+  // (including nulls) via bulk ingestion.
+  void loadTable() throws Exception {
+    final Schema schema =
+        new Schema(
+            Arrays.asList(
+                Field.nullable("INTS", new ArrowType.Int(32, /*signed=*/ true)),
+                Field.nullable("STRS", new ArrowType.Utf8())));
+    try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      final IntVector ints = (IntVector) root.getVector(0);
+      final VarCharVector strs = (VarCharVector) root.getVector(1);
+
+      ints.allocateNew(4);
+      ints.setSafe(0, 0);
+      ints.setSafe(1, 1);
+      ints.setSafe(2, 2);
+      ints.setNull(3);
+      strs.allocateNew(4);
+      strs.setNull(0);
+      strs.setSafe(1, "foo".getBytes(StandardCharsets.UTF_8));
+      strs.setSafe(2, "".getBytes(StandardCharsets.UTF_8));
+      strs.setSafe(3, "asdf".getBytes(StandardCharsets.UTF_8));
+      root.setRowCount(4);
+
+      // TODO: XXX: need a "quirks" system to handle idiosyncrasies. For example: Derby forces
+      // table names to uppercase, but does not do case folding in all places.
+      try (final AdbcStatement stmt = connection.bulkIngest("FOO")) {
+        stmt.bind(root);
+        stmt.execute();
+      }
+    }
+  }
+}
diff --git a/java/pom.xml b/java/pom.xml
index 8dc6657..b74158a 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -27,8 +27,17 @@
<description>Apache Arrow is open source, in-memory columnar data structures and low-overhead messaging</description>
<url>https://arrow.apache.org/</url>
+ <!-- Get nightly Arrow for now to pick up bug fixes -->
+ <repositories>
+ <repository>
+ <id>arrow-apache-nightlies</id>
+ <url>https://nightlies.apache.org/arrow/java</url>
+ </repository>
+ </repositories>
+
<properties>
- <dep.arrow.version>8.0.0</dep.arrow.version>
+ <!-- Nightly build 2022/07/11 -->
+ <dep.arrow.version>9.0.0.dev363</dep.arrow.version>
<adbc.version>9.0.0-SNAPSHOT</adbc.version>
</properties>