Posted to commits@arrow.apache.org by li...@apache.org on 2022/07/12 15:53:50 UTC

[arrow-adbc] branch main updated: Implement metadata methods for JDBC (#36)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new b5691cb  Implement metadata methods for JDBC (#36)
b5691cb is described below

commit b5691cb3ca4091c93e11181f5a03d0e164a30322
Author: David Li <li...@gmail.com>
AuthorDate: Tue Jul 12 11:53:46 2022 -0400

    Implement metadata methods for JDBC (#36)
    
    Fixes #33.
---
 .../org/apache/arrow/adbc/core/AdbcConnection.java | 169 +++++++++-
 .../apache/arrow/adbc/core/StandardSchemas.java    | 133 ++++++++
 .../arrow/adbc/driver/jdbc/FixedJdbcStatement.java |  80 +++++
 .../arrow/adbc/driver/jdbc/JdbcArrowReader.java    |  27 +-
 .../arrow/adbc/driver/jdbc/JdbcConnection.java     |  84 +++++
 .../arrow/adbc/driver/jdbc/JdbcDriverUtil.java     |   8 +-
 .../adbc/driver/jdbc/JdbcMetadataBuilder.java      | 341 +++++++++++++++++++++
 .../arrow/adbc/driver/jdbc/JdbcStatement.java      |   3 +-
 .../arrow/adbc/driver/jdbc/RootArrowReader.java    |  68 ++++
 .../driver/jdbc/JdbcConnectionMetadataTest.java    |  36 +++
 .../testsuite/AbstractConnectionMetadataTest.java  | 229 ++++++++++++++
 java/pom.xml                                       |  11 +-
 12 files changed, 1170 insertions(+), 19 deletions(-)

diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
index b92a086..2a1e2b8 100644
--- a/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/AdbcConnection.java
@@ -17,6 +17,7 @@
 package org.apache.arrow.adbc.core;
 
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
 
 /** A connection to a {@link AdbcDatabase}. */
 public interface AdbcConnection extends AutoCloseable {
@@ -43,7 +44,173 @@ public interface AdbcConnection extends AutoCloseable {
    * does not match the table schema.
    */
   default AdbcStatement bulkIngest(String targetTableName) throws AdbcException {
-    throw new UnsupportedOperationException("Connection does not support bulk ingestion");
+    throw new UnsupportedOperationException("Connection does not support bulkIngest(String)");
+  }
+
+  /**
+   * Get a hierarchical view of all catalogs, database schemas, tables, and columns.
+   *
+   * <p>The result is an Arrow dataset with the following schema:
+   *
+   * <table border="1">
+   *   <tr><th>Field Name</th>              <th>Field Type</th>             </tr>
+   *   <tr><td>catalog_name</td>            <td>utf8</td>                   </tr>
+   *   <tr><td>catalog_db_schemas</td>      <td>list[DB_SCHEMA_SCHEMA]</td> </tr>
+   * </table>
+   *
+   * DB_SCHEMA_SCHEMA is a Struct with fields:
+   *
+   * <table border="1">
+   *   <tr><th>Field Name</th>              <th>Field Type</th>             </tr>
+   *   <tr><td>db_schema_name</td>          <td>utf8</td>                   </tr>
+   *   <tr><td>db_schema_tables</td>        <td>list[TABLE_SCHEMA]</td>     </tr>
+   * </table>
+   *
+   * TABLE_SCHEMA is a Struct with fields:
+   *
+   * <table border="1">
+   *   <tr><th>Field Name</th>              <th>Field Type</th>             </tr>
+   *   <tr><td>table_name</td>              <td>utf8 not null</td>          </tr>
+   *   <tr><td>table_type</td>              <td>utf8 not null</td>          </tr>
+   *   <tr><td>table_columns</td>           <td>list[COLUMN_SCHEMA]</td>    </tr>
+   *   <tr><td>table_constraints</td>       <td>list[CONSTRAINT_SCHEMA]</td></tr>
+   * </table>
+   *
+   * COLUMN_SCHEMA is a Struct with fields:
+   *
+   * <table border="1">
+   *   <tr><th>Field Name</th>              <th>Field Type</th>             <th>Comments</th></tr>
+   *   <tr><td>column_name</td>             <td>utf8 not null</td>          <td></td></tr>
+   *   <tr><td>ordinal_position</td>        <td>int32</td>                  <td>(1)</td></tr>
+   *   <tr><td>remarks</td>                 <td>utf8</td>                   <td>(2)</td></tr>
+   *   <tr><td>xdbc_data_type</td>          <td>int16</td>                  <td>(3)</td></tr>
+   *   <tr><td>xdbc_type_name</td>          <td>utf8</td>                   <td>(3)</td></tr>
+   *   <tr><td>xdbc_column_size</td>        <td>int32</td>                  <td>(3)</td></tr>
+   *   <tr><td>xdbc_decimal_digits</td>     <td>int16</td>                  <td>(3)</td></tr>
+   *   <tr><td>xdbc_num_prec_radix</td>     <td>int16</td>                  <td>(3)</td></tr>
+   *   <tr><td>xdbc_nullable</td>           <td>int16</td>                  <td>(3)</td></tr>
+   *   <tr><td>xdbc_column_def</td>         <td>utf8</td>                   <td>(3)</td></tr>
+   *   <tr><td>xdbc_sql_data_type</td>      <td>int16</td>                  <td>(3)</td></tr>
+   *   <tr><td>xdbc_datetime_sub</td>       <td>int16</td>                  <td>(3)</td></tr>
+   *   <tr><td>xdbc_char_octet_length</td>  <td>int32</td>                  <td>(3)</td></tr>
+   *   <tr><td>xdbc_is_nullable</td>        <td>utf8</td>                   <td>(3)</td></tr>
+   *   <tr><td>xdbc_scope_catalog</td>      <td>utf8</td>                   <td>(3)</td></tr>
+   *   <tr><td>xdbc_scope_schema</td>       <td>utf8</td>                   <td>(3)</td></tr>
+   *   <tr><td>xdbc_scope_table</td>        <td>utf8</td>                   <td>(3)</td></tr>
+   *   <tr><td>xdbc_is_autoincrement</td>   <td>bool</td>                   <td>(3)</td></tr>
+   *   <tr><td>xdbc_is_generatedcolumn</td> <td>bool</td>                   <td>(3)</td></tr>
+   * </table>
+   *
+   * Notes:
+   *
+   * <ol>
+   *   <li>The column's ordinal position in the table (starting from 1).
+   *   <li>Database-specific description of the column.
+   *   <li>Optional value. Should be null if not supported by the driver. xdbc_ values are meant to
+   *       provide JDBC/ODBC-compatible metadata in an agnostic manner.
+   * </ol>
+   *
+   * CONSTRAINT_SCHEMA is a Struct with fields:
+   *
+   * <table border="1">
+   *   <tr><th>Field Name</th>              <th>Field Type</th>             <th>Comments</th></tr>
+   *   <tr><td>constraint_name</td>         <td>utf8 not null</td>          <td></td></tr>
+   *   <tr><td>constraint_type</td>         <td>utf8 not null</td>          <td>(1)</td></tr>
+   *   <tr><td>constraint_column_names</td> <td>list[utf8] not null</td>    <td>(2)</td></tr>
+   *   <tr><td>constraint_column_usage</td> <td>list[USAGE_SCHEMA]</td>     <td>(3)</td></tr>
+   * </table>
+   *
+   * <ol>
+   *   <li>One of 'CHECK', 'FOREIGN KEY', 'PRIMARY KEY', or 'UNIQUE'.
+   *   <li>The columns on the current table that are constrained, in order.
+   *   <li>For FOREIGN KEY only, the referenced table and columns.
+   * </ol>
+   *
+   * USAGE_SCHEMA is a Struct with fields:
+   *
+   * <table border="1">
+   *   <tr><th>Field Name</th>              <th>Field Type</th>             </tr>
+   *   <tr><td>fk_catalog</td>              <td>utf8</td>                   </tr>
+   *   <tr><td>fk_db_schema</td>            <td>utf8</td>                   </tr>
+   *   <tr><td>fk_table</td>                <td>utf8 not null</td>          </tr>
+   *   <tr><td>fk_column_name</td>          <td>utf8 not null</td>          </tr>
+   * </table>
+   *
+   * @param depth The level of nesting to display. If ALL, display all levels (up through columns).
+   *     If CATALOGS, display only catalogs (i.e. catalog_db_schemas will be null); if DB_SCHEMAS,
+   *     display catalogs and database schemas, and so on.
+   * @param catalogPattern Only show tables in the given catalog. If null, do not filter by catalog.
+   *     If an empty string, only show tables without a catalog. May be a search pattern (see class
+   *     documentation).
+   * @param dbSchemaPattern Only show tables in the given database schema. If null, do not filter by
+   *     database schema. If an empty string, only show tables without a database schema. May be a
+   *     search pattern (see class documentation).
+   * @param tableNamePattern Only show tables with the given name. If null, do not filter by name.
+   *     May be a search pattern (see class documentation).
+   * @param tableTypes Only show tables matching one of the given table types. If null, show tables
+   *     of any type. Valid table types can be fetched from {@link #getTableTypes()}.
+   * @param columnNamePattern Only show columns with the given name. If null, do not filter by name.
+   *     May be a search pattern (see class documentation).
+   */
+  default AdbcStatement getObjects(
+      GetObjectsDepth depth,
+      String catalogPattern,
+      String dbSchemaPattern,
+      String tableNamePattern,
+      String[] tableTypes,
+      String columnNamePattern)
+      throws AdbcException {
+    throw new UnsupportedOperationException(
+        "Connection does not support getObjects(GetObjectsDepth, String, String, String, String[], String)");
+  }
+
+  /**
+   * The level of nesting to retrieve for {@link #getObjects(GetObjectsDepth, String, String,
+   * String, String[], String)}.
+   */
+  enum GetObjectsDepth {
+    /** Display ALL objects (catalogs, database schemas, tables, and columns). */
+    ALL,
+    /** Display only catalogs. */
+    CATALOGS,
+    /** Display catalogs and database schemas. */
+    DB_SCHEMAS,
+    /** Display catalogs, database schemas, and tables. */
+    TABLES,
+  }
+
+  /**
+   * Get the Arrow schema of a database table.
+   *
+   * @param catalog The catalog of the table (or null).
+   * @param dbSchema The database schema of the table (or null).
+   * @param tableName The table name.
+   * @return The table schema.
+   */
+  default Schema getTableSchema(String catalog, String dbSchema, String tableName)
+      throws AdbcException {
+    throw new UnsupportedOperationException(
+        "Connection does not support getTableSchema(String, String, String)");
+  }
+
+  /**
+   * Get a list of table types supported by the database.
+   *
+   * <p>The result is an Arrow dataset with the following schema:
+   *
+   * <table border="1">
+   *   <tr>
+   *     <th>Field Name</th>
+   *     <th>Field Type</th>
+   *   </tr>
+   *   <tr>
+   *     <td>table_type</td>
+   *     <td>utf8 not null</td>
+   *   </tr>
+   * </table>
+   */
+  default AdbcStatement getTableTypes() throws AdbcException {
+    throw new UnsupportedOperationException("Connection does not support getTableTypes()");
   }
 
   /**
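
A usage sketch for the new metadata methods (illustration only, not part of this
commit; assumes an open AdbcConnection named connection and a surrounding method
that declares throws Exception):

    // Fetch the Arrow schema of a table directly.
    Schema schema = connection.getTableSchema(/*catalog*/ null, /*dbSchema*/ null, "FOO");

    // List the supported table types; each row of the result is one type.
    try (AdbcStatement stmt = connection.getTableTypes();
        ArrowReader reader = stmt.getArrowReader()) {
      while (reader.loadNextBatch()) {
        VectorSchemaRoot root = reader.getVectorSchemaRoot();
        System.out.println(root.contentToTSVString());
      }
    }
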
diff --git a/java/core/src/main/java/org/apache/arrow/adbc/core/StandardSchemas.java b/java/core/src/main/java/org/apache/arrow/adbc/core/StandardSchemas.java
new file mode 100644
index 0000000..af35cbb
--- /dev/null
+++ b/java/core/src/main/java/org/apache/arrow/adbc/core/StandardSchemas.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.core;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.arrow.vector.complex.BaseRepeatedValueVector;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+public final class StandardSchemas {
+  private StandardSchemas() {
+    throw new AssertionError("Do not instantiate this class");
+  }
+
+  private static final ArrowType INT16 = new ArrowType.Int(16, true);
+  private static final ArrowType INT32 = new ArrowType.Int(32, true);
+
+  /** The schema of the result set of {@link AdbcConnection#getTableTypes()}. */
+  public static final Schema TABLE_TYPES_SCHEMA =
+      new Schema(
+          Collections.singletonList(Field.notNullable("table_type", ArrowType.Utf8.INSTANCE)));
+
+  public static final List<Field> USAGE_SCHEMA =
+      Arrays.asList(
+          Field.nullable("fk_catalog", ArrowType.Utf8.INSTANCE),
+          Field.nullable("fk_db_schema", ArrowType.Utf8.INSTANCE),
+          Field.notNullable("fk_table", ArrowType.Utf8.INSTANCE),
+          Field.notNullable("fk_column_name", ArrowType.Utf8.INSTANCE));
+
+  public static final List<Field> CONSTRAINT_SCHEMA =
+      Arrays.asList(
+          Field.notNullable("constraint_name", ArrowType.Utf8.INSTANCE),
+          Field.notNullable("constraint_type", ArrowType.Utf8.INSTANCE),
+          new Field(
+              "constraint_column_names",
+              FieldType.notNullable(ArrowType.List.INSTANCE),
+              Collections.singletonList(
+                  Field.nullable(BaseRepeatedValueVector.DATA_VECTOR_NAME, new ArrowType.Utf8()))),
+          new Field(
+              "constraint_column_usage",
+              FieldType.notNullable(ArrowType.List.INSTANCE),
+              Collections.singletonList(
+                  new Field(
+                      BaseRepeatedValueVector.DATA_VECTOR_NAME,
+                      FieldType.nullable(ArrowType.Struct.INSTANCE),
+                      USAGE_SCHEMA))));
+
+  public static final List<Field> COLUMN_SCHEMA =
+      Arrays.asList(
+          new Field("column_name", FieldType.notNullable(ArrowType.Utf8.INSTANCE), null),
+          new Field("ordinal_position", FieldType.nullable(INT32), null),
+          new Field("remarks", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+          new Field("xdbc_data_type", FieldType.nullable(INT16), null),
+          new Field("xdbc_type_name", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+          new Field("xdbc_column_size", FieldType.nullable(INT32), null),
+          new Field("xdbc_decimal_digits", FieldType.nullable(INT16), null),
+          new Field("xdbc_num_prec_radix", FieldType.nullable(INT16), null),
+          new Field("xdbc_nullable", FieldType.nullable(INT16), null),
+          new Field("xdbc_column_def", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+          new Field("xdbc_sql_data_type", FieldType.nullable(INT16), null),
+          new Field("xdbc_datetime_sub", FieldType.nullable(INT16), null),
+          new Field("xdbc_char_octet_length", FieldType.nullable(INT32), null),
+          new Field("xdbc_is_nullable", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+          new Field("xdbc_scope_catalog", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+          new Field("xdbc_scope_schema", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+          new Field("xdbc_scope_table", FieldType.nullable(ArrowType.Utf8.INSTANCE), null),
+          new Field("xdbc_is_autoincrement", FieldType.nullable(ArrowType.Bool.INSTANCE), null),
+          new Field("xdbc_is_generatedcolumn", FieldType.nullable(ArrowType.Bool.INSTANCE), null));
+
+  public static final List<Field> TABLE_SCHEMA =
+      Arrays.asList(
+          new Field("table_name", FieldType.notNullable(ArrowType.Utf8.INSTANCE), null),
+          new Field("table_type", FieldType.notNullable(ArrowType.Utf8.INSTANCE), null),
+          new Field(
+              "table_columns",
+              FieldType.notNullable(ArrowType.List.INSTANCE),
+              Collections.singletonList(
+                  new Field(
+                      BaseRepeatedValueVector.DATA_VECTOR_NAME,
+                      FieldType.nullable(ArrowType.Struct.INSTANCE),
+                      COLUMN_SCHEMA))),
+          new Field(
+              "table_constraints",
+              FieldType.notNullable(ArrowType.List.INSTANCE),
+              Collections.singletonList(
+                  new Field(
+                      BaseRepeatedValueVector.DATA_VECTOR_NAME,
+                      FieldType.nullable(ArrowType.Struct.INSTANCE),
+                      CONSTRAINT_SCHEMA))));
+
+  public static final List<Field> DB_SCHEMA_SCHEMA =
+      Arrays.asList(
+          new Field("db_schema_name", FieldType.notNullable(ArrowType.Utf8.INSTANCE), null),
+          new Field(
+              "db_schema_tables",
+              FieldType.notNullable(ArrowType.List.INSTANCE),
+              Collections.singletonList(
+                  new Field(
+                      BaseRepeatedValueVector.DATA_VECTOR_NAME,
+                      FieldType.nullable(ArrowType.Struct.INSTANCE),
+                      TABLE_SCHEMA))));
+
+  public static final Schema GET_OBJECTS_SCHEMA =
+      new Schema(
+          Arrays.asList(
+              new Field("catalog_name", FieldType.notNullable(ArrowType.Utf8.INSTANCE), null),
+              new Field(
+                  "catalog_db_schemas",
+                  FieldType.notNullable(ArrowType.List.INSTANCE),
+                  Collections.singletonList(
+                      new Field(
+                          BaseRepeatedValueVector.DATA_VECTOR_NAME,
+                          FieldType.nullable(ArrowType.Struct.INSTANCE),
+                          DB_SCHEMA_SCHEMA)))));
+}
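
Illustration (not part of this commit): the nested GET_OBJECTS_SCHEMA can be
allocated and navigated with the stock complex vectors, which is how the JDBC
driver below builds and consumes it. A minimal sketch, assuming a
BufferAllocator named allocator is in scope:

    try (VectorSchemaRoot root =
        VectorSchemaRoot.create(StandardSchemas.GET_OBJECTS_SCHEMA, allocator)) {
      VarCharVector catalogNames = (VarCharVector) root.getVector(0);
      ListVector catalogDbSchemas = (ListVector) root.getVector(1);
      // The list's element vector is the DB_SCHEMA_SCHEMA struct.
      StructVector dbSchemas = (StructVector) catalogDbSchemas.getDataVector();
      VarCharVector dbSchemaNames = (VarCharVector) dbSchemas.getVectorById(0);
    }
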
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/FixedJdbcStatement.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/FixedJdbcStatement.java
new file mode 100644
index 0000000..feae265
--- /dev/null
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/FixedJdbcStatement.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adbc.driver.jdbc;
+
+import java.sql.ResultSet;
+import java.util.Collections;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/** An AdbcStatement implementation that returns a fixed ResultSet. */
+class FixedJdbcStatement implements AdbcStatement {
+  private final BufferAllocator allocator;
+  private final Schema overrideSchema;
+  private ResultSet resultSet;
+  private ArrowRecordBatch recordBatch;
+
+  public FixedJdbcStatement(BufferAllocator allocator, ResultSet resultSet, Schema overrideSchema) {
+    this.allocator = allocator;
+    this.resultSet = resultSet;
+    this.overrideSchema = overrideSchema;
+  }
+
+  public FixedJdbcStatement(BufferAllocator allocator, VectorSchemaRoot root) {
+    this.allocator = allocator;
+    this.overrideSchema = root.getSchema();
+    // Unload the root to preserve the data
+    recordBatch = new VectorUnloader(root).getRecordBatch();
+  }
+
+  @Override
+  public void execute() throws AdbcException {
+    throw new UnsupportedOperationException("Cannot execute() this statement.");
+  }
+
+  @Override
+  public ArrowReader getArrowReader() throws AdbcException {
+    final ArrowReader reader;
+    if (resultSet != null) {
+      reader = new JdbcArrowReader(allocator, resultSet, overrideSchema);
+      resultSet = null;
+    } else {
+      reader =
+          new RootArrowReader(allocator, overrideSchema, Collections.singletonList(recordBatch));
+      recordBatch = null;
+    }
+    return reader;
+  }
+
+  @Override
+  public void prepare() throws AdbcException {
+    throw new UnsupportedOperationException("Cannot prepare() this statement.");
+  }
+
+  @Override
+  public void close() throws Exception {
+    AutoCloseables.close(resultSet, recordBatch);
+  }
+}
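
Usage sketch (illustration only; FixedJdbcStatement is package-private, so this
only compiles inside the driver package, and assumes a BufferAllocator named
allocator). Note the statement is single-use: getArrowReader() transfers
ownership of the underlying data to the returned reader.

    try (VectorSchemaRoot root =
            VectorSchemaRoot.create(StandardSchemas.TABLE_TYPES_SCHEMA, allocator);
        AdbcStatement stmt = new FixedJdbcStatement(allocator, root);
        ArrowReader reader = stmt.getArrowReader()) {
      while (reader.loadNextBatch()) {
        // Consume the fixed result; a second getArrowReader() would find no data.
      }
    }
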
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcArrowReader.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcArrowReader.java
index f4b8e73..4445729 100644
--- a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcArrowReader.java
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcArrowReader.java
@@ -33,30 +33,37 @@ import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.Schema;
 
+/** An ArrowReader that wraps a JDBC ResultSet. */
 public class JdbcArrowReader extends ArrowReader {
   private final ArrowVectorIterator delegate;
   private final Schema schema;
   private long bytesRead;
 
-  JdbcArrowReader(BufferAllocator allocator, ResultSet resultSet) throws AdbcException {
+  JdbcArrowReader(BufferAllocator allocator, ResultSet resultSet, Schema overrideSchema)
+      throws AdbcException {
     super(allocator);
-    try {
-      this.delegate = JdbcToArrow.sqlToArrowVectorIterator(resultSet, allocator);
-    } catch (SQLException e) {
-      throw JdbcDriverUtil.fromSqlException(e);
-    } catch (IOException e) {
-      throw new AdbcException(
-          JdbcDriverUtil.prefixExceptionMessage(e.getMessage()), e, AdbcStatusCode.IO, null, -1);
-    }
     final JdbcToArrowConfig config =
         new JdbcToArrowConfigBuilder()
             .setAllocator(allocator)
             .setCalendar(JdbcToArrowUtils.getUtcCalendar())
+            .setTargetBatchSize(1024)
             .build();
     try {
-      this.schema = JdbcToArrowUtils.jdbcToArrowSchema(resultSet.getMetaData(), config);
+      this.delegate = JdbcToArrow.sqlToArrowVectorIterator(resultSet, config);
     } catch (SQLException e) {
       throw JdbcDriverUtil.fromSqlException(e);
+    } catch (IOException e) {
+      throw new AdbcException(
+          JdbcDriverUtil.prefixExceptionMessage(e.getMessage()), e, AdbcStatusCode.IO, null, -1);
+    }
+    if (overrideSchema != null) {
+      this.schema = overrideSchema;
+    } else {
+      try {
+        this.schema = JdbcToArrowUtils.jdbcToArrowSchema(resultSet.getMetaData(), config);
+      } catch (SQLException e) {
+        throw JdbcDriverUtil.fromSqlException("Failed to convert JDBC schema to Arrow schema:", e);
+      }
     }
     this.bytesRead = 0;
 
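
Since the reader now targets 1024-row batches, a result may span several
batches; consumers should loop (sketch, not part of this commit; statement is
any AdbcStatement that has been executed):

    try (ArrowReader reader = statement.getArrowReader()) {
      while (reader.loadNextBatch()) {
        VectorSchemaRoot batch = reader.getVectorSchemaRoot();
        // Process this batch; its contents are replaced by the next loadNextBatch().
      }
    }
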
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
index 71554f9..9559c30 100644
--- a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnection.java
@@ -17,11 +17,23 @@
 package org.apache.arrow.adbc.driver.jdbc;
 
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
+import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
 import org.apache.arrow.adbc.core.AdbcConnection;
 import org.apache.arrow.adbc.core.AdbcException;
 import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.core.StandardSchemas;
 import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
 
 public class JdbcConnection implements AdbcConnection {
   private final BufferAllocator allocator;
@@ -52,6 +64,78 @@ public class JdbcConnection implements AdbcConnection {
     return JdbcStatement.ingestRoot(allocator, connection, targetTableName);
   }
 
+  @Override
+  public AdbcStatement getObjects(
+      final GetObjectsDepth depth,
+      final String catalogPattern,
+      final String dbSchemaPattern,
+      final String tableNamePattern,
+      final String[] tableTypes,
+      final String columnNamePattern)
+      throws AdbcException {
+    // Build up the metadata in-memory and then return a constant reader.
+    try {
+      final VectorSchemaRoot root =
+          new JdbcMetadataBuilder(
+                  allocator,
+                  connection,
+                  depth,
+                  catalogPattern,
+                  dbSchemaPattern,
+                  tableNamePattern,
+                  tableTypes,
+                  columnNamePattern)
+              .build();
+      return new FixedJdbcStatement(allocator, root);
+    } catch (SQLException e) {
+      throw JdbcDriverUtil.fromSqlException(e);
+    }
+  }
+
+  @Override
+  public Schema getTableSchema(String catalog, String dbSchema, String tableName)
+      throws AdbcException {
+    // Reconstruct the schema from the metadata
+    // XXX: this may be inconsistent with reading the table
+    final List<Field> fields = new ArrayList<>();
+    try (final ResultSet rs =
+        connection
+            .getMetaData()
+            .getColumns(catalog, dbSchema, tableName, /*columnNamePattern*/ null)) {
+      while (rs.next()) {
+        final String fieldName = rs.getString("COLUMN_NAME");
+        final boolean nullable = rs.getInt("NULLABLE") != DatabaseMetaData.columnNoNulls;
+        final int jdbcType = rs.getInt("DATA_TYPE");
+        final int precision = rs.getInt("COLUMN_SIZE");
+        final int scale = rs.getInt("DECIMAL_DIGITS");
+        final ArrowType arrowType =
+            JdbcToArrowUtils.getArrowTypeFromJdbcType(
+                new JdbcFieldInfo(jdbcType, precision, scale), /*calendar*/ null);
+        final Field field =
+            new Field(
+                fieldName,
+                new FieldType(
+                    nullable, arrowType, /*dictionary*/ null, /*metadata*/ null), /*children*/
+                null);
+        fields.add(field);
+      }
+    } catch (SQLException e) {
+      throw JdbcDriverUtil.fromSqlException(e);
+    }
+    return new Schema(fields);
+  }
+
+  @Override
+  public AdbcStatement getTableTypes() throws AdbcException {
+    // XXX: this is nonconforming since we don't currently have a way to rename the columns
+    try {
+      return new FixedJdbcStatement(
+          allocator, connection.getMetaData().getTableTypes(), StandardSchemas.TABLE_TYPES_SCHEMA);
+    } catch (SQLException e) {
+      throw JdbcDriverUtil.fromSqlException(e);
+    }
+  }
+
   @Override
   public void rollback() throws AdbcException {
     try {
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverUtil.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverUtil.java
index 1a9a6a9..4926877 100644
--- a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverUtil.java
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcDriverUtil.java
@@ -44,11 +44,7 @@ final class JdbcDriverUtil {
 
   static AdbcException fromSqlException(
       AdbcStatusCode status, String format, SQLException e, Object... values) {
-    return new AdbcException(
-        String.format(format, values) + prefixExceptionMessage(e.getMessage()),
-        e.getCause(),
-        status,
-        e.getSQLState(),
-        e.getErrorCode());
+    final String message = "[JDBC] " + String.format(format, values) + e.getMessage();
+    return new AdbcException(message, e.getCause(), status, e.getSQLState(), e.getErrorCode());
   }
 }
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcMetadataBuilder.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcMetadataBuilder.java
new file mode 100644
index 0000000..1d2984b
--- /dev/null
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcMetadataBuilder.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.driver.jdbc;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.arrow.adbc.core.AdbcConnection;
+import org.apache.arrow.adbc.core.StandardSchemas;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+
+/** Helper class to track state needed to build up the metadata structure. */
+final class JdbcMetadataBuilder implements AutoCloseable {
+  private final AdbcConnection.GetObjectsDepth depth;
+  private final String catalogPattern;
+  private final String dbSchemaPattern;
+  private final String tableNamePattern;
+  private final String[] tableTypesFilter;
+  private final String columnNamePattern;
+  private final DatabaseMetaData dbmd;
+  private VectorSchemaRoot root;
+
+  final VarCharVector catalogNames;
+  final ListVector catalogDbSchemas;
+  final StructVector dbSchemas;
+  final VarCharVector dbSchemaNames;
+  final ListVector dbSchemaTables;
+  final StructVector tables;
+  final VarCharVector tableNames;
+  final VarCharVector tableTypes;
+  final ListVector tableColumns;
+  final StructVector columns;
+  final VarCharVector columnNames;
+  final IntVector columnOrdinalPositions;
+  final VarCharVector columnRemarks;
+  final SmallIntVector columnXdbcDataTypes;
+  final ListVector tableConstraints;
+  final StructVector constraints;
+  final VarCharVector constraintNames;
+  final VarCharVector constraintTypes;
+  final ListVector constraintColumnNames;
+  final VarCharVector constraintColumnNameItems;
+  final ListVector constraintColumnUsage;
+  final StructVector columnUsages;
+  final VarCharVector columnUsageFkCatalogs;
+  final VarCharVector columnUsageFkDbSchemas;
+  final VarCharVector columnUsageFkTables;
+  final VarCharVector columnUsageFkColumns;
+
+  JdbcMetadataBuilder(
+      BufferAllocator allocator,
+      Connection connection,
+      final AdbcConnection.GetObjectsDepth depth,
+      final String catalogPattern,
+      final String dbSchemaPattern,
+      final String tableNamePattern,
+      final String[] tableTypesFilter,
+      final String columnNamePattern)
+      throws SQLException {
+    this.depth = depth;
+    this.catalogPattern = catalogPattern;
+    this.dbSchemaPattern = dbSchemaPattern;
+    this.tableNamePattern = tableNamePattern;
+    this.tableTypesFilter = tableTypesFilter;
+    this.columnNamePattern = columnNamePattern;
+    this.root = VectorSchemaRoot.create(StandardSchemas.GET_OBJECTS_SCHEMA, allocator);
+    this.dbmd = connection.getMetaData();
+    this.catalogNames = (VarCharVector) root.getVector(0);
+    this.catalogDbSchemas = (ListVector) root.getVector(1);
+    this.dbSchemas = (StructVector) catalogDbSchemas.getDataVector();
+    this.dbSchemaNames = (VarCharVector) dbSchemas.getVectorById(0);
+    this.dbSchemaTables = (ListVector) dbSchemas.getVectorById(1);
+    this.tables = (StructVector) dbSchemaTables.getDataVector();
+    this.tableNames = (VarCharVector) tables.getVectorById(0);
+    this.tableTypes = (VarCharVector) tables.getVectorById(1);
+    this.tableColumns = (ListVector) tables.getVectorById(2);
+    this.columns = (StructVector) tableColumns.getDataVector();
+    this.columnNames = (VarCharVector) columns.getVectorById(0);
+    this.columnOrdinalPositions = (IntVector) columns.getVectorById(1);
+    this.columnRemarks = (VarCharVector) columns.getVectorById(2);
+    this.columnXdbcDataTypes = (SmallIntVector) columns.getVectorById(3);
+    this.tableConstraints = (ListVector) tables.getVectorById(3);
+    this.constraints = (StructVector) tableConstraints.getDataVector();
+    this.constraintNames = (VarCharVector) constraints.getVectorById(0);
+    this.constraintTypes = (VarCharVector) constraints.getVectorById(1);
+    this.constraintColumnNames = (ListVector) constraints.getVectorById(2);
+    this.constraintColumnNameItems = (VarCharVector) constraintColumnNames.getDataVector();
+    this.constraintColumnUsage = (ListVector) constraints.getVectorById(3);
+    this.columnUsages = (StructVector) constraintColumnUsage.getDataVector();
+    this.columnUsageFkCatalogs = (VarCharVector) columnUsages.getVectorById(0);
+    this.columnUsageFkDbSchemas = (VarCharVector) columnUsages.getVectorById(1);
+    this.columnUsageFkTables = (VarCharVector) columnUsages.getVectorById(2);
+    this.columnUsageFkColumns = (VarCharVector) columnUsages.getVectorById(3);
+  }
+
+  VectorSchemaRoot build() throws SQLException {
+    // TODO: need to turn catalogPattern into a catalog filter since JDBC doesn't support this
+    try (final ResultSet rs = dbmd.getCatalogs()) {
+      int catalogCount = 0;
+      while (rs.next()) {
+        final String catalogName = rs.getString(1);
+        addCatalogRow(catalogCount, catalogName);
+        catalogCount++;
+      }
+      // TODO: only include this if matches filter
+      addCatalogRow(catalogCount, /*catalogName*/ "");
+      catalogCount++;
+      root.setRowCount(catalogCount);
+    }
+    VectorSchemaRoot result = root;
+    root = null;
+    return result;
+  }
+
+  private void addCatalogRow(int rowIndex, String catalogName) throws SQLException {
+    catalogNames.setSafe(rowIndex, catalogName.getBytes(StandardCharsets.UTF_8));
+    if (depth == AdbcConnection.GetObjectsDepth.CATALOGS) {
+      catalogDbSchemas.setNull(rowIndex);
+    } else {
+      int dbSchemasBaseIndex = catalogDbSchemas.startNewValue(rowIndex);
+      final int dbSchemaCount = buildDbSchemas(dbSchemasBaseIndex, catalogName);
+      catalogDbSchemas.endValue(rowIndex, dbSchemaCount);
+    }
+  }
+
+  private int buildDbSchemas(int rowIndex, String catalogName) throws SQLException {
+    int dbSchemaCount = 0;
+    // TODO: get tables with no schema
+    try (final ResultSet rs = dbmd.getSchemas(catalogName, dbSchemaPattern)) {
+      while (rs.next()) {
+        final String dbSchemaName = rs.getString(1);
+        addDbSchemaRow(rowIndex + dbSchemaCount, catalogName, dbSchemaName);
+        dbSchemaCount++;
+      }
+    }
+    return dbSchemaCount;
+  }
+
+  private void addDbSchemaRow(int rowIndex, String catalogName, String dbSchemaName)
+      throws SQLException {
+    dbSchemas.setIndexDefined(rowIndex);
+    dbSchemaNames.setSafe(rowIndex, dbSchemaName.getBytes(StandardCharsets.UTF_8));
+    if (depth == AdbcConnection.GetObjectsDepth.DB_SCHEMAS) {
+      dbSchemaTables.setNull(rowIndex);
+    } else {
+      int tableBaseIndex = dbSchemaTables.startNewValue(rowIndex);
+      final int tableCount = buildTables(tableBaseIndex, catalogName, dbSchemaName);
+      dbSchemaTables.endValue(rowIndex, tableCount);
+    }
+  }
+
+  private int buildTables(int rowIndex, String catalogName, String dbSchemaName)
+      throws SQLException {
+    int tableCount = 0;
+    try (final ResultSet rs =
+        dbmd.getTables(catalogName, dbSchemaName, tableNamePattern, tableTypesFilter)) {
+      while (rs.next()) {
+        final String tableName = rs.getString(3);
+        final String tableType = rs.getString(4);
+        tables.setIndexDefined(rowIndex + tableCount);
+        tableNames.setSafe(rowIndex + tableCount, tableName.getBytes(StandardCharsets.UTF_8));
+        tableTypes.setSafe(rowIndex + tableCount, tableType.getBytes(StandardCharsets.UTF_8));
+        final int constraintOffset = tableConstraints.startNewValue(rowIndex + tableCount);
+        int constraintCount = 0;
+        // JDBC doesn't directly expose constraints. Merge various info methods:
+        // 1. Primary keys
+        try (final ResultSet pk = dbmd.getPrimaryKeys(catalogName, dbSchemaName, tableName)) {
+          String constraintName = null;
+          List<String> constraintColumns = new ArrayList<>();
+          while (pk.next()) {
+            constraintName = pk.getString(6);
+            String columnName = pk.getString(4);
+            int columnIndex = pk.getInt(5);
+            while (constraintColumns.size() < columnIndex) constraintColumns.add(null);
+            constraintColumns.set(columnIndex - 1, columnName);
+          }
+          if (!constraintColumns.isEmpty()) {
+            addConstraint(
+                constraintOffset + constraintCount,
+                constraintName,
+                "PRIMARY KEY",
+                constraintColumns,
+                Collections.emptyList());
+            constraintCount++;
+          }
+        }
+        // 2. Foreign keys ("imported" keys)
+        try (final ResultSet fk = dbmd.getImportedKeys(catalogName, dbSchemaName, tableName)) {
+          List<String> names = new ArrayList<>();
+          List<List<String>> columns = new ArrayList<>();
+          List<List<ReferencedColumn>> references = new ArrayList<>();
+          while (fk.next()) {
+            String keyName = fk.getString(12);
+            String keyColumn = fk.getString(8);
+            int keySeq = fk.getInt(9);
+            if (keySeq == 1) {
+              names.add(keyName);
+              columns.add(new ArrayList<>());
+              references.add(new ArrayList<>());
+            }
+            columns.get(columns.size() - 1).add(keyColumn);
+            final ReferencedColumn reference = new ReferencedColumn();
+            reference.catalog = fk.getString(1);
+            reference.dbSchema = fk.getString(2);
+            reference.table = fk.getString(3);
+            reference.column = fk.getString(4);
+            references.get(references.size() - 1).add(reference);
+          }
+
+          for (int i = 0; i < names.size(); i++) {
+            addConstraint(
+                constraintOffset + constraintCount,
+                names.get(i),
+                "FOREIGN KEY",
+                columns.get(i),
+                references.get(i));
+            constraintCount++;
+          }
+        }
+
+        // TODO: UNIQUE constraints are exposed under indices
+        // TODO: how to get CHECK constraints?
+
+        tableConstraints.endValue(rowIndex + tableCount, constraintCount);
+        if (depth == AdbcConnection.GetObjectsDepth.TABLES) {
+          tableColumns.setNull(rowIndex + tableCount);
+        } else {
+          int columnBaseIndex = tableColumns.startNewValue(rowIndex + tableCount);
+          final int columnCount =
+              buildColumns(columnBaseIndex, catalogName, dbSchemaName, tableName);
+          tableColumns.endValue(rowIndex + tableCount, columnCount);
+        }
+        tableCount++;
+      }
+    }
+    return tableCount;
+  }
+
+  private int buildColumns(int rowIndex, String catalogName, String dbSchemaName, String tableName)
+      throws SQLException {
+    int columnCount = 0;
+    try (final ResultSet rs =
+        dbmd.getColumns(catalogName, dbSchemaName, tableName, columnNamePattern)) {
+      while (rs.next()) {
+        final String columnName = rs.getString(4);
+        final int ordinalPosition = rs.getInt(17);
+        final String remarks = rs.getString(12);
+        final int xdbcDataType = rs.getInt(5);
+        // TODO: other JDBC metadata
+
+        columns.setIndexDefined(rowIndex + columnCount);
+        columnNames.setSafe(rowIndex + columnCount, columnName.getBytes(StandardCharsets.UTF_8));
+        columnOrdinalPositions.setSafe(rowIndex + columnCount, ordinalPosition);
+        if (remarks != null) {
+          columnRemarks.setSafe(rowIndex + columnCount, remarks.getBytes(StandardCharsets.UTF_8));
+        }
+        columnXdbcDataTypes.setSafe(rowIndex + columnCount, xdbcDataType);
+
+        columnCount++;
+      }
+    }
+    return columnCount;
+  }
+
+  private void addConstraint(
+      int index,
+      String constraintName,
+      String constraintType,
+      List<String> constraintColumns,
+      List<ReferencedColumn> referencedColumns) {
+    if (constraintName == null) {
+      constraintNames.setNull(index);
+    } else {
+      constraintNames.setSafe(index, constraintName.getBytes(StandardCharsets.UTF_8));
+    }
+    constraintTypes.setSafe(index, constraintType.getBytes(StandardCharsets.UTF_8));
+
+    int namesOffset = constraintColumnNames.startNewValue(index);
+    for (final String column : constraintColumns) {
+      constraintColumnNameItems.setSafe(namesOffset++, column.getBytes(StandardCharsets.UTF_8));
+    }
+    constraintColumnNames.endValue(index, constraintColumns.size());
+    int usageOffset = constraintColumnUsage.startNewValue(index);
+    for (final ReferencedColumn column : referencedColumns) {
+      columnUsages.setIndexDefined(usageOffset);
+      if (column.catalog == null) {
+        columnUsageFkCatalogs.setNull(usageOffset);
+      } else {
+        columnUsageFkCatalogs.setSafe(usageOffset, column.catalog.getBytes(StandardCharsets.UTF_8));
+      }
+      if (column.dbSchema == null) {
+        columnUsageFkDbSchemas.setNull(usageOffset);
+      } else {
+        columnUsageFkDbSchemas.setSafe(
+            usageOffset, column.dbSchema.getBytes(StandardCharsets.UTF_8));
+      }
+      columnUsageFkTables.setSafe(usageOffset, column.table.getBytes(StandardCharsets.UTF_8));
+      columnUsageFkColumns.setSafe(usageOffset, column.column.getBytes(StandardCharsets.UTF_8));
+      usageOffset++;
+    }
+    constraintColumnUsage.endValue(index, referencedColumns.size());
+  }
+
+  @Override
+  public void close() throws Exception {
+    AutoCloseables.close(root);
+  }
+
+  static class ReferencedColumn {
+    String catalog;
+    String dbSchema;
+    String table;
+    String column;
+  }
+}
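
The builder relies throughout on the ListVector offset protocol:
startNewValue(row) returns the child offset where that row's list begins,
children are written at offset + i, and endValue(row, n) records the length. A
self-contained sketch of the same pattern (illustration only, not part of this
commit):

    import java.nio.charset.StandardCharsets;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VarCharVector;
    import org.apache.arrow.vector.complex.ListVector;
    import org.apache.arrow.vector.types.pojo.ArrowType;
    import org.apache.arrow.vector.types.pojo.FieldType;

    class ListOffsetSketch {
      public static void main(String[] args) {
        try (BufferAllocator allocator = new RootAllocator();
            ListVector list = ListVector.empty("names", allocator)) {
          list.addOrGetVector(FieldType.nullable(ArrowType.Utf8.INSTANCE));
          VarCharVector items = (VarCharVector) list.getDataVector();
          int offset = list.startNewValue(0); // row 0 begins at this child offset
          items.setSafe(offset, "a".getBytes(StandardCharsets.UTF_8));
          items.setSafe(offset + 1, "b".getBytes(StandardCharsets.UTF_8));
          list.endValue(0, 2); // row 0 has two elements
          list.setNull(1); // row 1 is a null list
          list.setValueCount(2);
          System.out.println(list.getObject(0)); // prints [a, b]
        }
      }
    }
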
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
index b95f9b8..bf5c0db 100644
--- a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/JdbcStatement.java
@@ -212,7 +212,8 @@ public class JdbcStatement implements AdbcStatement {
     if (resultSet == null) {
       throw new IllegalStateException("Must call execute() before getArrowIterator()");
     }
-    final JdbcArrowReader reader = new JdbcArrowReader(allocator, resultSet);
+    final JdbcArrowReader reader =
+        new JdbcArrowReader(allocator, resultSet, /*overrideSchema*/ null);
     resultSet = null;
     return reader;
   }
diff --git a/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/RootArrowReader.java b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/RootArrowReader.java
new file mode 100644
index 0000000..aadba47
--- /dev/null
+++ b/java/driver/jdbc/src/main/java/org/apache/arrow/adbc/driver/jdbc/RootArrowReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.driver.jdbc;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/** An ArrowReader that wraps a list of ArrowRecordBatches. */
+class RootArrowReader extends ArrowReader {
+  private final Schema schema;
+  private final List<ArrowRecordBatch> batches;
+  int nextIndex;
+
+  public RootArrowReader(BufferAllocator allocator, Schema schema, List<ArrowRecordBatch> batches) {
+    super(allocator);
+    this.schema = schema;
+    this.batches = batches;
+    this.nextIndex = 0;
+  }
+
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    if (nextIndex < batches.size()) {
+      new VectorLoader(getVectorSchemaRoot()).load(batches.get(nextIndex++));
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public long bytesRead() {
+    return 0;
+  }
+
+  @Override
+  protected void closeReadSource() throws IOException {
+    try {
+      AutoCloseables.close(batches);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  protected Schema readSchema() {
+    return schema;
+  }
+}
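
Round-trip sketch for RootArrowReader (illustration only, not part of this
commit): unload a root into a record batch with VectorUnloader, then replay it
through the reader. Assumes a BufferAllocator named allocator and a surrounding
method that declares throws Exception:

    Schema schema =
        new Schema(Collections.singletonList(Field.nullable("x", ArrowType.Utf8.INSTANCE)));
    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
      ArrowRecordBatch batch = new VectorUnloader(root).getRecordBatch();
      try (ArrowReader reader =
          new RootArrowReader(allocator, schema, Collections.singletonList(batch))) {
        while (reader.loadNextBatch()) {
          // reader.getVectorSchemaRoot() now holds a copy of the original data.
        }
      }
    }
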
diff --git a/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionMetadataTest.java b/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionMetadataTest.java
new file mode 100644
index 0000000..024223d
--- /dev/null
+++ b/java/driver/jdbc/src/test/java/org/apache/arrow/adbc/driver/jdbc/JdbcConnectionMetadataTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.adbc.driver.jdbc;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.driver.testsuite.AbstractConnectionMetadataTest;
+import org.junit.jupiter.api.io.TempDir;
+
+public class JdbcConnectionMetadataTest extends AbstractConnectionMetadataTest {
+  @TempDir Path tempDir;
+
+  @Override
+  protected AdbcDatabase init() throws AdbcException {
+    final Map<String, String> parameters = new HashMap<>();
+    parameters.put("path", tempDir.toString() + "/db;create=true");
+    return JdbcDriver.INSTANCE.connect(parameters);
+  }
+}
diff --git a/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java b/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java
new file mode 100644
index 0000000..bba9aac
--- /dev/null
+++ b/java/driver/testsuite/src/main/java/org/apache/arrow/adbc/driver/testsuite/AbstractConnectionMetadataTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adbc.driver.testsuite;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.arrow.adbc.core.AdbcConnection;
+import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.core.StandardSchemas;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public abstract class AbstractConnectionMetadataTest {
+  protected AdbcDatabase database;
+  protected AdbcConnection connection;
+  protected BufferAllocator allocator;
+
+  protected abstract AdbcDatabase init() throws AdbcException;
+
+  @BeforeEach
+  public void beforeEach() throws Exception {
+    database = init();
+    connection = database.connect();
+    allocator = new RootAllocator();
+  }
+
+  @AfterEach
+  public void afterEach() throws Exception {
+    AutoCloseables.close(connection, database, allocator);
+  }
+
+  @Test
+  void getObjectsColumns() throws Exception {
+    loadTable();
+    boolean tableFound = false;
+    try (final AdbcStatement stmt =
+        connection.getObjects(AdbcConnection.GetObjectsDepth.ALL, null, null, null, null, null)) {
+      try (final ArrowReader reader = stmt.getArrowReader()) {
+        assertThat(reader.getVectorSchemaRoot().getSchema())
+            .isEqualTo(StandardSchemas.GET_OBJECTS_SCHEMA);
+        assertThat(reader.loadNextBatch()).isTrue();
+
+        final ListVector dbSchemas = (ListVector) reader.getVectorSchemaRoot().getVector(1);
+        final ListVector dbSchemaTables =
+            (ListVector) ((StructVector) dbSchemas.getDataVector()).getVectorById(1);
+        final StructVector tables = (StructVector) dbSchemaTables.getDataVector();
+        final VarCharVector tableNames = (VarCharVector) tables.getVectorById(0);
+        final ListVector tableColumns = (ListVector) tables.getVectorById(2);
+
+        for (int i = 0; i < tables.getValueCount(); i++) {
+          if (tables.isNull(i)) {
+            continue;
+          }
+          final Text tableName = tableNames.getObject(i);
+          if (tableName != null && tableName.toString().equals("FOO")) {
+            tableFound = true;
+            @SuppressWarnings("unchecked")
+            final List<Map<String, ?>> columns = (List<Map<String, ?>>) tableColumns.getObject(i);
+            assertThat(columns)
+                .extracting("column_name")
+                .containsExactlyInAnyOrder(new Text("INTS"), new Text("STRS"));
+            assertThat(columns).extracting("ordinal_position").containsExactlyInAnyOrder(1, 2);
+          }
+        }
+      }
+    }
+    assertThat(tableFound).describedAs("Table FOO exists in metadata").isTrue();
+  }
+
+  @Test
+  void getObjectsCatalogs() throws Exception {
+    loadTable();
+    try (final AdbcStatement stmt =
+        connection.getObjects(
+            AdbcConnection.GetObjectsDepth.CATALOGS, null, null, null, null, null)) {
+      try (final ArrowReader reader = stmt.getArrowReader()) {
+        assertThat(reader.getVectorSchemaRoot().getSchema())
+            .isEqualTo(StandardSchemas.GET_OBJECTS_SCHEMA);
+        assertThat(reader.loadNextBatch()).isTrue();
+        assertThat(reader.getVectorSchemaRoot().getRowCount()).isGreaterThan(0);
+        final FieldVector dbSchemas = reader.getVectorSchemaRoot().getVector(1);
+        // We requested depth == CATALOGS, so the db_schemas field should be all null
+        assertThat(dbSchemas.getNullCount()).isEqualTo(dbSchemas.getValueCount());
+      }
+    }
+  }
+
+  @Test
+  void getObjectsDbSchemas() throws Exception {
+    loadTable();
+    try (final AdbcStatement stmt =
+        connection.getObjects(
+            AdbcConnection.GetObjectsDepth.DB_SCHEMAS, null, null, null, null, null)) {
+      try (final ArrowReader reader = stmt.getArrowReader()) {
+        assertThat(reader.getVectorSchemaRoot().getSchema())
+            .isEqualTo(StandardSchemas.GET_OBJECTS_SCHEMA);
+        assertThat(reader.loadNextBatch()).isTrue();
+        assertThat(reader.getVectorSchemaRoot().getRowCount()).isGreaterThan(0);
+      }
+    }
+  }
+
+  @Test
+  void getObjectsTables() throws Exception {
+    loadTable();
+    try (final AdbcStatement stmt =
+        connection.getObjects(
+            AdbcConnection.GetObjectsDepth.TABLES, null, null, null, null, null)) {
+      try (final ArrowReader reader = stmt.getArrowReader()) {
+        assertThat(reader.getVectorSchemaRoot().getSchema())
+            .isEqualTo(StandardSchemas.GET_OBJECTS_SCHEMA);
+        assertThat(reader.loadNextBatch()).isTrue();
+
+        final ListVector dbSchemas = (ListVector) reader.getVectorSchemaRoot().getVector(1);
+        final ListVector dbSchemaTables =
+            (ListVector) ((StructVector) dbSchemas.getDataVector()).getVectorById(1);
+        final StructVector tables = (StructVector) dbSchemaTables.getDataVector();
+        final VarCharVector tableNames = (VarCharVector) tables.getVectorById(0);
+        assertThat(IntStream.range(0, tableNames.getValueCount()).mapToObj(tableNames::getObject))
+            .contains(new Text("FOO"));
+      }
+    }
+  }
+
+  @Test
+  void getTableSchema() throws Exception {
+    final Schema schema =
+        new Schema(
+            Arrays.asList(
+                Field.nullable("INTS", new ArrowType.Int(32, /*signed=*/ true)),
+                Field.nullable("STRS", new ArrowType.Utf8())));
+    try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      try (final AdbcStatement stmt = connection.bulkIngest("FOO")) {
+        stmt.bind(root);
+        stmt.execute();
+      }
+    }
+    assertThat(connection.getTableSchema(/*catalog*/ null, /*dbSchema*/ null, "FOO"))
+        .isEqualTo(schema);
+  }
+
+  @Test
+  void getTableTypes() throws Exception {
+    try (final AdbcStatement stmt = connection.getTableTypes()) {
+      try (final ArrowReader reader = stmt.getArrowReader()) {
+        assertThat(reader.getVectorSchemaRoot().getSchema())
+            .isEqualTo(StandardSchemas.TABLE_TYPES_SCHEMA);
+        List<String> tableTypes = new ArrayList<>();
+        while (reader.loadNextBatch()) {
+          final VarCharVector types = (VarCharVector) reader.getVectorSchemaRoot().getVector(0);
+          for (int i = 0; i < types.getValueCount(); i++) {
+            assertThat(types.isNull(i)).isFalse();
+            tableTypes.add(types.getObject(i).toString());
+          }
+        }
+        assertThat(tableTypes).anyMatch("table"::equalsIgnoreCase);
+      }
+    }
+  }
+
+  void loadTable() throws Exception {
+    final Schema schema =
+        new Schema(
+            Arrays.asList(
+                Field.nullable("INTS", new ArrowType.Int(32, /*signed=*/ true)),
+                Field.nullable("STRS", new ArrowType.Utf8())));
+    try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      final IntVector ints = (IntVector) root.getVector(0);
+      final VarCharVector strs = (VarCharVector) root.getVector(1);
+
+      ints.allocateNew(4);
+      ints.setSafe(0, 0);
+      ints.setSafe(1, 1);
+      ints.setSafe(2, 2);
+      ints.setNull(3);
+      strs.allocateNew(4);
+      strs.setNull(0);
+      strs.setSafe(1, "foo".getBytes(StandardCharsets.UTF_8));
+      strs.setSafe(2, "".getBytes(StandardCharsets.UTF_8));
+      strs.setSafe(3, "asdf".getBytes(StandardCharsets.UTF_8));
+      root.setRowCount(4);
+
+      // TODO: XXX: need a "quirks" system to handle idiosyncrasies. For example: Derby forces
+      // table names to uppercase, but does not do case folding in all places.
+      try (final AdbcStatement stmt = connection.bulkIngest("FOO")) {
+        stmt.bind(root);
+        stmt.execute();
+      }
+    }
+  }
+}
diff --git a/java/pom.xml b/java/pom.xml
index 8dc6657..b74158a 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -27,8 +27,17 @@
   <description>Apache Arrow is open source, in-memory columnar data structures and low-overhead messaging</description>
   <url>https://arrow.apache.org/</url>
 
+  <!-- Get nightly Arrow for now to pick up bug fixes -->
+  <repositories>
+    <repository>
+      <id>arrow-apache-nightlies</id>
+      <url>https://nightlies.apache.org/arrow/java</url>
+    </repository>
+  </repositories>
+
   <properties>
-    <dep.arrow.version>8.0.0</dep.arrow.version>
+    <!-- Nightly build 2022/07/11 -->
+    <dep.arrow.version>9.0.0.dev363</dep.arrow.version>
     <adbc.version>9.0.0-SNAPSHOT</adbc.version>
   </properties>