You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/06/23 13:51:03 UTC

[arrow-adbc] branch main updated: Add table/column reflection (#18)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new a549499  Add table/column reflection (#18)
a549499 is described below

commit a54949933cefd5ed6be61f97ba25bbef59898dbe
Author: David Li <li...@gmail.com>
AuthorDate: Thu Jun 23 09:50:57 2022 -0400

    Add table/column reflection (#18)
    
    Fixes #14.
    
    * Implement proposed metadata API
    
    * Implement proposed API for constraint metadata
    
    * Remove redundant metadata methods
---
 adbc.h                                     | 213 +++++---
 adbc_driver_manager/adbc_driver_manager.cc |  20 +
 drivers/flight_sql/flight_sql.cc           |  17 -
 drivers/sqlite/sqlite.cc                   | 802 ++++++++++++++++++++++++-----
 drivers/sqlite/sqlite_test.cc              | 395 +++++++++++++-
 5 files changed, 1215 insertions(+), 232 deletions(-)

diff --git a/adbc.h b/adbc.h
index a1a3bda..4be0d78 100644
--- a/adbc.h
+++ b/adbc.h
@@ -117,6 +117,8 @@ struct ArrowArrayStream {
 #define ADBC
 
 // Storage class macros for Windows
+// Allow overriding/aliasing with application-defined macros
+#if !defined(ADBC_EXPORT)
 #if defined(_WIN32)
 #if defined(ADBC_EXPORTING)
 #define ADBC_EXPORT __declspec(dllexport)
@@ -126,6 +128,7 @@ struct ArrowArrayStream {
 #else
 #define ADBC_EXPORT
 #endif  // defined(_WIN32)
+#endif  // !defined(ADBC_EXPORT)
 
 /// \file ADBC: Arrow DataBase connectivity (client API)
 ///
@@ -201,7 +204,7 @@ struct ADBC_EXPORT AdbcDatabase {
   void* private_data;
   /// \brief The associated driver (used by the driver manager to help
   ///   track state).
-  AdbcDriver* private_driver;
+  struct AdbcDriver* private_driver;
 };
 
 /// \brief Allocate a new (but uninitialized) database.
@@ -233,17 +236,6 @@ AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
 /// \defgroup adbc-connection Connection establishment.
 /// @{
 
-/// \brief A set of connection options.
-struct ADBC_EXPORT AdbcConnectionOptions {
-  /// \brief The database to connect to.
-  struct AdbcDatabase* database;
-
-  /// \brief A driver-specific connection string.
-  ///
-  /// Should be in ODBC-style format ("Key1=Value1;Key2=Value2").
-  const char* target;
-};
-
 /// \brief An active database connection.
 ///
 /// Provides methods for query execution, managing prepared
@@ -257,7 +249,7 @@ struct ADBC_EXPORT AdbcConnection {
   void* private_data;
   /// \brief The associated driver (used by the driver manager to help
   ///   track state).
-  AdbcDriver* private_driver;
+  struct AdbcDriver* private_driver;
 };
 
 /// \brief Allocate a new (but uninitialized) connection.
@@ -317,48 +309,143 @@ AdbcStatusCode AdbcConnectionDeserializePartitionDesc(struct AdbcConnection* con
 /// schema given in the function docstring. Schema fields are nullable unless
 /// otherwise marked.
 ///
-/// Some functions accept a "search pattern" argument, which is a string that
+/// Some functions accept "search pattern" arguments, which are strings that
 /// can contain the special character "%" to match zero or more characters, or
-/// "_" to match exactly one character. (See the documentation of
+/// "_" to match exactly one character.  (See the documentation of
 /// DatabaseMetaData in JDBC or "Pattern Value Arguments" in the ODBC
-/// documentation.)
-///
-/// TODO: escaping in search patterns?
+/// documentation.)  Escaping is not currently supported.
 ///
 /// @{
 
-/// \brief Get a list of catalogs in the database.
+/// \brief Get a hierarchical view of all catalogs, database schemas,
+///   tables, and columns.
 ///
 /// The result is an Arrow dataset with the following schema:
 ///
-/// Field Name     | Field Type
-/// ---------------|--------------
-/// catalog_name   | utf8 not null
+/// Field Name               | Field Type
+/// -------------------------|-----------------------
+/// catalog_name             | utf8
+/// catalog_db_schemas       | list<DB_SCHEMA_SCHEMA>
+///
+/// DB_SCHEMA_SCHEMA is a Struct with fields:
+///
+/// Field Name               | Field Type
+/// -------------------------|-----------------------
+/// db_schema_name           | utf8
+/// db_schema_tables         | list<TABLE_SCHEMA>
+///
+/// TABLE_SCHEMA is a Struct with fields:
+///
+/// Field Name               | Field Type
+/// -------------------------|-----------------------
+/// table_name               | utf8 not null
+/// table_type               | utf8 not null
+/// table_columns            | list<COLUMN_SCHEMA>
+/// table_constraints        | list<CONSTRAINT_SCHEMA>
+///
+/// COLUMN_SCHEMA is a Struct with fields:
+///
+/// Field Name               | Field Type            | Comments
+/// -------------------------|-----------------------|---------
+/// column_name              | utf8 not null         |
+/// ordinal_position         | int32                 | (1)
+/// remarks                  | utf8                  | (2)
+/// xdbc_data_type           | int16                 | (3)
+/// xdbc_type_name           | utf8                  | (3)
+/// xdbc_column_size         | int32                 | (3)
+/// xdbc_decimal_digits      | int16                 | (3)
+/// xdbc_num_prec_radix      | int16                 | (3)
+/// xdbc_nullable            | int16                 | (3)
+/// xdbc_column_def          | utf8                  | (3)
+/// xdbc_sql_data_type       | int16                 | (3)
+/// xdbc_datetime_sub        | int16                 | (3)
+/// xdbc_char_octet_length   | int32                 | (3)
+/// xdbc_is_nullable         | utf8                  | (3)
+/// xdbc_scope_catalog       | utf8                  | (3)
+/// xdbc_scope_schema        | utf8                  | (3)
+/// xdbc_scope_table         | utf8                  | (3)
+/// xdbc_is_autoincrement    | bool                  | (3)
+/// xdbc_is_generatedcolumn  | bool                  | (3)
+///
+/// 1. The column's ordinal position in the table (starting from 1).
+/// 2. Database-specific description of the column.
+/// 3. Optional, JDBC/ODBC-compatible value.
+///
+/// CONSTRAINT_SCHEMA is a Struct with fields:
+///
+/// Field Name               | Field Type            | Comments
+/// -------------------------|-----------------------|---------
+/// constraint_name          | utf8                  |
+/// constraint_type          | utf8 not null         | (1)
+/// constraint_column_names  | list<utf8> not null   | (2)
+/// constraint_column_usage  | list<USAGE_SCHEMA>    | (3)
+///
+/// 1. One of 'CHECK', 'FOREIGN KEY', 'PRIMARY KEY', or 'UNIQUE'.
+/// 2. The columns on the current table that are constrained, in
+///    order.
+/// 3. For FOREIGN KEY only, the referenced table and columns.
+///
+/// USAGE_SCHEMA is a Struct with fields:
+///
+/// Field Name               | Field Type            | Comments
+/// -------------------------|-----------------------|---------
+/// fk_catalog               | utf8                  |
+/// fk_db_schema             | utf8                  |
+/// fk_table                 | utf8 not null         |
+/// fk_column_name           | utf8 not null         |
 ///
 /// \param[in] connection The database connection.
-/// \param[out] statement The result set.
+/// \param[in] depth The level of nesting to display. If 0, display
+///   all levels. If 1, display only catalogs (i.e. catalog_db_schemas
+///   will be null). If 2, display only catalogs and schemas
+///   (i.e. db_schema_tables will be null), and so on.
+/// \param[in] catalog Only show tables in the given catalog. If NULL,
+///   do not filter by catalog. If an empty string, only show tables
+///   without a catalog.  May be a search pattern (see section
+///   documentation).
+/// \param[in] db_schema Only show tables in the given database schema. If
+///   NULL, do not filter by database schema. If an empty string, only show
+///   tables without a database schema. May be a search pattern (see section
+///   documentation).
+/// \param[in] table_name Only show tables with the given name. If NULL, do not
+///   filter by name. May be a search pattern (see section documentation).
+/// \param[in] table_type Only show tables matching one of the given table
+///   types. If NULL, show tables of any type. Valid table types can be fetched
+///   from GetTableTypes.  Terminate the list with a NULL entry.
+/// \param[in] column_name Only show columns with the given name. If
+///   NULL, do not filter by name.  May be a search pattern (see
+///   section documentation).
+/// \param[out] statement The result set. AdbcStatementGetStream can
+///   be called immediately; do not call Execute or Prepare.
 /// \param[out] error Error details, if an error occurs.
 ADBC_EXPORT
-AdbcStatusCode AdbcConnectionGetCatalogs(struct AdbcConnection* connection,
-                                         struct AdbcStatement* statement,
-                                         struct AdbcError* error);
+AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int depth,
+                                        const char* catalog, const char* db_schema,
+                                        const char* table_name, const char** table_type,
+                                        const char* column_name,
+                                        struct AdbcStatement* statement,
+                                        struct AdbcError* error);
 
-/// \brief Get a list of schemas in the database.
-///
-/// The result is an Arrow dataset with the following schema:
-///
-/// Field Name     | Field Type
-/// ---------------|--------------
-/// catalog_name   | utf8
-/// db_schema_name | utf8 not null
+#define ADBC_OBJECT_DEPTH_ALL 0
+#define ADBC_OBJECT_DEPTH_CATALOGS 1
+#define ADBC_OBJECT_DEPTH_DB_SCHEMAS 2
+#define ADBC_OBJECT_DEPTH_TABLES 3
+#define ADBC_OBJECT_DEPTH_COLUMNS ADBC_OBJECT_DEPTH_ALL
+
+/// \brief Get the Arrow schema of a table.
 ///
 /// \param[in] connection The database connection.
-/// \param[out] statement The result set.
+/// \param[in] catalog The catalog (or NULL if not applicable).
+/// \param[in] db_schema The database schema (or NULL if not applicable).
+/// \param[in] table_name The table name.
+/// \param[out] schema The table schema.
 /// \param[out] error Error details, if an error occurs.
 ADBC_EXPORT
-AdbcStatusCode AdbcConnectionGetDbSchemas(struct AdbcConnection* connection,
-                                          struct AdbcStatement* statement,
-                                          struct AdbcError* error);
+AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
+                                            const char* catalog, const char* db_schema,
+                                            const char* table_name,
+                                            struct ArrowSchema* schema,
+                                            struct AdbcError* error);
 
 /// \brief Get a list of table types in the database.
 ///
@@ -376,38 +463,6 @@ AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
                                            struct AdbcStatement* statement,
                                            struct AdbcError* error);
 
-/// \brief Get a list of tables matching the given criteria.
-///
-/// The result is an Arrow dataset with the following schema:
-///
-/// Field Name     | Field Type
-/// ---------------|--------------
-/// catalog_name   | utf8
-/// db_schema_name | utf8
-/// table_name     | utf8 not null
-/// table_type     | utf8 not null
-///
-/// \param[in] connection The database connection.
-/// \param[in] catalog Only show tables in the given catalog. If NULL, do not
-///   filter by catalog. If an empty string, only show tables without a
-///   catalog.
-/// \param[in] db_schema Only show tables in the given database schema. If
-///   NULL, do not filter by database schema. If an empty string, only show
-///   tables without a database schema. May be a search pattern (see section
-///   documentation).
-/// \param[in] table_name Only show tables with the given name. If NULL, do not
-///   filter by name. May be a search pattern (see section documentation).
-/// \param[in] table_types Only show tables matching one of the given table
-///   types. If NULL, show tables of any type. Valid table types can be fetched
-///   from get_table_types.
-/// \param[out] statement The result set.
-/// \param[out] error Error details, if an error occurs.
-ADBC_EXPORT
-AdbcStatusCode AdbcConnectionGetTables(struct AdbcConnection* connection,
-                                       const char* catalog, const char* db_schema,
-                                       const char* table_name, const char** table_types,
-                                       struct AdbcStatement* statement,
-                                       struct AdbcError* error);
 /// }@
 
 /// }@
@@ -433,7 +488,7 @@ struct ADBC_EXPORT AdbcStatement {
 
   /// \brief The associated driver (used by the driver manager to help
   ///   track state).
-  AdbcDriver* private_driver;
+  struct AdbcDriver* private_driver;
 };
 
 /// \brief Create a new statement for a given connection.
@@ -670,15 +725,15 @@ struct ADBC_EXPORT AdbcDriver {
                                                        struct AdbcStatement*,
                                                        struct AdbcError*);
 
-  AdbcStatusCode (*ConnectionGetCatalogs)(struct AdbcConnection*, struct AdbcStatement*,
-                                          struct AdbcError*);
-  AdbcStatusCode (*ConnectionGetDbSchemas)(struct AdbcConnection*, struct AdbcStatement*,
-                                           struct AdbcError*);
+  AdbcStatusCode (*ConnectionGetObjects)(struct AdbcConnection*, int, const char*,
+                                         const char*, const char*, const char**,
+                                         const char*, struct AdbcStatement*,
+                                         struct AdbcError*);
+  AdbcStatusCode (*ConnectionGetTableSchema)(struct AdbcConnection*, const char*,
+                                             const char*, const char*,
+                                             struct ArrowSchema*, struct AdbcError*);
   AdbcStatusCode (*ConnectionGetTableTypes)(struct AdbcConnection*, struct AdbcStatement*,
                                             struct AdbcError*);
-  AdbcStatusCode (*ConnectionGetTables)(struct AdbcConnection*, const char*, const char*,
-                                        const char*, const char**, struct AdbcStatement*,
-                                        struct AdbcError*);
 
   AdbcStatusCode (*StatementNew)(struct AdbcConnection*, struct AdbcStatement*,
                                  struct AdbcError*);
@@ -723,7 +778,7 @@ typedef AdbcStatusCode (*AdbcDriverInitFunc)(size_t count, struct AdbcDriver* dr
 // struct/entrypoint instead?
 
 // For use with count
-#define ADBC_VERSION_0_0_1 25
+#define ADBC_VERSION_0_0_1 28
 
 /// }@
 
diff --git a/adbc_driver_manager/adbc_driver_manager.cc b/adbc_driver_manager/adbc_driver_manager.cc
index c273fb4..0572300 100644
--- a/adbc_driver_manager/adbc_driver_manager.cc
+++ b/adbc_driver_manager/adbc_driver_manager.cc
@@ -61,6 +61,22 @@ void SetError(struct AdbcError* error, const std::string& message) {
 }
 
 // Default stubs
+AdbcStatusCode ConnectionGetTableTypes(struct AdbcConnection*, struct AdbcStatement*,
+                                       struct AdbcError* error) {
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode ConnectionGetObjects(struct AdbcConnection*, int, const char*, const char*,
+                                    const char*, const char**, const char*,
+                                    struct AdbcStatement*, struct AdbcError* error) {
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode ConnectionGetTableSchema(struct AdbcConnection*, const char*, const char*,
+                                        const char*, struct ArrowSchema*,
+                                        struct AdbcError* error) {
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
 
 AdbcStatusCode StatementBind(struct AdbcStatement*, struct ArrowArray*,
                              struct ArrowSchema*, struct AdbcError* error) {
@@ -519,6 +535,10 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,
   CHECK_REQUIRED(driver, DatabaseInit);
   CHECK_REQUIRED(driver, DatabaseRelease);
 
+  FILL_DEFAULT(driver, ConnectionGetObjects);
+  FILL_DEFAULT(driver, ConnectionGetTableSchema);
+  FILL_DEFAULT(driver, ConnectionGetTableTypes);
+
   FILL_DEFAULT(driver, StatementBind);
   FILL_DEFAULT(driver, StatementExecute);
   FILL_DEFAULT(driver, StatementPrepare);
diff --git a/drivers/flight_sql/flight_sql.cc b/drivers/flight_sql/flight_sql.cc
index c090ec0..e54f979 100644
--- a/drivers/flight_sql/flight_sql.cc
+++ b/drivers/flight_sql/flight_sql.cc
@@ -499,29 +499,24 @@ AdbcStatusCode FlightSqlStatementSetSqlQuery(struct AdbcStatement* statement,
 
 }  // namespace
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) {
   return FlightSqlDatabaseInit(database, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
   return FlightSqlDatabaseNew(database, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char* key,
                                      const char* value, struct AdbcError* error) {
   return FlightSqlDatabaseSetOption(database, key, value, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
                                    struct AdbcError* error) {
   return FlightSqlDatabaseRelease(database, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcConnectionDeserializePartitionDesc(struct AdbcConnection* connection,
                                                       const uint8_t* serialized_partition,
                                                       size_t serialized_length,
@@ -531,79 +526,67 @@ AdbcStatusCode AdbcConnectionDeserializePartitionDesc(struct AdbcConnection* con
                                                      serialized_length, statement, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
                                            struct AdbcStatement* statement,
                                            struct AdbcError* error) {
   return FlightSqlConnectionGetTableTypes(connection, statement, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
                                   struct AdbcError* error) {
   return FlightSqlConnectionInit(connection, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcConnectionNew(struct AdbcDatabase* database,
                                  struct AdbcConnection* connection,
                                  struct AdbcError* error) {
   return FlightSqlConnectionNew(database, connection, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
                                        const char* value, struct AdbcError* error) {
   return FlightSqlConnectionSetOption(connection, key, value, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
                                      struct AdbcError* error) {
   return FlightSqlConnectionRelease(connection, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcStatementExecute(struct AdbcStatement* statement,
                                     struct AdbcError* error) {
   return FlightSqlStatementExecute(statement, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcStatementGetPartitionDesc(struct AdbcStatement* statement,
                                              uint8_t* partition_desc,
                                              struct AdbcError* error) {
   return FlightSqlStatementGetPartitionDesc(statement, partition_desc, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcStatementGetPartitionDescSize(struct AdbcStatement* statement,
                                                  size_t* length,
                                                  struct AdbcError* error) {
   return FlightSqlStatementGetPartitionDescSize(statement, length, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcStatementGetStream(struct AdbcStatement* statement,
                                       struct ArrowArrayStream* out,
                                       struct AdbcError* error) {
   return FlightSqlStatementGetStream(statement, out, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcStatementNew(struct AdbcConnection* connection,
                                 struct AdbcStatement* statement,
                                 struct AdbcError* error) {
   return FlightSqlStatementNew(connection, statement, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcStatementRelease(struct AdbcStatement* statement,
                                     struct AdbcError* error) {
   return FlightSqlStatementRelease(statement, error);
 }
 
-ADBC_DRIVER_EXPORT
 AdbcStatusCode AdbcStatementSetSqlQuery(struct AdbcStatement* statement,
                                         const char* query, struct AdbcError* error) {
   return FlightSqlStatementSetSqlQuery(statement, query, error);
diff --git a/drivers/sqlite/sqlite.cc b/drivers/sqlite/sqlite.cc
index 2973794..74f6cd2 100644
--- a/drivers/sqlite/sqlite.cc
+++ b/drivers/sqlite/sqlite.cc
@@ -22,6 +22,7 @@
 #include <mutex>
 #include <string>
 #include <unordered_map>
+#include <unordered_set>
 
 #include <arrow/builder.h>
 #include <arrow/c/bridge.h>
@@ -76,6 +77,15 @@ void SetError(struct AdbcError* error, Args&&... args) {
   error->release = ReleaseError;
 }
 
+AdbcStatusCode CheckRc(sqlite3* db, int rc, const char* context,
+                       struct AdbcError* error) {
+  if (rc != SQLITE_OK) {
+    SetError(db, context, error);
+    return ADBC_STATUS_IO;
+  }
+  return ADBC_STATUS_OK;
+}
+
 AdbcStatusCode CheckRc(sqlite3* db, sqlite3_stmt* stmt, int rc, const char* context,
                        struct AdbcError* error) {
   if (rc != SQLITE_OK) {
@@ -89,6 +99,29 @@ AdbcStatusCode CheckRc(sqlite3* db, sqlite3_stmt* stmt, int rc, const char* cont
   return ADBC_STATUS_OK;
 }
 
+template <typename CallbackFn>
+AdbcStatusCode DoQuery(sqlite3* db, const char* query, struct AdbcError* error,
+                       CallbackFn&& callback) {
+  sqlite3_stmt* stmt;
+  int rc = sqlite3_prepare_v2(db, query, std::strlen(query), &stmt, /*pzTail=*/nullptr);
+  auto status = std::move(callback)(stmt);
+  std::ignore = CheckRc(db, stmt, sqlite3_finalize(stmt), "sqlite3_finalize", error);
+  return status;
+}
+
+arrow::Status ToArrowStatus(AdbcStatusCode code, struct AdbcError* error) {
+  if (code == ADBC_STATUS_OK) return Status::OK();
+  // TODO: map ADBC codes to Arrow codes
+  return Status::UnknownError(code);
+}
+
+AdbcStatusCode FromArrowStatus(const Status& status, struct AdbcError* error) {
+  if (status.ok()) return ADBC_STATUS_OK;
+  SetError(error, status);
+  // TODO: map Arrow codes to ADBC codes
+  return ADBC_STATUS_INTERNAL;
+}
+
 std::shared_ptr<arrow::Schema> StatementToSchema(sqlite3_stmt* stmt) {
   const int num_columns = sqlite3_column_count(stmt);
   arrow::FieldVector fields(num_columns);
@@ -192,12 +225,41 @@ class SqliteConnectionImpl {
   explicit SqliteConnectionImpl(std::shared_ptr<SqliteDatabaseImpl> database)
       : database_(std::move(database)), db_(nullptr) {}
 
-  //----------------------------------------------------------
-  // Common Functions
-  //----------------------------------------------------------
-
   sqlite3* db() const { return db_; }
 
+  AdbcStatusCode GetTableSchema(const char* catalog, const char* db_schema,
+                                const char* table_name, struct ArrowSchema* schema,
+                                struct AdbcError* error) {
+    if ((catalog && std::strlen(catalog) > 0) ||
+        (db_schema && std::strlen(db_schema) > 0)) {
+      std::memset(schema, 0, sizeof(*schema));
+      SetError(error, "Catalog/schema are not supported");
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+
+    std::string query = "SELECT * FROM ";
+    char* escaped = sqlite3_mprintf("%w", table_name);
+    if (!escaped) {
+      // Failed to allocate
+      SetError(error, "Could not escape table name (failed to allocate memory)");
+      return ADBC_STATUS_INTERNAL;
+    }
+    query += escaped;
+
+    sqlite3_stmt* stmt;
+    int rc = sqlite3_prepare_v2(db_, query.c_str(), static_cast<int>(query.size()), &stmt,
+                                /*pzTail=*/nullptr);
+    ADBC_RETURN_NOT_OK(CheckRc(db_, stmt, rc, "sqlite3_prepare_v2", error));
+    rc = sqlite3_step(stmt);
+    if (rc == SQLITE_ERROR) {
+      return CheckRc(db_, stmt, rc, "sqlite3_step", error);
+    }
+    auto arrow_schema = StatementToSchema(stmt);
+    ADBC_RETURN_NOT_OK(
+        CheckRc(db_, stmt, sqlite3_finalize(stmt), "sqlite3_finalize", error));
+    return FromArrowStatus(arrow::ExportSchema(*arrow_schema, schema), error);
+  }
+
   AdbcStatusCode Init(struct AdbcError* error) {
     db_ = database_->Connect();
     if (!db_) {
@@ -214,18 +276,80 @@ class SqliteConnectionImpl {
   sqlite3* db_;
 };
 
-class SqliteStatementImpl : public arrow::RecordBatchReader {
+AdbcStatusCode BindParameters(sqlite3_stmt* stmt, const arrow::RecordBatch& data,
+                              int64_t row, int* rc, struct AdbcError* error) {
+  int col_index = 1;
+  for (const auto& column : data.columns()) {
+    if (column->IsNull(row)) {
+      *rc = sqlite3_bind_null(stmt, col_index);
+    } else {
+      switch (column->type()->id()) {
+        case arrow::Type::INT64: {
+          *rc = sqlite3_bind_int64(
+              stmt, col_index, static_cast<const arrow::Int64Array&>(*column).Value(row));
+          break;
+        }
+        case arrow::Type::STRING: {
+          const auto& strings = static_cast<const arrow::StringArray&>(*column);
+          *rc =
+              sqlite3_bind_text64(stmt, col_index, strings.Value(row).data(),
+                                  strings.value_length(row), SQLITE_STATIC, SQLITE_UTF8);
+          break;
+        }
+        default:
+          SetError(error, "Binding parameter of type ", *column->type());
+          return ADBC_STATUS_NOT_IMPLEMENTED;
+      }
+    }
+    if (*rc != SQLITE_OK) return ADBC_STATUS_IO;
+    col_index++;
+  }
+  return ADBC_STATUS_OK;
+}
+
+class SqliteStatementReader : public arrow::RecordBatchReader {
  public:
-  SqliteStatementImpl(std::shared_ptr<SqliteConnectionImpl> connection)
+  explicit SqliteStatementReader(
+      std::shared_ptr<SqliteConnectionImpl> connection, sqlite3_stmt* stmt,
+      std::shared_ptr<arrow::RecordBatchReader> bind_parameters)
       : connection_(std::move(connection)),
-        stmt_(nullptr),
+        stmt_(stmt),
+        bind_parameters_(std::move(bind_parameters)),
         schema_(nullptr),
+        next_parameters_(nullptr),
         bind_index_(0),
         done_(false) {}
 
-  //----------------------------------------------------------
-  // arrow::RecordBatchReader
-  //----------------------------------------------------------
+  AdbcStatusCode Init(struct AdbcError* error) {
+    // Step the statement and get the schema (SQLite doesn't
+    // necessarily know the schema until it begins to execute it)
+
+    sqlite3* db = connection_->db();
+    Status status;
+    int rc = SQLITE_OK;
+    if (bind_parameters_) {
+      status = bind_parameters_->ReadNext(&next_parameters_);
+      ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
+      ADBC_RETURN_NOT_OK(BindNext(&rc, error));
+      ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_bind", error));
+    }
+    // XXX: with parameters, inferring the schema from the first
+    // argument is inaccurate (what if one is null?). Is there a way
+    // to hint to SQLite the real type?
+
+    rc = sqlite3_step(stmt_);
+    if (rc == SQLITE_ERROR) {
+      return CheckRc(db, stmt_, rc, "sqlite3_step", error);
+    }
+    schema_ = StatementToSchema(stmt_);
+    done_ = rc != SQLITE_ROW;
+    return ADBC_STATUS_OK;
+  }
+
+  void OverrideSchema(std::shared_ptr<arrow::Schema> schema) {
+    // TODO(ARROW-14705): use UnifySchemas for some sanity checking
+    schema_ = std::move(schema);
+  }
 
   std::shared_ptr<arrow::Schema> schema() const override {
     DCHECK(schema_);
@@ -255,6 +379,7 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
         const auto& field = schema_->field(col);
         switch (field->type()->id()) {
           case arrow::Type::INT64: {
+            // TODO: handle null values
             const sqlite3_int64 value = sqlite3_column_int64(stmt_, col);
             ARROW_RETURN_NOT_OK(
                 dynamic_cast<arrow::Int64Builder*>(builders[col].get())->Append(value));
@@ -264,6 +389,7 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
             const char* value =
                 reinterpret_cast<const char*>(sqlite3_column_text(stmt_, col));
             if (!value) {
+              // TODO: check field nullability
               ARROW_RETURN_NOT_OK(
                   dynamic_cast<arrow::StringBuilder*>(builders[col].get())->AppendNull());
             } else {
@@ -295,7 +421,8 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
           if (status != SQLITE_OK) {
             return Status::IOError("[SQLite3] sqlite3_reset: ", sqlite3_errmsg(db));
           }
-          RETURN_NOT_OK(BindNext());
+          struct AdbcError error;
+          ARROW_RETURN_NOT_OK(ToArrowStatus(BindNext(&status, &error), &error));
           status = sqlite3_step(stmt_);
           if (status == SQLITE_ROW) continue;
         } else {
@@ -315,27 +442,39 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
     return Status::OK();
   }
 
-  //----------------------------------------------------------
-  // Common Functions
-  //----------------------------------------------------------
+ private:
+  AdbcStatusCode BindNext(int* rc, struct AdbcError* error) {
+    if (!next_parameters_ || bind_index_ >= next_parameters_->num_rows()) {
+      return ADBC_STATUS_OK;
+    }
+    return BindParameters(stmt_, *next_parameters_, bind_index_++, rc, error);
+  }
+
+  std::shared_ptr<SqliteConnectionImpl> connection_;
+  sqlite3_stmt* stmt_;
+  std::shared_ptr<arrow::RecordBatchReader> bind_parameters_;
+
+  std::shared_ptr<arrow::Schema> schema_;
+  std::shared_ptr<arrow::RecordBatch> next_parameters_;
+  int64_t bind_index_;
+  bool done_;
+};
+
+class SqliteStatementImpl {
+ public:
+  explicit SqliteStatementImpl(std::shared_ptr<SqliteConnectionImpl> connection)
+      : connection_(std::move(connection)), stmt_(nullptr) {}
 
   AdbcStatusCode Close(struct AdbcError* error) {
     if (stmt_) {
       const int rc = sqlite3_finalize(stmt_);
       stmt_ = nullptr;
-      next_parameters_.reset();
-      bind_parameters_.reset();
       ADBC_RETURN_NOT_OK(
           CheckRc(connection_->db(), nullptr, rc, "sqlite3_finalize", error));
-      connection_.reset();
     }
     return ADBC_STATUS_OK;
   }
 
-  //----------------------------------------------------------
-  // Statement Functions
-  //----------------------------------------------------------
-
   AdbcStatusCode Bind(const std::shared_ptr<SqliteStatementImpl>& self,
                       struct ArrowArray* values, struct ArrowSchema* schema,
                       struct AdbcError* error) {
@@ -369,23 +508,427 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
 
   AdbcStatusCode Execute(const std::shared_ptr<SqliteStatementImpl>& self,
                          struct AdbcError* error) {
-    if (bulk_table_.empty()) {
+    if (stmt_) {  // a SQL query has been prepared (see SetSqlQuery)
       return ExecutePrepared(error);
+    } else if (!bulk_table_.empty()) {  // an ingestion target table was set
+      return ExecuteBulk(error);
     }
-    return ExecuteBulk(error);
+    SetError(error, "Cannot execute a statement without a query");
+    return ADBC_STATUS_UNINITIALIZED;  // neither a query nor a target table is set
+  }
+
+  /// \brief Build the catalog -> schema -> table -> column/constraint hierarchy.
+  ///
+  /// The database is reported as one catalog with a null name containing one
+  /// schema with a null name. A non-empty `catalog` filter yields no rows; a
+  /// non-empty `db_schema` filter yields the catalog with an empty schema list.
+  ///
+  /// \param[in] self Unused.
+  /// \param[in] depth How far to descend (ADBC_OBJECT_DEPTH_*); levels below
+  ///   the requested depth are emitted as null lists.
+  /// \param[in] table_name LIKE pattern for table names, or nullptr for all.
+  /// \param[in] table_type Null-terminated array of table types to keep, or
+  ///   nullptr (or an empty array) for all.
+  /// \param[in] column_name LIKE pattern for column names, or nullptr for all.
+  /// \param[out] error Populated when a builder or SQLite call fails.
+  /// \return ADBC_STATUS_OK on success; the single-batch result is stored in
+  ///   result_reader_.
+  AdbcStatusCode GetObjects(const std::shared_ptr<SqliteStatementImpl>& self, int depth,
+                            const char* catalog, const char* db_schema,
+                            const char* table_name, const char** table_type,
+                            const char* column_name, struct AdbcError* error) {
+    static std::shared_ptr<arrow::DataType> kColumnSchema = arrow::struct_({
+        arrow::field("column_name", arrow::utf8(), /*nullable=*/false),
+        arrow::field("ordinal_position", arrow::int32()),
+        arrow::field("remarks", arrow::utf8()),
+        arrow::field("xdbc_data_type", arrow::int16()),
+        arrow::field("xdbc_type_name", arrow::utf8()),
+        arrow::field("xdbc_column_size", arrow::int32()),
+        arrow::field("xdbc_decimal_digits", arrow::int16()),
+        arrow::field("xdbc_num_prec_radix", arrow::int16()),
+        arrow::field("xdbc_nullable", arrow::int16()),
+        arrow::field("xdbc_column_def", arrow::utf8()),
+        arrow::field("xdbc_sql_data_type", arrow::int16()),
+        arrow::field("xdbc_datetime_sub", arrow::int16()),
+        arrow::field("xdbc_char_octet_length", arrow::int32()),
+        arrow::field("xdbc_is_nullable", arrow::utf8()),
+        arrow::field("xdbc_scope_catalog", arrow::utf8()),
+        arrow::field("xdbc_scope_schema", arrow::utf8()),
+        arrow::field("xdbc_scope_table", arrow::utf8()),
+        arrow::field("xdbc_is_autoincrement", arrow::boolean()),
+        arrow::field("xdbc_is_generatedcolumn", arrow::boolean()),
+    });
+    static std::shared_ptr<arrow::DataType> kUsageSchema = arrow::struct_({
+        arrow::field("fk_catalog", arrow::utf8()),
+        arrow::field("fk_db_schema", arrow::utf8()),
+        arrow::field("fk_table", arrow::utf8()),
+        arrow::field("fk_column_name", arrow::utf8()),
+    });
+    // The fourth field must not reuse the name "column_names": a struct with
+    // two identically named fields cannot be addressed by name
+    static std::shared_ptr<arrow::DataType> kConstraintSchema = arrow::struct_({
+        arrow::field("constraint_name", arrow::utf8()),
+        arrow::field("constraint_type", arrow::utf8(), /*nullable=*/false),
+        arrow::field("column_names", arrow::list(arrow::utf8()), /*nullable=*/false),
+        arrow::field("constraint_column_usage", arrow::list(kUsageSchema)),
+    });
+    static std::shared_ptr<arrow::DataType> kTableSchema = arrow::struct_({
+        arrow::field("table_name", arrow::utf8(), /*nullable=*/false),
+        arrow::field("table_type", arrow::utf8(), /*nullable=*/false),
+        arrow::field("table_columns", arrow::list(kColumnSchema)),
+        arrow::field("table_constraints", arrow::list(kConstraintSchema)),
+    });
+    static std::shared_ptr<arrow::DataType> kDbSchemaSchema = arrow::struct_({
+        arrow::field("db_schema_name", arrow::utf8()),
+        arrow::field("db_schema_tables", arrow::list(kTableSchema)),
+    });
+    static std::shared_ptr<arrow::Schema> kCatalogSchema = arrow::schema({
+        arrow::field("catalog_name", arrow::utf8()),
+        arrow::field("catalog_db_schemas", arrow::list(kDbSchemaSchema)),
+    });
+
+    // Only tables and views are listed (the types GetTableTypes advertises), so
+    // triggers are not reported as tables. String literals use single quotes:
+    // SQLite treats a double-quoted token as an identifier first and accepts it
+    // as a string only through a legacy fallback that can be compiled out.
+    static const char kTableQuery[] =
+        R"(SELECT name, type
+           FROM sqlite_master
+           WHERE name LIKE ? AND type IN ('table', 'view')
+           ORDER BY name ASC)";
+    static const char kColumnQuery[] =
+        R"(SELECT cid, name
+           FROM pragma_table_info(?)
+           WHERE name LIKE ?
+           ORDER BY cid ASC)";
+    static const char kPrimaryKeyQuery[] =
+        R"(SELECT name
+           FROM pragma_table_info(?)
+           WHERE pk > 0
+           ORDER BY pk ASC)";
+    static const char kForeignKeyQuery[] =
+        R"(SELECT id, seq, "table", "from", "to"
+           FROM pragma_foreign_key_list(?)
+           ORDER BY id, seq ASC)";
+
+    // Resolve every nested builder once up front; the casts follow the field
+    // order of the schemas declared above
+    arrow::StringBuilder catalog_name;
+    std::unique_ptr<arrow::ArrayBuilder> catalog_schemas_builder;
+    ADBC_RETURN_NOT_OK(FromArrowStatus(
+        MakeBuilder(arrow::default_memory_pool(), kCatalogSchema->field(1)->type(),
+                    &catalog_schemas_builder),
+        error));
+    auto* catalog_schemas =
+        static_cast<arrow::ListBuilder*>(catalog_schemas_builder.get());
+    auto* catalog_schemas_items =
+        static_cast<arrow::StructBuilder*>(catalog_schemas->value_builder());
+    auto* db_schema_name =
+        static_cast<arrow::StringBuilder*>(catalog_schemas_items->child_builder(0).get());
+    auto* db_schema_tables =
+        static_cast<arrow::ListBuilder*>(catalog_schemas_items->child_builder(1).get());
+    auto* db_schema_tables_items =
+        static_cast<arrow::StructBuilder*>(db_schema_tables->value_builder());
+    auto* table_names = static_cast<arrow::StringBuilder*>(
+        db_schema_tables_items->child_builder(0).get());
+    auto* table_types = static_cast<arrow::StringBuilder*>(
+        db_schema_tables_items->child_builder(1).get());
+    auto* table_columns =
+        static_cast<arrow::ListBuilder*>(db_schema_tables_items->child_builder(2).get());
+    auto* table_columns_items =
+        static_cast<arrow::StructBuilder*>(table_columns->value_builder());
+    auto* column_names =
+        static_cast<arrow::StringBuilder*>(table_columns_items->child_builder(0).get());
+    auto* ordinal_positions =
+        static_cast<arrow::Int32Builder*>(table_columns_items->child_builder(1).get());
+    auto* table_constraints =
+        static_cast<arrow::ListBuilder*>(db_schema_tables_items->child_builder(3).get());
+    auto* table_constraints_items =
+        static_cast<arrow::StructBuilder*>(table_constraints->value_builder());
+    auto* constraint_names = static_cast<arrow::StringBuilder*>(
+        table_constraints_items->child_builder(0).get());
+    auto* constraint_types = static_cast<arrow::StringBuilder*>(
+        table_constraints_items->child_builder(1).get());
+    auto* constraint_column_names =
+        static_cast<arrow::ListBuilder*>(table_constraints_items->child_builder(2).get());
+    auto* constraint_column_names_items =
+        static_cast<arrow::StringBuilder*>(constraint_column_names->value_builder());
+    auto* constraint_column_usage =
+        static_cast<arrow::ListBuilder*>(table_constraints_items->child_builder(3).get());
+    auto* constraint_column_usage_items =
+        static_cast<arrow::StructBuilder*>(constraint_column_usage->value_builder());
+    auto* constraint_column_usage_fk_catalog = static_cast<arrow::StringBuilder*>(
+        constraint_column_usage_items->child_builder(0).get());
+    auto* constraint_column_usage_fk_db_schema = static_cast<arrow::StringBuilder*>(
+        constraint_column_usage_items->child_builder(1).get());
+    auto* constraint_column_usage_fk_table = static_cast<arrow::StringBuilder*>(
+        constraint_column_usage_items->child_builder(2).get());
+    auto* constraint_column_usage_fk_column_name = static_cast<arrow::StringBuilder*>(
+        constraint_column_usage_items->child_builder(3).get());
+
+    if (!catalog || std::strlen(catalog) == 0) {
+      ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_name.AppendNull(), error));
+
+      if (depth == ADBC_OBJECT_DEPTH_CATALOGS) {
+        ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->AppendNull(), error));
+      } else if (!db_schema || std::strlen(db_schema) == 0) {
+        ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->Append(), error));
+        ADBC_RETURN_NOT_OK(FromArrowStatus(db_schema_name->AppendNull(), error));
+        if (depth == ADBC_OBJECT_DEPTH_DB_SCHEMAS) {
+          ADBC_RETURN_NOT_OK(FromArrowStatus(db_schema_tables->AppendNull(), error));
+        } else {
+          // Look up tables
+
+          std::unordered_set<std::string> table_type_filter;
+          if (table_type) {
+            while (*table_type) {
+              table_type_filter.insert(*table_type);
+              table_type++;
+            }
+          }
+
+          sqlite3* db = connection_->db();
+          ADBC_RETURN_NOT_OK(
+              DoQuery(db, kTableQuery, error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+                if (table_name) {
+                  ADBC_RETURN_NOT_OK(CheckRc(
+                      db,
+                      sqlite3_bind_text64(stmt, 1, table_name, std::strlen(table_name),
+                                          SQLITE_STATIC, SQLITE_UTF8),
+                      "sqlite3_bind_text64", error));
+                } else {
+                  ADBC_RETURN_NOT_OK(CheckRc(
+                      db,
+                      sqlite3_bind_text64(stmt, 1, "%", 1, SQLITE_STATIC, SQLITE_UTF8),
+                      "sqlite3_bind_text64", error));
+                }
+
+                int rc = SQLITE_OK;
+                ADBC_RETURN_NOT_OK(FromArrowStatus(db_schema_tables->Append(), error));
+                while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
+                  const char* cur_table =
+                      reinterpret_cast<const char*>(sqlite3_column_text(stmt, 0));
+                  const char* cur_table_type =
+                      reinterpret_cast<const char*>(sqlite3_column_text(stmt, 1));
+
+                  if (!table_type_filter.empty() &&
+                      table_type_filter.find(cur_table_type) == table_type_filter.end()) {
+                    continue;
+                  }
+
+                  ADBC_RETURN_NOT_OK(
+                      FromArrowStatus(table_names->Append(cur_table), error));
+                  ADBC_RETURN_NOT_OK(
+                      FromArrowStatus(table_types->Append(cur_table_type), error));
+                  if (depth == ADBC_OBJECT_DEPTH_TABLES) {
+                    ADBC_RETURN_NOT_OK(
+                        FromArrowStatus(table_columns->AppendNull(), error));
+                    ADBC_RETURN_NOT_OK(
+                        FromArrowStatus(table_constraints->AppendNull(), error));
+                  } else {
+                    ADBC_RETURN_NOT_OK(FromArrowStatus(table_columns->Append(), error));
+                    ADBC_RETURN_NOT_OK(DoQuery(
+                        db, kColumnQuery, error,
+                        [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+                          ADBC_RETURN_NOT_OK(
+                              CheckRc(db,
+                                      sqlite3_bind_text64(stmt, 1, cur_table,
+                                                          std::strlen(cur_table),
+                                                          SQLITE_STATIC, SQLITE_UTF8),
+                                      "sqlite3_bind_text64", error));
+
+                          if (column_name) {
+                            ADBC_RETURN_NOT_OK(
+                                CheckRc(db,
+                                        sqlite3_bind_text64(stmt, 2, column_name,
+                                                            std::strlen(column_name),
+                                                            SQLITE_STATIC, SQLITE_UTF8),
+                                        "sqlite3_bind_text64", error));
+                          } else {
+                            ADBC_RETURN_NOT_OK(
+                                CheckRc(db,
+                                        sqlite3_bind_text64(stmt, 2, "%", 1,
+                                                            SQLITE_STATIC, SQLITE_UTF8),
+                                        "sqlite3_bind_text64", error));
+                          }
+
+                          int rc = SQLITE_OK;
+                          int64_t row_count = 0;
+                          while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
+                            row_count++;
+                            const int32_t cur_ordinal_position =
+                                1 + sqlite3_column_int(stmt, 0);
+                            const char* cur_column_name = reinterpret_cast<const char*>(
+                                sqlite3_column_text(stmt, 1));
+
+                            ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                column_names->Append(cur_column_name), error));
+                            ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                ordinal_positions->Append(cur_ordinal_position), error));
+
+                            ADBC_RETURN_NOT_OK(
+                                FromArrowStatus(table_columns_items->Append(), error));
+                          }
+                          // Only the name and ordinal are filled in; pad every
+                          // remaining column field with nulls for these rows
+                          for (int i = 2; i < table_columns_items->num_children(); i++) {
+                            ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                table_columns_items->child_builder(i)->AppendNulls(
+                                    row_count),
+                                error));
+                          }
+                          if (rc != SQLITE_DONE) {
+                            return CheckRc(db, rc, "sqlite3_step", error);
+                          }
+                          return ADBC_STATUS_OK;
+                        }));
+
+                    // We can get primary key and foreign keys, but not unique (without
+                    // parsing the table definition, at least)
+                    ADBC_RETURN_NOT_OK(
+                        FromArrowStatus(table_constraints->Append(), error));
+                    ADBC_RETURN_NOT_OK(DoQuery(
+                        db, kPrimaryKeyQuery, error,
+                        [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+                          ADBC_RETURN_NOT_OK(
+                              CheckRc(db,
+                                      sqlite3_bind_text64(stmt, 1, cur_table,
+                                                          std::strlen(cur_table),
+                                                          SQLITE_STATIC, SQLITE_UTF8),
+                                      "sqlite3_bind_text64", error));
+
+                          int rc = SQLITE_OK;
+                          bool has_primary_key = false;
+                          while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
+                            if (!has_primary_key) {
+                              ADBC_RETURN_NOT_OK(
+                                  FromArrowStatus(constraint_names->AppendNull(), error));
+                              ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                  constraint_types->Append("PRIMARY KEY"), error));
+                              ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                  constraint_column_names->Append(), error));
+                              ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                  constraint_column_usage->Append(), error));
+                            }
+                            has_primary_key = true;
+                            const char* cur_column_name = reinterpret_cast<const char*>(
+                                sqlite3_column_text(stmt, 0));
+
+                            ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                constraint_column_names_items->Append(cur_column_name),
+                                error));
+                          }
+                          if (has_primary_key) {
+                            ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                table_constraints_items->Append(), error));
+                          }
+                          if (rc != SQLITE_DONE) {
+                            return CheckRc(db, rc, "sqlite3_step", error);
+                          }
+                          return ADBC_STATUS_OK;
+                        }));
+                    ADBC_RETURN_NOT_OK(DoQuery(
+                        db, kForeignKeyQuery, error,
+                        [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+                          ADBC_RETURN_NOT_OK(
+                              CheckRc(db,
+                                      sqlite3_bind_text64(stmt, 1, cur_table,
+                                                          std::strlen(cur_table),
+                                                          SQLITE_STATIC, SQLITE_UTF8),
+                                      "sqlite3_bind_text64", error));
+
+                          int rc = SQLITE_OK;
+                          int prev_key_id = -1;
+                          while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
+                            const int key_id = sqlite3_column_int(stmt, 0);
+                            const char* to_table = reinterpret_cast<const char*>(
+                                sqlite3_column_text(stmt, 2));
+                            const char* from_col = reinterpret_cast<const char*>(
+                                sqlite3_column_text(stmt, 3));
+                            const char* to_col = reinterpret_cast<const char*>(
+                                sqlite3_column_text(stmt, 4));
+                            // Rows arrive ordered by key id; a new id starts a new
+                            // constraint and closes the previous one
+                            if (key_id != prev_key_id) {
+                              ADBC_RETURN_NOT_OK(
+                                  FromArrowStatus(constraint_names->AppendNull(), error));
+                              ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                  constraint_types->Append("FOREIGN KEY"), error));
+                              ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                  constraint_column_names->Append(), error));
+                              ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                  constraint_column_usage->Append(), error));
+                              if (prev_key_id != -1) {
+                                ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                    table_constraints_items->Append(), error));
+                              }
+                            }
+                            prev_key_id = key_id;
+
+                            ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                constraint_column_names_items->Append(from_col), error));
+                            ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                constraint_column_usage_fk_catalog->AppendNull(), error));
+                            ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                constraint_column_usage_fk_db_schema->AppendNull(),
+                                error));
+                            ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                constraint_column_usage_fk_table->Append(to_table),
+                                error));
+                            // "to" is NULL when the REFERENCES clause names no
+                            // parent columns (implicit reference to the parent's
+                            // primary key); report that as a null column name
+                            if (to_col) {
+                              ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                  constraint_column_usage_fk_column_name->Append(to_col),
+                                  error));
+                            } else {
+                              ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                  constraint_column_usage_fk_column_name->AppendNull(),
+                                  error));
+                            }
+                            ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                constraint_column_usage_items->Append(), error));
+                          }
+                          if (prev_key_id != -1) {
+                            ADBC_RETURN_NOT_OK(FromArrowStatus(
+                                table_constraints_items->Append(), error));
+                          }
+                          if (rc != SQLITE_DONE) {
+                            return CheckRc(db, rc, "sqlite3_step", error);
+                          }
+                          return ADBC_STATUS_OK;
+                        }));
+                  }
+
+                  ADBC_RETURN_NOT_OK(
+                      FromArrowStatus(db_schema_tables_items->Append(), error));
+                }
+                if (rc != SQLITE_DONE) {
+                  return CheckRc(db, rc, "sqlite3_step", error);
+                }
+                return ADBC_STATUS_OK;
+              }));
+        }
+        ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas_items->Append(), error));
+      } else {
+        ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->Append(), error));
+      }
+    }
+
+    arrow::ArrayVector arrays(2);
+    ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_name.Finish(&arrays[0]), error));
+    ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->Finish(&arrays[1]), error));
+    const int64_t rows = arrays[0]->length();
+    auto status =
+        arrow::RecordBatchReader::Make(
+            {
+                arrow::RecordBatch::Make(kCatalogSchema, rows, std::move(arrays)),
+            },
+            kCatalogSchema)
+            .Value(&result_reader_);
+    ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
+    return ADBC_STATUS_OK;
+  }
+
+  /// Produce the list of table types this driver reports: "table" and "view".
+  /// The single-column, two-row result is stored in result_reader_.
+  AdbcStatusCode GetTableTypes(const std::shared_ptr<SqliteStatementImpl>& self,
+                               struct AdbcError* error) {
+    arrow::StringBuilder type_names;
+    ADBC_RETURN_NOT_OK(FromArrowStatus(type_names.Append("table"), error));
+    ADBC_RETURN_NOT_OK(FromArrowStatus(type_names.Append("view"), error));
+    std::shared_ptr<arrow::Array> type_column;
+    ADBC_RETURN_NOT_OK(FromArrowStatus(type_names.Finish(&type_column), error));
+
+    auto result_schema =
+        arrow::schema({arrow::field("table_type", arrow::utf8(), /*nullable=*/false)});
+    auto batch = arrow::RecordBatch::Make(result_schema, /*num_rows=*/2,
+                                          {std::move(type_column)});
+    auto status = arrow::RecordBatchReader::Make({std::move(batch)}, result_schema)
+                      .Value(&result_reader_);
+    ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
+    return ADBC_STATUS_OK;
   }
 
   AdbcStatusCode GetStream(const std::shared_ptr<SqliteStatementImpl>& self,
                            struct ArrowArrayStream* out, struct AdbcError* error) {
-    if (!stmt_ || !schema_) {
+    if (!result_reader_) {  // set by ExecutePrepared/GetObjects/GetTableTypes
       SetError(error, "Statement has not yet been executed");
       return ADBC_STATUS_UNINITIALIZED;
     }
-    auto status = arrow::ExportRecordBatchReader(self, out);
+    auto status = arrow::ExportRecordBatchReader(result_reader_, out);
     if (!status.ok()) {
       SetError(error, "Could not initialize result reader: ", status);
-      return ADBC_STATUS_UNKNOWN;
+      return ADBC_STATUS_INTERNAL;
     }
+    result_reader_.reset();  // a result can be fetched once; a second call fails above
     return ADBC_STATUS_OK;
   }
 
@@ -393,13 +936,12 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
                            const char* key, const char* value, struct AdbcError* error) {
     if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
       // Bulk ingest
+
+      // Clear previous statement, if any
+      ADBC_RETURN_NOT_OK(Close(error));
+
       if (std::strlen(value) == 0) return ADBC_STATUS_INVALID_ARGUMENT;
       bulk_table_ = value;
-      if (stmt_) {
-        int rc = sqlite3_finalize(stmt_);
-        ADBC_RETURN_NOT_OK(
-            CheckRc(connection_->db(), nullptr, rc, "sqlite3_finalize", error));
-      }
       return ADBC_STATUS_OK;
     }
     SetError(error, "Unknown option: ", key);
@@ -409,6 +951,9 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
   AdbcStatusCode SetSqlQuery(const std::shared_ptr<SqliteStatementImpl>& self,
                              const char* query, struct AdbcError* error) {
     bulk_table_.clear();
+    // Clear previous statement, if any
+    ADBC_RETURN_NOT_OK(Close(error));
+
     sqlite3* db = connection_->db();
     int rc = sqlite3_prepare_v2(db, query, static_cast<int>(std::strlen(query)), &stmt_,
                                 /*pzTail=*/nullptr);
@@ -416,48 +961,6 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
   }
 
  private:
-  arrow::Result<int> BindNext() {
-    if (!next_parameters_ || bind_index_ >= next_parameters_->num_rows()) {
-      return SQLITE_OK;
-    }
-
-    return BindImpl(stmt_, *next_parameters_, bind_index_++);
-  }
-
-  arrow::Result<int> BindImpl(sqlite3_stmt* stmt, const arrow::RecordBatch& data,
-                              int64_t row) {
-    int col_index = 1;
-    for (const auto& column : data.columns()) {
-      if (column->IsNull(row)) {
-        const int rc = sqlite3_bind_null(stmt, col_index);
-        if (rc != SQLITE_OK) return rc;
-      } else {
-        switch (column->type()->id()) {
-          case arrow::Type::INT64: {
-            const int rc = sqlite3_bind_int64(
-                stmt, col_index,
-                static_cast<const arrow::Int64Array&>(*column).Value(row));
-            if (rc != SQLITE_OK) return rc;
-            break;
-          }
-          case arrow::Type::STRING: {
-            const auto& strings = static_cast<const arrow::StringArray&>(*column);
-            const int rc = sqlite3_bind_text64(stmt, col_index, strings.Value(row).data(),
-                                               strings.value_length(row), SQLITE_STATIC,
-                                               SQLITE_UTF8);
-            if (rc != SQLITE_OK) return rc;
-            break;
-          }
-          default:
-            return arrow::Status::NotImplemented("Binding parameter of type ",
-                                                 *column->type());
-        }
-      }
-      col_index++;
-    }
-    return SQLITE_OK;
-  }
-
   AdbcStatusCode ExecuteBulk(struct AdbcError* error) {
     if (!bind_parameters_) {
       SetError(error, "Must AdbcStatementBind for bulk insertion");
@@ -531,8 +1034,8 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
       if (!batch) break;
 
       for (int64_t row = 0; row < batch->num_rows(); row++) {
-        status = BindImpl(stmt, *batch, row).Value(&rc);
-        ADBC_RETURN_NOT_OK(check_status(status));
+        // TODO: if this fails we won't release the statement
+        ADBC_RETURN_NOT_OK(BindParameters(stmt, *batch, row, &rc, error));
         ADBC_RETURN_NOT_OK(CheckRc(db, stmt, rc, "sqlite3_bind", error));
 
         rc = sqlite3_step(stmt);
@@ -554,51 +1057,31 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
 
   AdbcStatusCode ExecutePrepared(struct AdbcError* error) {
     sqlite3* db = connection_->db();
-    int rc = SQLITE_OK;
-    if (schema_) {
-      rc = sqlite3_clear_bindings(stmt_);
-      ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_clear_bindings", error));
-
-      rc = sqlite3_reset(stmt_);
-      ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_reset", error));
-    }
-    // Step the statement and get the schema (SQLite doesn't
-    // necessarily know the schema until it begins to execute it)
-
-    Status status;
-    if (bind_parameters_) {
-      status = bind_parameters_->ReadNext(&next_parameters_);
-      if (status.ok()) status = BindNext().Value(&rc);
-    }
-    // XXX: with parameters, inferring the schema from the first
-    // argument is inaccurate (what if one is null?). Is there a way
-    // to hint to SQLite the real type?
-
-    if (!status.ok()) {
-      // TODO: map Arrow codes to ADBC codes
-      SetError(error, status);
-      return ADBC_STATUS_IO;
-    }
-    ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_bind", error));
-
-    rc = sqlite3_step(stmt_);
-    if (rc == SQLITE_ERROR) {
-      return CheckRc(db, stmt_, rc, "sqlite3_error", error);
-    }
-    schema_ = StatementToSchema(stmt_);
-    done_ = rc != SQLITE_ROW;
+    int rc = sqlite3_clear_bindings(stmt_);  // now done on every execution
+    ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_clear_bindings", error));
+
+    rc = sqlite3_reset(stmt_);  // return the statement to its pre-step state
+    ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_reset", error));
+    auto reader = std::make_shared<SqliteStatementReader>(connection_, stmt_,
+                                                          std::move(bind_parameters_));
+    ADBC_RETURN_NOT_OK(reader->Init(error));  // bind_parameters_ is moved-from here
+    result_reader_ = std::move(reader);  // handed to the caller by GetStream
     return ADBC_STATUS_OK;
   }
 
   std::shared_ptr<SqliteConnectionImpl> connection_;
+
+  // Query state
+
+  // Bulk ingestion
   // Target of bulk ingestion (rather janky to store state like this, though…)
   std::string bulk_table_;
+
+  // Prepared statements
   sqlite3_stmt* stmt_;
-  std::shared_ptr<arrow::Schema> schema_;
   std::shared_ptr<arrow::RecordBatchReader> bind_parameters_;
-  std::shared_ptr<arrow::RecordBatch> next_parameters_;
-  int64_t bind_index_;
-  bool done_;
+
+  std::shared_ptr<arrow::RecordBatchReader> result_reader_;
 };
 
 // ADBC interface implementation - as private functions so that these
@@ -649,20 +1132,35 @@ AdbcStatusCode SqliteDatabaseRelease(struct AdbcDatabase* database,
   return status;
 }
 
-AdbcStatusCode SqliteConnectionNew(struct AdbcDatabase* database,
-                                   struct AdbcConnection* connection,
-                                   struct AdbcError* error) {
+AdbcStatusCode SqliteConnectionGetObjects(
+    struct AdbcConnection* connection, int depth, const char* catalog,
+    const char* db_schema, const char* table_name, const char** table_types,
+    const char* column_name, struct AdbcStatement* statement, struct AdbcError* error) {
+  if (!statement->private_data) return ADBC_STATUS_UNINITIALIZED;  // no impl attached
   auto ptr =
-      reinterpret_cast<std::shared_ptr<SqliteDatabaseImpl>*>(database->private_data);
-  auto impl = std::make_shared<SqliteConnectionImpl>(*ptr);
-  connection->private_data = new std::shared_ptr<SqliteConnectionImpl>(impl);
-  return ADBC_STATUS_OK;
+      reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
+  return (*ptr)->GetObjects(*ptr, depth, catalog, db_schema, table_name, table_types,
+                            column_name, error);  // result is read via the statement
 }
 
-AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection,
-                                         const char* key, const char* value,
-                                         struct AdbcError* error) {
-  return ADBC_STATUS_OK;
+AdbcStatusCode SqliteConnectionGetTableSchema(struct AdbcConnection* connection,
+                                              const char* catalog, const char* db_schema,
+                                              const char* table_name,
+                                              struct ArrowSchema* schema,
+                                              struct AdbcError* error) {
+  // private_data carries a pointer to the shared_ptr owning the connection
+  // implementation; a null value means the connection was never created
+  auto* holder =
+      reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
+  if (holder == nullptr) return ADBC_STATUS_UNINITIALIZED;
+  return (*holder)->GetTableSchema(catalog, db_schema, table_name, schema, error);
+}
+
+AdbcStatusCode SqliteConnectionGetTableTypes(struct AdbcConnection* connection,
+                                             struct AdbcStatement* statement,
+                                             struct AdbcError* error) {
+  // The result is produced on the supplied statement, so only the statement's
+  // implementation object is needed here
+  auto* holder =
+      reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
+  if (holder == nullptr) return ADBC_STATUS_UNINITIALIZED;
+  const std::shared_ptr<SqliteStatementImpl>& impl = *holder;
+  return impl->GetTableTypes(impl, error);
 }
 
 AdbcStatusCode SqliteConnectionInit(struct AdbcConnection* connection,
@@ -673,6 +1171,16 @@ AdbcStatusCode SqliteConnectionInit(struct AdbcConnection* connection,
   return (*ptr)->Init(error);
 }
 
+/// Create the driver-side connection object for `database` and attach it to
+/// `connection->private_data` as a heap-allocated shared_ptr.
+///
+/// Returns ADBC_STATUS_UNINITIALIZED when `database` has no implementation
+/// object attached, ADBC_STATUS_OK otherwise.
+AdbcStatusCode SqliteConnectionNew(struct AdbcDatabase* database,
+                                   struct AdbcConnection* connection,
+                                   struct AdbcError* error) {
+  // Same guard as the other entry points: without it a database that was
+  // never created would be dereferenced through a null pointer below
+  if (!database->private_data) return ADBC_STATUS_UNINITIALIZED;
+  auto ptr =
+      reinterpret_cast<std::shared_ptr<SqliteDatabaseImpl>*>(database->private_data);
+  auto impl = std::make_shared<SqliteConnectionImpl>(*ptr);
+  connection->private_data = new std::shared_ptr<SqliteConnectionImpl>(impl);
+  return ADBC_STATUS_OK;
+}
+
 AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
                                        struct AdbcError* error) {
   if (!connection->private_data) return ADBC_STATUS_UNINITIALIZED;
@@ -684,6 +1192,12 @@ AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
   return status;
 }
 
+/// \brief Connection option setter.
+///
+/// No option is interpreted: every key/value pair is accepted and
+/// ADBC_STATUS_OK is returned unconditionally.
+AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection,
+                                         const char* key, const char* value,
+                                         struct AdbcError* error) {
+  // None of the arguments are read.
+  static_cast<void>(connection);
+  static_cast<void>(key);
+  static_cast<void>(value);
+  static_cast<void>(error);
+  return ADBC_STATUS_OK;
+}
+
 AdbcStatusCode SqliteStatementBind(struct AdbcStatement* statement,
                                    struct ArrowArray* values, struct ArrowSchema* schema,
                                    struct AdbcError* error) {
@@ -795,15 +1309,29 @@ AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
   return SqliteDatabaseRelease(database, error);
 }
 
-AdbcStatusCode AdbcConnectionNew(struct AdbcDatabase* database,
-                                 struct AdbcConnection* connection,
-                                 struct AdbcError* error) {
-  return SqliteConnectionNew(database, connection, error);
+/// \brief Adbc-prefixed entry point; passes every argument through to
+///   SqliteConnectionGetObjects and returns its status.
+AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int depth,
+                                        const char* catalog, const char* db_schema,
+                                        const char* table_name, const char** table_types,
+                                        const char* column_name,
+                                        struct AdbcStatement* statement,
+                                        struct AdbcError* error) {
+  const AdbcStatusCode status =
+      SqliteConnectionGetObjects(connection, depth, catalog, db_schema, table_name,
+                                 table_types, column_name, statement, error);
+  return status;
 }
 
-AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
-                                       const char* value, struct AdbcError* error) {
-  return SqliteConnectionSetOption(connection, key, value, error);
+/// \brief Adbc-prefixed entry point; passes every argument through to
+///   SqliteConnectionGetTableSchema and returns its status.
+AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
+                                            const char* catalog, const char* db_schema,
+                                            const char* table_name,
+                                            struct ArrowSchema* schema,
+                                            struct AdbcError* error) {
+  const AdbcStatusCode status = SqliteConnectionGetTableSchema(
+      connection, catalog, db_schema, table_name, schema, error);
+  return status;
+}
+
+/// \brief Adbc-prefixed entry point; passes every argument through to
+///   SqliteConnectionGetTableTypes and returns its status.
+AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
+                                           struct AdbcStatement* statement,
+                                           struct AdbcError* error) {
+  const AdbcStatusCode status =
+      SqliteConnectionGetTableTypes(connection, statement, error);
+  return status;
 }
 
 AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
@@ -811,11 +1339,22 @@ AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
   return SqliteConnectionInit(connection, error);
 }
 
+/// \brief Adbc-prefixed entry point; passes every argument through to
+///   SqliteConnectionNew and returns its status.
+AdbcStatusCode AdbcConnectionNew(struct AdbcDatabase* database,
+                                 struct AdbcConnection* connection,
+                                 struct AdbcError* error) {
+  const AdbcStatusCode status = SqliteConnectionNew(database, connection, error);
+  return status;
+}
+
 AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
                                      struct AdbcError* error) {
   return SqliteConnectionRelease(connection, error);
 }
 
+/// \brief Adbc-prefixed entry point; passes every argument through to
+///   SqliteConnectionSetOption and returns its status.
+AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
+                                       const char* value, struct AdbcError* error) {
+  const AdbcStatusCode status = SqliteConnectionSetOption(connection, key, value, error);
+  return status;
+}
+
 AdbcStatusCode AdbcStatementBind(struct AdbcStatement* statement,
                                  struct ArrowArray* values, struct ArrowSchema* schema,
                                  struct AdbcError* error) {
@@ -889,6 +1428,9 @@ AdbcStatusCode AdbcSqliteDriverInit(size_t count, struct AdbcDriver* driver,
   driver->DatabaseRelease = SqliteDatabaseRelease;
   driver->DatabaseSetOption = SqliteDatabaseSetOption;
 
+  driver->ConnectionGetObjects = SqliteConnectionGetObjects;
+  driver->ConnectionGetTableSchema = SqliteConnectionGetTableSchema;
+  driver->ConnectionGetTableTypes = SqliteConnectionGetTableTypes;
   driver->ConnectionInit = SqliteConnectionInit;
   driver->ConnectionNew = SqliteConnectionNew;
   driver->ConnectionRelease = SqliteConnectionRelease;
diff --git a/drivers/sqlite/sqlite_test.cc b/drivers/sqlite/sqlite_test.cc
index b562f6a..dff4a6e 100644
--- a/drivers/sqlite/sqlite_test.cc
+++ b/drivers/sqlite/sqlite_test.cc
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <string>
+#include <vector>
+
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
@@ -32,6 +35,18 @@ namespace adbc {
 
 using arrow::PointeesEqual;
 
+// Matcher type produced by UnorderedPointwise(PointeesEqual(), ...) over a
+// vector of record batches.
+using RecordBatchMatcher =
+    decltype(::testing::UnorderedPointwise(PointeesEqual(), arrow::RecordBatchVector{}));
+
+// Build a matcher that expects exactly the given record batches, in any order.
+//
+// schema:     schema used to parse every JSON document.
+// batch_json: one JSON document per expected record batch.
+// Returns a matcher comparing the pointed-to batches with PointeesEqual().
+RecordBatchMatcher BatchesAre(const std::shared_ptr<arrow::Schema>& schema,
+                              const std::vector<std::string>& batch_json) {
+  arrow::RecordBatchVector batches;
+  // The final size is known up front, so avoid regrowing while appending.
+  batches.reserve(batch_json.size());
+  for (const std::string& json : batch_json) {
+    batches.push_back(adbc::RecordBatchFromJSON(schema, json));
+  }
+  return ::testing::UnorderedPointwise(PointeesEqual(), std::move(batches));
+}
+
 class Sqlite : public ::testing::Test {
  public:
   void SetUp() override {
@@ -58,6 +73,54 @@ class Sqlite : public ::testing::Test {
   AdbcDatabase database;
   AdbcConnection connection;
   AdbcError error = {};
+
+  // Expected struct type of each "table_columns" entry. Only column_name is
+  // declared non-nullable. The tests below spell these structs positionally in
+  // JSON, so the order of the fields matters.
+  std::shared_ptr<arrow::DataType> column_schema = arrow::struct_({
+      arrow::field("column_name", arrow::utf8(), /*nullable=*/false),
+      arrow::field("ordinal_position", arrow::int32()),
+      arrow::field("remarks", arrow::utf8()),
+      arrow::field("xdbc_data_type", arrow::int16()),
+      arrow::field("xdbc_type_name", arrow::utf8()),
+      arrow::field("xdbc_column_size", arrow::int32()),
+      arrow::field("xdbc_decimal_digits", arrow::int16()),
+      arrow::field("xdbc_num_prec_radix", arrow::int16()),
+      arrow::field("xdbc_nullable", arrow::int16()),
+      arrow::field("xdbc_column_def", arrow::utf8()),
+      arrow::field("xdbc_sql_data_type", arrow::int16()),
+      arrow::field("xdbc_datetime_sub", arrow::int16()),
+      arrow::field("xdbc_char_octet_length", arrow::int32()),
+      arrow::field("xdbc_is_nullable", arrow::utf8()),
+      arrow::field("xdbc_scope_catalog", arrow::utf8()),
+      arrow::field("xdbc_scope_schema", arrow::utf8()),
+      arrow::field("xdbc_scope_table", arrow::utf8()),
+      arrow::field("xdbc_is_autoincrement", arrow::boolean()),
+      arrow::field("xdbc_is_generatedcolumn", arrow::boolean()),
+  });
+  // Expected struct type of each element in the last field of
+  // constraint_schema: four nullable utf8 fields (catalog, schema, table,
+  // column), all prefixed fk_.
+  std::shared_ptr<arrow::DataType> usage_schema = arrow::struct_({
+      arrow::field("fk_catalog", arrow::utf8()),
+      arrow::field("fk_db_schema", arrow::utf8()),
+      arrow::field("fk_table", arrow::utf8()),
+      arrow::field("fk_column_name", arrow::utf8()),
+  });
+  // Expected struct type of each "table_constraints" entry: optional name,
+  // constraint type, the constrained column names, and a list of usage_schema
+  // structs.
+  // NOTE(review): the fourth field reuses the name "column_names", so the
+  // struct has two fields with the same name. This looks like a copy-paste
+  // slip; confirm against the driver's emitted schema before renaming, since
+  // the expected and actual field names must stay in sync.
+  std::shared_ptr<arrow::DataType> constraint_schema = arrow::struct_({
+      arrow::field("constraint_name", arrow::utf8()),
+      arrow::field("constraint_type", arrow::utf8(), /*nullable=*/false),
+      arrow::field("column_names", arrow::list(arrow::utf8()), /*nullable=*/false),
+      arrow::field("column_names", arrow::list(usage_schema)),
+  });
+  // Expected struct type of each "db_schema_tables" entry: table name and type
+  // (both non-nullable) plus nullable lists of columns and constraints.
+  std::shared_ptr<arrow::DataType> table_schema = arrow::struct_({
+      arrow::field("table_name", arrow::utf8(), /*nullable=*/false),
+      arrow::field("table_type", arrow::utf8(), /*nullable=*/false),
+      arrow::field("table_columns", arrow::list(column_schema)),
+      arrow::field("table_constraints", arrow::list(constraint_schema)),
+  });
+  // Expected struct type of each "catalog_db_schemas" entry: a nullable schema
+  // name and a nullable list of tables.
+  std::shared_ptr<arrow::DataType> db_schema_schema = arrow::struct_({
+      arrow::field("db_schema_name", arrow::utf8()),
+      arrow::field("db_schema_tables", arrow::list(table_schema)),
+  });
+  // Top-level schema the GetObjects tests parse their expected batches with:
+  // one row per catalog, each carrying a list of db_schema_schema structs.
+  std::shared_ptr<arrow::Schema> catalog_schema = arrow::schema({
+      arrow::field("catalog_name", arrow::utf8()),
+      arrow::field("catalog_db_schemas", arrow::list(db_schema_schema)),
+  });
 };
 
 TEST_F(Sqlite, SqlExecute) {
@@ -256,9 +319,7 @@ TEST_F(Sqlite, MultipleConnections) {
     arrow::RecordBatchVector batches;
     ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
     ASSERT_SCHEMA_EQ(*schema, *arrow::schema({}));
-    EXPECT_THAT(batches,
-                ::testing::UnorderedPointwise(
-                    PointeesEqual(), std::vector<std::shared_ptr<arrow::RecordBatch>>{}));
+    EXPECT_TRUE(batches.empty());
   }
 
   {
@@ -274,12 +335,334 @@ TEST_F(Sqlite, MultipleConnections) {
     arrow::RecordBatchVector batches;
     ReadStatement(&statement, &schema, &batches);
     ASSERT_SCHEMA_EQ(*schema, *arrow::schema({arrow::field("bar", arrow::null())}));
-    EXPECT_THAT(batches,
-                ::testing::UnorderedPointwise(
-                    PointeesEqual(), std::vector<std::shared_ptr<arrow::RecordBatch>>{}));
+    EXPECT_TRUE(batches.empty());
   }
 
   ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRelease(&connection2, &error));
 }
 
+// AdbcConnectionGetTableTypes should yield a single non-nullable utf8 column,
+// "table_type", holding the rows "table" and "view".
+TEST_F(Sqlite, MetadataGetTableTypes) {
+  const auto expected_schema = arrow::schema({
+      arrow::field("table_type", arrow::utf8(), /*nullable=*/false),
+  });
+
+  AdbcStatement statement;
+  std::memset(&statement, 0, sizeof(statement));
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(error,
+                            AdbcConnectionGetTableTypes(&connection, &statement, &error));
+
+  std::shared_ptr<arrow::Schema> schema;
+  arrow::RecordBatchVector batches;
+  ReadStatement(&statement, &schema, &batches);
+  ASSERT_SCHEMA_EQ(*schema, *expected_schema);
+  // Same unordered, pointee-wise comparison as before, via the shared helper.
+  EXPECT_THAT(batches, BatchesAre(expected_schema, {R"([["table"], ["view"]])"}));
+}
+
+// Exercises AdbcConnectionGetObjects at each depth (catalogs, schemas, tables,
+// all) and with catalog, schema, table-name, table-type and column-name
+// filters, against a database holding one ingested table, "bulk_insert".
+TEST_F(Sqlite, MetadataGetObjects) {
+  // Create a table via ingestion
+  {
+    ArrowArray export_table;
+    ArrowSchema export_schema;
+    auto bulk_schema = arrow::schema(
+        {arrow::field("ints", arrow::int64()), arrow::field("strs", arrow::utf8())});
+    auto bulk_table =
+        adbc::RecordBatchFromJSON(bulk_schema, R"([[1, "foo"], [2, "bar"]])");
+    ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
+    ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
+
+    AdbcStatement statement;
+    std::memset(&statement, 0, sizeof(statement));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+                                      "bulk_insert", &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+    // Release the ingestion statement (as MetadataGetObjectsColumns does for
+    // its setup statement) so it is not leaked when this scope ends.
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+  }
+
+  // Query for catalogs
+  AdbcStatement statement;
+  std::memset(&statement, 0, sizeof(statement));
+  std::shared_ptr<arrow::Schema> schema;
+  arrow::RecordBatchVector batches;
+
+  // No filter: one catalog row whose name and schema list are both null.
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error,
+      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_CATALOGS, nullptr, nullptr,
+                               nullptr, nullptr, nullptr, &statement, &error));
+  ReadStatement(&statement, &schema, &batches);
+  EXPECT_THAT(batches, BatchesAre(catalog_schema, {R"([[null, null]])"}));
+  batches.clear();
+
+  // A catalog filter that matches nothing: a batch with no rows.
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error,
+      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_CATALOGS, "catalog",
+                               nullptr, nullptr, nullptr, nullptr, &statement, &error));
+  ReadStatement(&statement, &schema, &batches);
+  EXPECT_THAT(batches, BatchesAre(catalog_schema, {R"([])"}));
+  batches.clear();
+
+  // Query for schemas
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error,
+      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_DB_SCHEMAS, nullptr,
+                               nullptr, nullptr, nullptr, nullptr, &statement, &error));
+  ReadStatement(&statement, &schema, &batches);
+  EXPECT_THAT(
+      batches,
+      BatchesAre(catalog_schema,
+                 {R"([[null, [{"db_schema_name": null, "db_schema_tables": null}]]])"}));
+  batches.clear();
+
+  // A schema filter that matches nothing: the catalog row has an empty list.
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error,
+      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_DB_SCHEMAS, nullptr,
+                               "schema", nullptr, nullptr, nullptr, &statement, &error));
+  ReadStatement(&statement, &schema, &batches);
+  EXPECT_THAT(batches, BatchesAre(catalog_schema, {R"([[null, []]])"}));
+  batches.clear();
+
+  // Query for tables
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error,
+      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_TABLES, nullptr, nullptr,
+                               nullptr, nullptr, nullptr, &statement, &error));
+  ReadStatement(&statement, &schema, &batches);
+  EXPECT_THAT(batches,
+              BatchesAre(catalog_schema,
+                         {R"([[null, [{"db_schema_name": null, "db_schema_tables": [
+  {"table_name": "bulk_insert", "table_type": "table", "table_columns": null, "table_constraints": null}
+]}]]])"}));
+  batches.clear();
+
+  // Table-name pattern with a trailing wildcard that matches the table.
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error,
+      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_TABLES, nullptr, nullptr,
+                               "bulk_%", nullptr, nullptr, &statement, &error));
+  ReadStatement(&statement, &schema, &batches);
+  EXPECT_THAT(batches,
+              BatchesAre(catalog_schema,
+                         {R"([[null, [{"db_schema_name": null, "db_schema_tables": [
+  {"table_name": "bulk_insert", "table_type": "table", "table_columns": null, "table_constraints": null}
+]}]]])"}));
+  batches.clear();
+
+  // Table-name pattern that matches nothing: an empty table list.
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error,
+      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_TABLES, nullptr, nullptr,
+                               "asdf%", nullptr, nullptr, &statement, &error));
+  ReadStatement(&statement, &schema, &batches);
+  EXPECT_THAT(
+      batches,
+      BatchesAre(catalog_schema,
+                 {R"([[null, [{"db_schema_name": null, "db_schema_tables": []}]]])"}));
+  batches.clear();
+
+  // Query for table types
+  // The filter is passed as a nullptr-terminated array of C strings.
+  std::vector<const char*> table_types(2);
+  table_types[0] = "table";
+  table_types[1] = nullptr;
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error,
+      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr,
+                               nullptr, table_types.data(), nullptr, &statement, &error));
+  ReadStatement(&statement, &schema, &batches);
+  EXPECT_THAT(batches,
+              BatchesAre(catalog_schema,
+                         {R"([[null, [{"db_schema_name": null, "db_schema_tables": [
+  {
+    "table_name": "bulk_insert",
+    "table_type": "table",
+    "table_columns": [
+      ["ints", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
+      ["strs", 2, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
+    ],
+    "table_constraints": []
+  }
+]}]]])"}));
+  batches.clear();
+
+  // Filtering for views only excludes the ingested table.
+  table_types[0] = "view";
+  table_types[1] = nullptr;
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error,
+      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr,
+                               nullptr, table_types.data(), nullptr, &statement, &error));
+  ReadStatement(&statement, &schema, &batches);
+  EXPECT_THAT(
+      batches,
+      BatchesAre(catalog_schema,
+                 {R"([[null, [{"db_schema_name": null, "db_schema_tables": []}]]])"}));
+  batches.clear();
+
+  // Query for columns
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error,
+      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr,
+                               nullptr, nullptr, nullptr, &statement, &error));
+  ReadStatement(&statement, &schema, &batches);
+  EXPECT_THAT(batches,
+              BatchesAre(catalog_schema,
+                         {R"([[null, [{"db_schema_name": null, "db_schema_tables": [
+  {
+    "table_name": "bulk_insert",
+    "table_type": "table",
+    "table_columns": [
+      ["ints", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
+      ["strs", 2, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
+    ],
+    "table_constraints": []
+  }
+]}]]])"}));
+  batches.clear();
+
+  // Column-name pattern "in%" keeps only the "ints" column.
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error,
+      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr,
+                               nullptr, nullptr, "in%", &statement, &error));
+  ReadStatement(&statement, &schema, &batches);
+  EXPECT_THAT(batches,
+              BatchesAre(catalog_schema,
+                         {R"([[null, [{"db_schema_name": null, "db_schema_tables": [
+  {
+    "table_name": "bulk_insert",
+    "table_type": "table",
+    "table_columns": [
+      ["ints", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
+    ],
+    "table_constraints": []
+  }
+]}]]])"}));
+  batches.clear();
+}
+
+// Checks the columns and constraints reported by AdbcConnectionGetObjects at
+// ADBC_OBJECT_DEPTH_ALL for three tables created with primary-key and
+// foreign-key constraints.
+TEST_F(Sqlite, MetadataGetObjectsColumns) {
+  // Setup: create "parent" (composite primary key), "other", and "child"
+  // (primary key plus two foreign keys), reusing one statement throughout.
+  {
+    AdbcStatement statement;
+    std::memset(&statement, 0, sizeof(statement));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error,
+        AdbcStatementSetSqlQuery(
+            &statement, "CREATE TABLE parent (a, b, c, PRIMARY KEY(c, b))", &error));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementSetSqlQuery(&statement, "CREATE TABLE other (a)", &error));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementSetSqlQuery(
+                   &statement,
+                   "CREATE TABLE child (a, b, c, PRIMARY KEY(a), FOREIGN KEY (c, b) "
+                   "REFERENCES parent (c, b), FOREIGN KEY (a) REFERENCES other(a))",
+                   &error));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+  }
+
+  AdbcStatement statement;
+  std::memset(&statement, 0, sizeof(statement));
+  std::shared_ptr<arrow::Schema> schema;
+  arrow::RecordBatchVector batches;
+
+  // Fetch everything with no filters. The expected constraint structs below
+  // are written positionally: name, type, column names, then the list of
+  // usage entries, each being catalog, schema, table, column.
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error,
+      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr,
+                               nullptr, nullptr, nullptr, &statement, &error));
+  ReadStatement(&statement, &schema, &batches);
+  EXPECT_THAT(batches,
+              BatchesAre(catalog_schema,
+                         {R"([[null, [{"db_schema_name": null, "db_schema_tables": [
+  {
+    "table_name": "child",
+    "table_type": "table",
+    "table_columns": [
+      ["a", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
+      ["b", 2, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
+      ["c", 3, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
+    ],
+    "table_constraints": [
+      [null, "PRIMARY KEY", ["a"], []],
+      [null, "FOREIGN KEY", ["a"], [[null, null, "other", "a"]]],
+      [null, "FOREIGN KEY", ["c", "b"], [[null, null, "parent", "c"], [null, null, "parent", "b"]]]
+    ]
+  },
+  {
+    "table_name": "other",
+    "table_type": "table",
+    "table_columns": [
+      ["a", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
+    ],
+    "table_constraints": []
+  },
+  {
+    "table_name": "parent",
+    "table_type": "table",
+    "table_columns": [
+      ["a", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
+      ["b", 2, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
+      ["c", 3, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
+    ],
+    "table_constraints": [
+      [null, "PRIMARY KEY", ["c", "b"], []]
+    ]
+  }
+]}]]])"}));
+  batches.clear();
+}
+
+// Ingests a two-column table and checks that AdbcConnectionGetTableSchema
+// reports the same Arrow schema that was ingested.
+TEST_F(Sqlite, MetadataGetTableSchema) {
+  // Create a table via ingestion
+  auto bulk_schema = arrow::schema(
+      {arrow::field("ints", arrow::int64()), arrow::field("strs", arrow::utf8())});
+  {
+    ArrowArray export_table;
+    ArrowSchema export_schema;
+    auto bulk_table =
+        adbc::RecordBatchFromJSON(bulk_schema, R"([[1, "foo"], [2, "bar"]])");
+    ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
+    ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
+
+    AdbcStatement statement;
+    std::memset(&statement, 0, sizeof(statement));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+                                      "bulk_insert", &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+    // Release the ingestion statement (as MetadataGetObjectsColumns does for
+    // its setup statement) so it is not leaked when this scope ends.
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+  }
+
+  // Look the table up with no catalog or schema qualifier.
+  ArrowSchema export_schema;
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error, AdbcConnectionGetTableSchema(&connection, /*catalog=*/nullptr,
+                                          /*db_schema=*/nullptr, "bulk_insert",
+                                          &export_schema, &error));
+
+  ASSERT_OK_AND_ASSIGN(auto schema, arrow::ImportSchema(&export_schema));
+  ASSERT_SCHEMA_EQ(*schema, *bulk_schema);
+}
+
 }  // namespace adbc