You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/11/28 22:06:16 UTC

[arrow-adbc] branch main updated: feat(c/driver/sqlite): port SQLite driver to nanoarrow (#196)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b11e33  feat(c/driver/sqlite): port SQLite driver to nanoarrow (#196)
3b11e33 is described below

commit 3b11e33096de8c50f72762139c4258b9884de325
Author: David Li <li...@gmail.com>
AuthorDate: Mon Nov 28 17:06:10 2022 -0500

    feat(c/driver/sqlite): port SQLite driver to nanoarrow (#196)
    
    Also related: #144.
    
    Fixes #120.
    
    Co-authored-by: Dewey Dunnington <de...@fishandwhistle.net>
---
 c/driver/sqlite/CMakeLists.txt                     |   21 +-
 c/driver/sqlite/sqlite.c                           | 1788 +++++++++++++++++++
 c/driver/sqlite/sqlite.cc                          | 1827 --------------------
 c/driver/sqlite/sqlite_test.cc                     |  655 ++++---
 c/driver/sqlite/statement_reader.c                 |  896 ++++++++++
 c/driver/sqlite/statement_reader.h                 |   69 +
 c/driver/sqlite/types.h                            |   58 +
 c/driver/sqlite/utils.c                            |  146 ++
 c/driver/sqlite/utils.h                            |   88 +
 c/driver_manager/adbc_driver_manager.cc            |   11 +-
 c/driver_manager/adbc_driver_manager_test.cc       |   10 +-
 c/validation/adbc_validation.cc                    |  295 +---
 c/validation/adbc_validation_util.cc               |   89 +
 c/validation/adbc_validation_util.h                |  101 ++
 c/vendor/nanoarrow/nanoarrow.c                     |   16 +-
 c/vendor/nanoarrow/nanoarrow.h                     |    6 +-
 c/vendor/nanoarrow/nanoarrow.hpp                   |  308 ++++
 c/vendor/vendor_nanoarrow.sh                       |    1 +
 ci/conda_env_cpp.txt                               |    1 -
 glib/test/test-connection.rb                       |    2 +-
 glib/test/test-statement.rb                        |    2 +-
 go/adbc/drivermgr/wrapper.go                       |    2 +-
 go/adbc/drivermgr/wrapper_sqlite_test.go           |   18 +-
 .../adbc_driver_manager/dbapi.py                   |    6 +-
 python/adbc_driver_manager/tests/test_dbapi.py     |   15 +-
 python/adbc_driver_manager/tests/test_lowlevel.py  |    4 +-
 ruby/README.md                                     |    2 +-
 ruby/test/test-connection.rb                       |    2 +-
 28 files changed, 4128 insertions(+), 2311 deletions(-)

diff --git a/c/driver/sqlite/CMakeLists.txt b/c/driver/sqlite/CMakeLists.txt
index ce7b0d8..820d14e 100644
--- a/c/driver/sqlite/CMakeLists.txt
+++ b/c/driver/sqlite/CMakeLists.txt
@@ -15,39 +15,39 @@
 # specific language governing permissions and limitations
 # under the License.
 
-cmake_minimum_required(VERSION 3.14)
+cmake_minimum_required(VERSION 3.18)
 get_filename_component(REPOSITORY_ROOT "../../../" ABSOLUTE)
 list(APPEND CMAKE_MODULE_PATH "${REPOSITORY_ROOT}/c/cmake_modules/")
 include(AdbcDefines)
 include(BuildUtils)
 
-set(CMAKE_CXX_STANDARD 11)
-set(CMAKE_CXX_STANDARD_REQUIRED ON)
-
 project(adbc_driver_sqlite
         VERSION "${ADBC_BASE_VERSION}"
         LANGUAGES CXX)
 include(CTest)
+find_package(PkgConfig)
 
-find_package(Arrow REQUIRED)
 find_package(SQLite3 REQUIRED)
 
 add_arrow_lib(adbc_driver_sqlite
               SOURCES
-              sqlite.cc
+              sqlite.c
+              statement_reader.c
+              utils.c
               OUTPUTS
               ADBC_LIBRARIES
               SHARED_LINK_FLAGS
               ${ADBC_LINK_FLAGS}
               SHARED_LINK_LIBS
               SQLite::SQLite3
-              arrow_shared
+              nanoarrow
               STATIC_LINK_LIBS
               SQLite::SQLite3
-              arrow_static)
+              nanoarrow
+              ${LIBPQ_STATIC_LIBRARIES})
 include_directories(SYSTEM ${REPOSITORY_ROOT})
 include_directories(SYSTEM ${REPOSITORY_ROOT}/c/)
-include_directories(SYSTEM ${SQLite3_INCLUDE_DIRS})
+include_directories(SYSTEM ${REPOSITORY_ROOT}/c/vendor/nanoarrow/)
 foreach(LIB_TARGET ${ADBC_LIBRARIES})
   target_compile_definitions(${LIB_TARGET} PRIVATE ADBC_EXPORTING)
 endforeach()
@@ -70,8 +70,7 @@ if(ADBC_BUILD_TESTS)
                 nanoarrow
                 ${TEST_LINK_LIBS})
   target_compile_features(adbc-driver-sqlite-test PRIVATE cxx_std_17)
-  target_include_directories(adbc-driver-sqlite-test SYSTEM
-                             PRIVATE ${REPOSITORY_ROOT}/c/vendor/nanoarrow/)
+  adbc_configure_target(adbc-driver-sqlite-test)
 endif()
 
 validate_config()
diff --git a/c/driver/sqlite/sqlite.c b/c/driver/sqlite/sqlite.c
new file mode 100644
index 0000000..233e5c1
--- /dev/null
+++ b/c/driver/sqlite/sqlite.c
@@ -0,0 +1,1788 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "adbc.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdarg.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <sqlite3.h>
+
+#include "nanoarrow.h"
+#include "statement_reader.h"
+#include "types.h"
+#include "utils.h"
+
+static const char kDefaultUri[] = "file:adbc_driver_sqlite?mode=memory&cache=shared";
+// The batch size for query results (and for initial type inference)
+static const char kStatementOptionBatchRows[] = "adbc.sqlite.query.batch_rows";
+static const uint32_t kSupportedInfoCodes[] = {
+    ADBC_INFO_VENDOR_NAME,    ADBC_INFO_VENDOR_VERSION,       ADBC_INFO_DRIVER_NAME,
+    ADBC_INFO_DRIVER_VERSION, ADBC_INFO_DRIVER_ARROW_VERSION,
+};
+
+// Private names (to avoid conflicts when using the driver manager)
+
+#define CHECK_DB_INIT(NAME, ERROR)                             \
+  if (!NAME->private_data) {                                   \
+    SetError(ERROR, "%s: database not initialized", __func__); \
+    return ADBC_STATUS_INVALID_STATE;                          \
+  }
+#define CHECK_CONN_INIT(NAME, ERROR)                             \
+  if (!NAME->private_data) {                                     \
+    SetError(ERROR, "%s: connection not initialized", __func__); \
+    return ADBC_STATUS_INVALID_STATE;                            \
+  }
+#define CHECK_STMT_INIT(NAME, ERROR)                            \
+  if (!NAME->private_data) {                                    \
+    SetError(ERROR, "%s: statement not initialized", __func__); \
+    return ADBC_STATUS_INVALID_STATE;                           \
+  }
+
+AdbcStatusCode SqliteDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
+  if (database->private_data) {
+    SetError(error, "AdbcDatabaseNew: database already allocated");
+    return ADBC_STATUS_INVALID_STATE;
+  }
+
+  database->private_data = malloc(sizeof(struct SqliteDatabase));
+  memset(database->private_data, 0, sizeof(struct SqliteDatabase));
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteDatabaseSetOption(struct AdbcDatabase* database, const char* key,
+                                       const char* value, struct AdbcError* error) {
+  CHECK_DB_INIT(database, error);
+  struct SqliteDatabase* db = (struct SqliteDatabase*)database->private_data;
+
+  if (strcmp(key, "uri") == 0) {
+    if (db->uri) free(db->uri);
+    size_t len = strlen(value) + 1;
+    db->uri = malloc(len);
+    strncpy(db->uri, value, len);
+    return ADBC_STATUS_OK;
+  }
+  SetError(error, "Unknown database option %s=%s", key, value);
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+int OpenDatabase(const char* maybe_uri, sqlite3** db, struct AdbcError* error) {
+  const char* uri = maybe_uri ? maybe_uri : kDefaultUri;
+  int rc = sqlite3_open_v2(uri, db,
+                           SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
+                           /*zVfs=*/NULL);
+  if (rc != SQLITE_OK) {
+    if (*db) {
+      SetError(error, "Failed to open %s: %s", uri, sqlite3_errmsg(*db));
+    } else {
+      SetError(error, "Failed to open %s: failed to allocate memory", uri);
+    }
+    (void)sqlite3_close(*db);
+    *db = NULL;
+    return ADBC_STATUS_IO;
+  }
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode ExecuteQuery(struct SqliteConnection* conn, const char* query,
+                            struct AdbcError* error) {
+  sqlite3_stmt* stmt = NULL;
+  int rc = sqlite3_prepare_v2(conn->conn, query, strlen(query), &stmt, /*pzTail=*/NULL);
+  while (rc != SQLITE_DONE && rc != SQLITE_ERROR) {
+    rc = sqlite3_step(stmt);
+  }
+  rc = sqlite3_finalize(stmt);
+  if (rc != SQLITE_OK && rc != SQLITE_DONE) {
+    SetError(error, "Failed to execute query \"%s\": %s", query,
+             sqlite3_errmsg(conn->conn));
+    return ADBC_STATUS_INTERNAL;
+  }
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteDatabaseInit(struct AdbcDatabase* database,
+                                  struct AdbcError* error) {
+  CHECK_DB_INIT(database, error);
+  struct SqliteDatabase* db = (struct SqliteDatabase*)database->private_data;
+
+  if (db->db) {
+    SetError(error, "AdbcDatabaseInit: database already initialized");
+    return ADBC_STATUS_INVALID_STATE;
+  }
+
+  return OpenDatabase(db->uri, &db->db, error);
+}
+
+AdbcStatusCode SqliteDatabaseRelease(struct AdbcDatabase* database,
+                                     struct AdbcError* error) {
+  CHECK_DB_INIT(database, error);
+  struct SqliteDatabase* db = (struct SqliteDatabase*)database->private_data;
+
+  size_t connection_count = db->connection_count;
+  if (db->uri) free(db->uri);
+  if (db->db) {
+    if (sqlite3_close(db->db) == SQLITE_BUSY) {
+      SetError(error, "AdbcDatabaseRelease: connection is busy");
+      return ADBC_STATUS_IO;
+    }
+  }
+  free(database->private_data);
+  database->private_data = NULL;
+
+  if (connection_count > 0) {
+    SetError(error, "AdbcDatabaseRelease: %ld open connections when released",
+             connection_count);
+    return ADBC_STATUS_INVALID_STATE;
+  }
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionNew(struct AdbcConnection* connection,
+                                   struct AdbcError* error) {
+  if (connection->private_data) {
+    SetError(error, "AdbcConnectionNew: connection already allocated");
+    return ADBC_STATUS_INVALID_STATE;
+  }
+
+  connection->private_data = malloc(sizeof(struct SqliteConnection));
+  memset(connection->private_data, 0, sizeof(struct SqliteConnection));
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection,
+                                         const char* key, const char* value,
+                                         struct AdbcError* error) {
+  CHECK_CONN_INIT(connection, error);
+  struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+  if (strcmp(key, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
+    if (strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
+      if (conn->active_transaction) {
+        AdbcStatusCode status = ExecuteQuery(conn, "COMMIT", error);
+        if (status != ADBC_STATUS_OK) return status;
+        conn->active_transaction = 0;
+      } else {
+        // no-op
+      }
+    } else if (strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
+      if (conn->active_transaction) {
+        // no-op
+      } else {
+        // begin
+        AdbcStatusCode status = ExecuteQuery(conn, "BEGIN", error);
+        if (status != ADBC_STATUS_OK) return status;
+        conn->active_transaction = 1;
+      }
+    } else {
+      SetError(error, "Invalid connection option value %s=%s", key, value);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+    return ADBC_STATUS_OK;
+  }
+  SetError(error, "Unknown connection option %s=%s", key, value);
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteConnectionInit(struct AdbcConnection* connection,
+                                    struct AdbcDatabase* database,
+                                    struct AdbcError* error) {
+  CHECK_CONN_INIT(connection, error);
+  CHECK_DB_INIT(database, error);
+  struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+  struct SqliteDatabase* db = (struct SqliteDatabase*)database->private_data;
+
+  if (conn->conn) {
+    SetError(error, "AdbcConnectionInit: connection already initialized");
+    return ADBC_STATUS_INVALID_STATE;
+  }
+  return OpenDatabase(db->uri, &conn->conn, error);
+}
+
+AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
+                                       struct AdbcError* error) {
+  CHECK_CONN_INIT(connection, error);
+  struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+
+  if (conn->conn) {
+    int rc = sqlite3_close(conn->conn);
+    if (rc == SQLITE_BUSY) {
+      SetError(error, "AdbcConnectionRelease: connection is busy");
+      return ADBC_STATUS_IO;
+    }
+  }
+  free(connection->private_data);
+  connection->private_data = NULL;
+
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionGetInfoAppendStringImpl(struct ArrowArray* array,
+                                                       uint32_t info_code,
+                                                       const char* info_value,
+                                                       struct AdbcError* error) {
+  CHECK_NA(INTERNAL, ArrowArrayAppendUInt(array->children[0], info_code), error);
+  // Append to type variant
+  struct ArrowStringView value = ArrowCharView(info_value);
+  CHECK_NA(INTERNAL, ArrowArrayAppendString(array->children[1]->children[0], value),
+           error);
+  // Append type code (by hand)
+  struct ArrowBuffer* codes = ArrowArrayBuffer(array->children[1], 0);
+  CHECK_NA(INTERNAL, ArrowBufferAppendInt8(codes, 0), error);
+  // Append type offset (by hand)
+  struct ArrowBuffer* offsets = ArrowArrayBuffer(array->children[1], 1);
+  CHECK_NA(INTERNAL,
+           ArrowBufferAppendInt32(offsets, array->children[1]->children[0]->length - 1),
+           error);
+  array->children[1]->length++;
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionGetInfoImpl(const uint32_t* info_codes,
+                                           size_t info_codes_length,
+                                           struct ArrowSchema* schema,
+                                           struct ArrowArray* array,
+                                           struct AdbcError* error) {
+  CHECK_NA(INTERNAL, ArrowSchemaInit(schema, NANOARROW_TYPE_STRUCT), error);
+  CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema, /*num_columns=*/2), error);
+
+  CHECK_NA(INTERNAL, ArrowSchemaInit(schema->children[0], NANOARROW_TYPE_UINT32), error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[0], "info_name"), error);
+  schema->children[0]->flags &= ~ARROW_FLAG_NULLABLE;
+
+  struct ArrowSchema* info_value = schema->children[1];
+  // TODO(apache/arrow-nanoarrow#73): formal union support
+  // initialize with dummy then override
+  CHECK_NA(INTERNAL, ArrowSchemaInit(info_value, NANOARROW_TYPE_UINT32), error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetFormat(info_value, "+ud:0,1,2,3,4,5"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value, "info_value"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(info_value, /*num_columns=*/6), error);
+
+  CHECK_NA(INTERNAL, ArrowSchemaInit(info_value->children[0], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[0], "string_value"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(info_value->children[1], NANOARROW_TYPE_BOOL),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[1], "bool_value"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(info_value->children[2], NANOARROW_TYPE_INT64),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[2], "int64_value"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(info_value->children[3], NANOARROW_TYPE_INT32),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[3], "int32_bitmask"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(info_value->children[4], NANOARROW_TYPE_LIST),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[4], "string_list"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(info_value->children[5], NANOARROW_TYPE_MAP), error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(info_value->children[5], "int32_to_int32_list_map"), error);
+
+  CHECK_NA(INTERNAL,
+           ArrowSchemaAllocateChildren(info_value->children[4], /*num_columns=*/1),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaInit(info_value->children[4]->children[0], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[4]->children[0], "item"),
+           error);
+
+  // XXX: nanoarrow could possibly use helpers for nested types like this
+  CHECK_NA(INTERNAL,
+           ArrowSchemaAllocateChildren(info_value->children[5], /*num_columns=*/1),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaInit(info_value->children[5]->children[0], NANOARROW_TYPE_STRUCT),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[5]->children[0], "entries"),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaAllocateChildren(info_value->children[5]->children[0],
+                                       /*num_columns=*/2),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaInit(info_value->children[5]->children[0]->children[0],
+                           NANOARROW_TYPE_INT32),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(info_value->children[5]->children[0]->children[0], "key"),
+           error);
+  info_value->children[5]->children[0]->children[0]->flags &= ~ARROW_FLAG_NULLABLE;
+  CHECK_NA(INTERNAL,
+           ArrowSchemaInit(info_value->children[5]->children[0]->children[1],
+                           NANOARROW_TYPE_LIST),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(info_value->children[5]->children[0]->children[1], "item"),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaAllocateChildren(info_value->children[5]->children[0]->children[1],
+                                       /*num_columns=*/1),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaInit(info_value->children[5]->children[0]->children[1]->children[0],
+                           NANOARROW_TYPE_INT32),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(
+               info_value->children[5]->children[0]->children[1]->children[0], "item"),
+           error);
+
+  struct ArrowError na_error = {0};
+  CHECK_NA_DETAIL(INTERNAL, ArrowArrayInitFromSchema(array, schema, &na_error), &na_error,
+                  error);
+  CHECK_NA(INTERNAL, ArrowArrayStartAppending(array), error);
+
+  for (size_t i = 0; i < info_codes_length; i++) {
+    switch (info_codes[i]) {
+      case ADBC_INFO_VENDOR_NAME:
+        RAISE_ADBC(SqliteConnectionGetInfoAppendStringImpl(array, info_codes[i], "SQLite",
+                                                           error));
+        break;
+      case ADBC_INFO_VENDOR_VERSION:
+        RAISE_ADBC(SqliteConnectionGetInfoAppendStringImpl(array, info_codes[i],
+                                                           sqlite3_libversion(), error));
+        break;
+      case ADBC_INFO_DRIVER_NAME:
+        RAISE_ADBC(SqliteConnectionGetInfoAppendStringImpl(array, info_codes[i],
+                                                           "ADBC SQLite Driver", error));
+        break;
+      case ADBC_INFO_DRIVER_VERSION:
+        // TODO(lidavidm): fill in driver version
+        RAISE_ADBC(SqliteConnectionGetInfoAppendStringImpl(array, info_codes[i],
+                                                           "(unknown)", error));
+        break;
+      case ADBC_INFO_DRIVER_ARROW_VERSION:
+        RAISE_ADBC(SqliteConnectionGetInfoAppendStringImpl(array, info_codes[i],
+                                                           NANOARROW_BUILD_ID, error));
+        break;
+      default:
+        // Ignore
+        continue;
+    }
+    CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
+  }
+
+  CHECK_NA_DETAIL(INTERNAL, ArrowArrayFinishBuilding(array, &na_error), &na_error, error);
+
+  return ADBC_STATUS_OK;
+}  // NOLINT(whitespace/indent)
+
+AdbcStatusCode SqliteConnectionGetInfo(struct AdbcConnection* connection,
+                                       uint32_t* info_codes, size_t info_codes_length,
+                                       struct ArrowArrayStream* out,
+                                       struct AdbcError* error) {
+  CHECK_CONN_INIT(connection, error);
+
+  // XXX: mistake in adbc.h (should have been const pointer)
+  const uint32_t* codes = info_codes;
+  if (!info_codes) {
+    codes = kSupportedInfoCodes;
+    info_codes_length = sizeof(kSupportedInfoCodes) / sizeof(kSupportedInfoCodes[0]);
+  }
+
+  struct ArrowSchema schema = {0};
+  struct ArrowArray array = {0};
+
+  AdbcStatusCode status =
+      SqliteConnectionGetInfoImpl(codes, info_codes_length, &schema, &array, error);
+  if (status != ADBC_STATUS_OK) {
+    if (schema.release) schema.release(&schema);
+    if (array.release) array.release(&array);
+    return status;
+  }
+
+  return BatchToArrayStream(&array, &schema, out, error);
+}
+
+AdbcStatusCode SqliteConnectionGetObjectsSchema(struct ArrowSchema* schema,
+                                                struct AdbcError* error) {
+  CHECK_NA(INTERNAL, ArrowSchemaInit(schema, NANOARROW_TYPE_STRUCT), error);
+  CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema, /*num_columns=*/2), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(schema->children[0], NANOARROW_TYPE_STRING), error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[0], "catalog_name"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(schema->children[1], NANOARROW_TYPE_LIST), error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[1], "catalog_db_schemas"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema->children[1], 1), error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaInit(schema->children[1]->children[0], NANOARROW_TYPE_STRUCT),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[1]->children[0], "item"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema->children[1]->children[0], 2),
+           error);
+
+  struct ArrowSchema* db_schema_schema = schema->children[1]->children[0];
+  CHECK_NA(INTERNAL,
+           ArrowSchemaInit(db_schema_schema->children[0], NANOARROW_TYPE_STRING), error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(db_schema_schema->children[0], "db_schema_name"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(db_schema_schema->children[1], NANOARROW_TYPE_LIST),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(db_schema_schema->children[1], "db_schema_tables"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(db_schema_schema->children[1], 1),
+           error);
+  CHECK_NA(
+      INTERNAL,
+      ArrowSchemaInit(db_schema_schema->children[1]->children[0], NANOARROW_TYPE_STRUCT),
+      error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(db_schema_schema->children[1]->children[0], "item"), error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaAllocateChildren(db_schema_schema->children[1]->children[0], 4),
+           error);
+
+  struct ArrowSchema* table_schema = db_schema_schema->children[1]->children[0];
+  CHECK_NA(INTERNAL, ArrowSchemaInit(table_schema->children[0], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(table_schema->children[0], "table_name"), error);
+  table_schema->children[0]->flags &= ~ARROW_FLAG_NULLABLE;
+  CHECK_NA(INTERNAL, ArrowSchemaInit(table_schema->children[1], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(table_schema->children[1], "table_type"), error);
+  table_schema->children[1]->flags &= ~ARROW_FLAG_NULLABLE;
+  CHECK_NA(INTERNAL, ArrowSchemaInit(table_schema->children[2], NANOARROW_TYPE_LIST),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(table_schema->children[2], "table_columns"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(table_schema->children[2], 1), error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaInit(table_schema->children[2]->children[0], NANOARROW_TYPE_STRUCT),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(table_schema->children[2]->children[0], "item"),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaAllocateChildren(table_schema->children[2]->children[0], 19),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(table_schema->children[3], NANOARROW_TYPE_LIST),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(table_schema->children[3], "table_constraints"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(table_schema->children[3], 1), error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaInit(table_schema->children[3]->children[0], NANOARROW_TYPE_STRUCT),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(table_schema->children[3]->children[0], "item"),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaAllocateChildren(table_schema->children[3]->children[0], 4), error);
+
+  struct ArrowSchema* column_schema = table_schema->children[2]->children[0];
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[0], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[0], "column_name"),
+           error);
+  column_schema->children[0]->flags &= ~ARROW_FLAG_NULLABLE;
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[1], NANOARROW_TYPE_INT32),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[1], "ordinal_position"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[2], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[2], "remarks"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[3], NANOARROW_TYPE_INT16),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[3], "xdbc_data_type"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[4], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[4], "xdbc_type_name"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[5], NANOARROW_TYPE_INT32),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[5], "xdbc_column_size"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[6], NANOARROW_TYPE_INT16),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(column_schema->children[6], "xdbc_decimal_digits"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[7], NANOARROW_TYPE_INT16),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(column_schema->children[7], "xdbc_num_prec_radix"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[8], NANOARROW_TYPE_INT16),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[8], "xdbc_nullable"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[9], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[9], "xdbc_column_def"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[10], NANOARROW_TYPE_INT16),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(column_schema->children[10], "xdbc_sql_data_type"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[11], NANOARROW_TYPE_INT16),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[11], "xdbc_datetime_sub"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[12], NANOARROW_TYPE_INT32),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(column_schema->children[12], "xdbc_char_octet_length"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[13], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[13], "xdbc_is_nullable"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[14], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(column_schema->children[14], "xdbc_scope_catalog"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[15], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[15], "xdbc_scope_schema"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[16], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[16], "xdbc_scope_table"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[17], NANOARROW_TYPE_BOOL),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(column_schema->children[17], "xdbc_is_autoincrement"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[18], NANOARROW_TYPE_BOOL),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(column_schema->children[18], "xdbc_is_generatedcolumn"),
+           error);
+
+  struct ArrowSchema* constraint_schema = table_schema->children[3]->children[0];
+  CHECK_NA(INTERNAL,
+           ArrowSchemaInit(constraint_schema->children[0], NANOARROW_TYPE_STRING), error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(constraint_schema->children[0], "constraint_name"), error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaInit(constraint_schema->children[1], NANOARROW_TYPE_STRING), error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(constraint_schema->children[1], "constraint_type"), error);
+  constraint_schema->children[1]->flags &= ~ARROW_FLAG_NULLABLE;
+  CHECK_NA(INTERNAL, ArrowSchemaInit(constraint_schema->children[2], NANOARROW_TYPE_LIST),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(constraint_schema->children[2], "constraint_column_names"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(constraint_schema->children[2], 1),
+           error);
+  CHECK_NA(
+      INTERNAL,
+      ArrowSchemaInit(constraint_schema->children[2]->children[0], NANOARROW_TYPE_STRING),
+      error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(constraint_schema->children[2]->children[0], "item"),
+           error);
+  constraint_schema->children[2]->flags &= ~ARROW_FLAG_NULLABLE;
+  CHECK_NA(INTERNAL, ArrowSchemaInit(constraint_schema->children[3], NANOARROW_TYPE_LIST),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(constraint_schema->children[3], "constraint_column_usage"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(constraint_schema->children[3], 1),
+           error);
+  CHECK_NA(
+      INTERNAL,
+      ArrowSchemaInit(constraint_schema->children[3]->children[0], NANOARROW_TYPE_STRUCT),
+      error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaSetName(constraint_schema->children[3]->children[0], "item"),
+           error);
+  CHECK_NA(INTERNAL,
+           ArrowSchemaAllocateChildren(constraint_schema->children[3]->children[0], 4),
+           error);
+
+  struct ArrowSchema* usage_schema = constraint_schema->children[3]->children[0];
+  CHECK_NA(INTERNAL, ArrowSchemaInit(usage_schema->children[0], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(usage_schema->children[0], "fk_catalog"), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(usage_schema->children[1], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(usage_schema->children[1], "fk_db_schema"),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(usage_schema->children[2], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(usage_schema->children[2], "fk_table"), error);
+  usage_schema->children[2]->flags &= ~ARROW_FLAG_NULLABLE;
+  CHECK_NA(INTERNAL, ArrowSchemaInit(usage_schema->children[3], NANOARROW_TYPE_STRING),
+           error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(usage_schema->children[3], "fk_column_name"),
+           error);
+  usage_schema->children[3]->flags &= ~ARROW_FLAG_NULLABLE;
+
+  return ADBC_STATUS_OK;
+}
+
+static const char kTableQuery[] =
+    "SELECT name, type "
+    "FROM sqlite_master "
+    "WHERE name LIKE ? AND type <> 'index'"
+    "ORDER BY name ASC";
+static const char kColumnQuery[] =
+    "SELECT cid, name, type, \"notnull\", dflt_value "
+    "FROM pragma_table_info(?) "
+    "WHERE name LIKE ? "
+    "ORDER BY cid ASC";
+static const char kPrimaryKeyQuery[] =
+    "SELECT name "
+    "FROM pragma_table_info(?) "
+    "WHERE pk > 0 "
+    "ORDER BY pk ASC";
+static const char kForeignKeyQuery[] =
+    "SELECT id, seq, \"table\", \"from\", \"to\" "
+    "FROM pragma_foreign_key_list(?) "
+    "ORDER BY id, seq ASC";
+
+AdbcStatusCode SqliteConnectionGetColumnsImpl(
+    struct SqliteConnection* conn, const char* table_name, const char* column_name,
+    struct ArrowArray* table_columns_col, sqlite3_stmt* stmt, struct AdbcError* error) {
+  struct ArrowArray* table_columns_items = table_columns_col->children[0];
+  struct ArrowArray* column_name_col = table_columns_items->children[0];
+  struct ArrowArray* ordinal_position_col = table_columns_items->children[1];
+  struct ArrowArray* remarks_col = table_columns_items->children[2];
+  struct ArrowArray* xdbc_data_type_col = table_columns_items->children[3];
+  struct ArrowArray* xdbc_type_name_col = table_columns_items->children[4];
+  struct ArrowArray* xdbc_column_size_col = table_columns_items->children[5];
+  struct ArrowArray* xdbc_decimal_digits_col = table_columns_items->children[6];
+  struct ArrowArray* xdbc_num_prec_radix_col = table_columns_items->children[7];
+  struct ArrowArray* xdbc_nullable_col = table_columns_items->children[8];
+  struct ArrowArray* xdbc_column_def_col = table_columns_items->children[9];
+  struct ArrowArray* xdbc_sql_data_type_col = table_columns_items->children[10];
+  struct ArrowArray* xdbc_datetime_sub_col = table_columns_items->children[11];
+  struct ArrowArray* xdbc_char_octet_length_col = table_columns_items->children[12];
+  struct ArrowArray* xdbc_is_nullable_col = table_columns_items->children[13];
+  struct ArrowArray* xdbc_scope_catalog_col = table_columns_items->children[14];
+  struct ArrowArray* xdbc_scope_schema_col = table_columns_items->children[15];
+  struct ArrowArray* xdbc_scope_table_col = table_columns_items->children[16];
+  struct ArrowArray* xdbc_is_autoincrement_col = table_columns_items->children[17];
+  struct ArrowArray* xdbc_is_generatedcolumn_col = table_columns_items->children[18];
+
+  int rc = sqlite3_reset(stmt);
+  RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+  rc = sqlite3_bind_text64(stmt, 1, table_name, strlen(table_name), SQLITE_STATIC,
+                           SQLITE_UTF8);
+  RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+  if (column_name) {
+    rc = sqlite3_bind_text64(stmt, 2, column_name, strlen(column_name), SQLITE_STATIC,
+                             SQLITE_UTF8);
+  } else {
+    rc = sqlite3_bind_text64(stmt, 2, "%", 1, SQLITE_STATIC, SQLITE_UTF8);
+  }
+  RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+  while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
+    const char* col_name = (const char*)sqlite3_column_text(stmt, 1);
+    struct ArrowStringView str = {.data = col_name,
+                                  .n_bytes = sqlite3_column_bytes(stmt, 1)};
+    CHECK_NA(INTERNAL, ArrowArrayAppendString(column_name_col, str), error);
+
+    const int32_t col_cid = sqlite3_column_int(stmt, 0);
+    CHECK_NA(INTERNAL, ArrowArrayAppendInt(ordinal_position_col, col_cid + 1), error);
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(remarks_col, 1), error);
+
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_data_type_col, 1), error);
+
+    const char* col_type = (const char*)sqlite3_column_text(stmt, 2);
+    if (col_type) {
+      str.data = col_type;
+      str.n_bytes = sqlite3_column_bytes(stmt, 2);
+      CHECK_NA(INTERNAL, ArrowArrayAppendString(xdbc_type_name_col, str), error);
+    } else {
+      CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_type_name_col, 1), error);
+    }
+
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_column_size_col, 1), error);
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_decimal_digits_col, 1), error);
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_num_prec_radix_col, 1), error);
+
+    const int32_t col_notnull = sqlite3_column_int(stmt, 3);
+    if (col_notnull == 0) {
+      // JDBC columnNullable == 1
+      CHECK_NA(INTERNAL, ArrowArrayAppendInt(xdbc_nullable_col, 1), error);
+    } else {
+      // JDBC columnNoNulls == 0
+      CHECK_NA(INTERNAL, ArrowArrayAppendInt(xdbc_nullable_col, 0), error);
+    }
+
+    const char* col_def = (const char*)sqlite3_column_text(stmt, 4);
+    if (col_def) {
+      str.data = col_def;
+      str.n_bytes = sqlite3_column_bytes(stmt, 4);
+      CHECK_NA(INTERNAL, ArrowArrayAppendString(xdbc_column_def_col, str), error);
+    } else {
+      CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_column_def_col, 1), error);
+    }
+
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_sql_data_type_col, 1), error);
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_datetime_sub_col, 1), error);
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_char_octet_length_col, 1), error);
+
+    if (col_notnull == 0) {
+      str.data = "YES";
+      str.n_bytes = 3;
+    } else {
+      str.data = "NO";
+      str.n_bytes = 2;
+    }
+    CHECK_NA(INTERNAL, ArrowArrayAppendString(xdbc_is_nullable_col, str), error);
+
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_scope_catalog_col, 1), error);
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_scope_schema_col, 1), error);
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_scope_table_col, 1), error);
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_is_autoincrement_col, 1), error);
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_is_generatedcolumn_col, 1), error);
+
+    CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_columns_items), error);
+  }
+
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionGetConstraintsImpl(
+    struct SqliteConnection* conn, const char* table_name, const char* column_name,
+    struct ArrowArray* table_constraints_col, sqlite3_stmt* pk_stmt,
+    sqlite3_stmt* fk_stmt, struct AdbcError* error) {
+  struct ArrowArray* table_constraints_items = table_constraints_col->children[0];
+  struct ArrowArray* constraint_name_col = table_constraints_items->children[0];
+  struct ArrowArray* constraint_type_col = table_constraints_items->children[1];
+  struct ArrowArray* constraint_column_names_col = table_constraints_items->children[2];
+  struct ArrowArray* constraint_column_names_items =
+      constraint_column_names_col->children[0];
+  struct ArrowArray* constraint_column_usage_col = table_constraints_items->children[3];
+  struct ArrowArray* constraint_column_usage_items =
+      constraint_column_usage_col->children[0];
+  struct ArrowArray* fk_catalog_col = constraint_column_usage_items->children[0];
+  struct ArrowArray* fk_db_schema_col = constraint_column_usage_items->children[1];
+  struct ArrowArray* fk_table_col = constraint_column_usage_items->children[2];
+  struct ArrowArray* fk_column_name_col = constraint_column_usage_items->children[3];
+
+  // We can get primary keys and foreign keys, but not unique
+  // constraints (unless we parse the SQL table definition)
+
+  int rc = sqlite3_reset(pk_stmt);
+  RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+  rc = sqlite3_bind_text64(pk_stmt, 1, table_name, strlen(table_name), SQLITE_STATIC,
+                           SQLITE_UTF8);
+  RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+  char has_primary_key = 0;
+  while ((rc = sqlite3_step(pk_stmt)) == SQLITE_ROW) {
+    if (!has_primary_key) {
+      has_primary_key = 1;
+      CHECK_NA(INTERNAL, ArrowArrayAppendNull(constraint_name_col, 1), error);
+      CHECK_NA(INTERNAL,
+               ArrowArrayAppendString(constraint_name_col, ArrowCharView("PRIMARY KEY")),
+               error);
+    }
+    CHECK_NA(
+        INTERNAL,
+        ArrowArrayAppendString(
+            constraint_column_names_items,
+            (struct ArrowStringView){.data = (const char*)sqlite3_column_text(pk_stmt, 0),
+                                     .n_bytes = sqlite3_column_bytes(pk_stmt, 0)}),
+        error);
+  }
+  if (has_primary_key) {
+    CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_names_col), error);
+    CHECK_NA(INTERNAL, ArrowArrayAppendNull(constraint_column_usage_col, 1), error);
+    CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_items), error);
+  }
+
+  rc = sqlite3_reset(fk_stmt);
+  RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+  rc = sqlite3_bind_text64(fk_stmt, 1, table_name, strlen(table_name), SQLITE_STATIC,
+                           SQLITE_UTF8);
+  RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+  int prev_fk_id = -1;
+  while ((rc = sqlite3_step(fk_stmt)) == SQLITE_ROW) {
+    const int fk_id = sqlite3_column_int(fk_stmt, 0);
+    const int fk_seq = sqlite3_column_int(fk_stmt, 1);
+    const char* to_table = (const char*)sqlite3_column_text(fk_stmt, 2);
+    const char* from_col = (const char*)sqlite3_column_text(fk_stmt, 3);
+    const char* to_col = (const char*)sqlite3_column_text(fk_stmt, 4);
+
+    if (fk_id != prev_fk_id) {
+      CHECK_NA(INTERNAL, ArrowArrayAppendNull(constraint_name_col, 1), error);
+      CHECK_NA(INTERNAL,
+               ArrowArrayAppendString(constraint_name_col, ArrowCharView("FOREIGN KEY")),
+               error);
+
+      if (prev_fk_id != -1) {
+        CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_names_col), error);
+        CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_usage_col), error);
+        CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_items), error);
+      }
+      prev_fk_id = fk_id;
+
+      CHECK_NA(INTERNAL,
+               ArrowArrayAppendString(
+                   constraint_column_names_items,
+                   (struct ArrowStringView){.data = from_col,
+                                            .n_bytes = sqlite3_column_bytes(pk_stmt, 3)}),
+               error);
+      CHECK_NA(INTERNAL, ArrowArrayAppendString(fk_catalog_col, ArrowCharView("main")),
+               error);
+      CHECK_NA(INTERNAL, ArrowArrayAppendNull(fk_db_schema_col, 1), error);
+      CHECK_NA(INTERNAL,
+               ArrowArrayAppendString(
+                   fk_table_col,
+                   (struct ArrowStringView){.data = to_table,
+                                            .n_bytes = sqlite3_column_bytes(pk_stmt, 2)}),
+               error);
+      CHECK_NA(INTERNAL,
+               ArrowArrayAppendString(
+                   fk_column_name_col,
+                   (struct ArrowStringView){.data = to_col,
+                                            .n_bytes = sqlite3_column_bytes(pk_stmt, 4)}),
+               error);
+    }
+  }
+  if (prev_fk_id != -1) {
+    CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_names_col), error);
+    CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_usage_col), error);
+    CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_items), error);
+  }
+
+  return ADBC_STATUS_OK;
+}  // NOLINT(whitespace/indent)
+
+AdbcStatusCode SqliteConnectionGetTablesInner(
+    struct SqliteConnection* conn, sqlite3_stmt* tables_stmt, sqlite3_stmt* columns_stmt,
+    sqlite3_stmt* pk_stmt, sqlite3_stmt* fk_stmt, const char** table_type,
+    const char* column_name, struct ArrowArray* db_schema_tables_col,
+    struct AdbcError* error) {
+  struct ArrowArray* db_schema_tables_items = db_schema_tables_col->children[0];
+  struct ArrowArray* table_name_col = db_schema_tables_items->children[0];
+  struct ArrowArray* table_type_col = db_schema_tables_items->children[1];
+  struct ArrowArray* table_columns_col = db_schema_tables_items->children[2];
+  struct ArrowArray* table_constraints_col = db_schema_tables_items->children[3];
+
+  int rc = SQLITE_OK;
+  while ((rc = sqlite3_step(tables_stmt)) == SQLITE_ROW) {
+    const char* cur_table_type = (const char*)sqlite3_column_text(tables_stmt, 1);
+
+    if (table_type) {
+      const char** current = table_type;
+      char found = 0;
+      while (*current) {
+        if (strcmp(*current, cur_table_type) == 0) {
+          found = 1;
+          break;
+        }
+        current++;
+      }
+      if (!found) continue;
+    }
+
+    struct ArrowStringView str = {.data = cur_table_type,
+                                  .n_bytes = sqlite3_column_bytes(tables_stmt, 1)};
+    CHECK_NA(INTERNAL, ArrowArrayAppendString(table_type_col, str), error);
+
+    const char* cur_table = (const char*)sqlite3_column_text(tables_stmt, 0);
+    str.data = cur_table;
+    str.n_bytes = sqlite3_column_bytes(tables_stmt, 0);
+    CHECK_NA(INTERNAL, ArrowArrayAppendString(table_name_col, str), error);
+
+    if (columns_stmt == NULL) {
+      CHECK_NA(INTERNAL, ArrowArrayAppendNull(table_columns_col, 1), error);
+      CHECK_NA(INTERNAL, ArrowArrayAppendNull(table_constraints_col, 1), error);
+    } else {
+      // XXX: n + 1 query pattern. You can join on a pragma so we
+      // could avoid this in principle but it complicates the
+      // unpacking code here quite a bit, so ignore for now.
+      RAISE_ADBC(SqliteConnectionGetColumnsImpl(conn, cur_table, column_name,
+                                                table_columns_col, columns_stmt, error));
+      // Not strictly necessary, but we passed SQLITE_STATIC when
+      // binding so don't let the reference leak
+      (void)sqlite3_clear_bindings(columns_stmt);
+      CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_columns_col), error);
+
+      RAISE_ADBC(SqliteConnectionGetConstraintsImpl(
+          conn, cur_table, column_name, table_constraints_col, pk_stmt, fk_stmt, error));
+      (void)sqlite3_clear_bindings(pk_stmt);
+      (void)sqlite3_clear_bindings(fk_stmt);
+      CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_col), error);
+    }
+    CHECK_NA(INTERNAL, ArrowArrayFinishElement(db_schema_tables_items), error);
+  }
+
+  if (rc != SQLITE_DONE) {
+    SetError(error, "Failed to query for tables: %s", sqlite3_errmsg(conn->conn));
+    return ADBC_STATUS_INTERNAL;
+  }
+  CHECK_NA(INTERNAL, ArrowArrayFinishElement(db_schema_tables_col), error);
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionGetTablesImpl(struct SqliteConnection* conn, int depth,
+                                             const char* table_name,
+                                             const char** table_type,
+                                             const char* column_name,
+                                             struct ArrowArray* db_schema_tables_col,
+                                             struct AdbcError* error) {
+  sqlite3_stmt* tables_stmt = NULL;
+  sqlite3_stmt* columns_stmt = NULL;
+  sqlite3_stmt* pk_stmt = NULL;
+  sqlite3_stmt* fk_stmt = NULL;
+  int rc = SQLITE_OK;
+
+  if (rc == SQLITE_OK) {
+    rc = sqlite3_prepare_v2(conn->conn, kTableQuery, sizeof(kTableQuery), &tables_stmt,
+                            /*pzTail=*/NULL);
+  }
+  if (rc == SQLITE_OK && depth == ADBC_OBJECT_DEPTH_COLUMNS) {
+    rc = sqlite3_prepare_v2(conn->conn, kColumnQuery, sizeof(kColumnQuery), &columns_stmt,
+                            /*pzTail=*/NULL);
+  }
+  if (rc == SQLITE_OK && depth == ADBC_OBJECT_DEPTH_COLUMNS) {
+    rc = sqlite3_prepare_v2(conn->conn, kPrimaryKeyQuery, sizeof(kPrimaryKeyQuery),
+                            &pk_stmt, /*pzTail=*/NULL);
+  }
+  if (rc == SQLITE_OK && depth == ADBC_OBJECT_DEPTH_COLUMNS) {
+    rc = sqlite3_prepare_v2(conn->conn, kForeignKeyQuery, sizeof(kForeignKeyQuery),
+                            &fk_stmt, /*pzTail=*/NULL);
+  }
+  if (rc == SQLITE_OK) {
+    if (table_name) {
+      rc = sqlite3_bind_text64(tables_stmt, 1, table_name, strlen(table_name),
+                               SQLITE_STATIC, SQLITE_UTF8);
+    } else {
+      rc = sqlite3_bind_text64(tables_stmt, 1, "%", 1, SQLITE_STATIC, SQLITE_UTF8);
+    }
+  }
+
+  AdbcStatusCode status = ADBC_STATUS_OK;
+  if (rc == SQLITE_OK) {
+    status = SqliteConnectionGetTablesInner(conn, tables_stmt, columns_stmt, pk_stmt,
+                                            fk_stmt, table_type, column_name,
+                                            db_schema_tables_col, error);
+  } else {
+    SetError(error, "Failed to query for tables: %s", sqlite3_errmsg(conn->conn));
+    status = ADBC_STATUS_INTERNAL;
+  }
+
+  sqlite3_finalize(tables_stmt);
+  sqlite3_finalize(columns_stmt);
+  sqlite3_finalize(pk_stmt);
+  sqlite3_finalize(fk_stmt);
+  return status;
+}
+
+AdbcStatusCode SqliteConnectionGetObjectsImpl(
+    struct SqliteConnection* conn, int depth, const char* catalog, const char* db_schema,
+    const char* table_name, const char** table_type, const char* column_name,
+    struct ArrowSchema* schema, struct ArrowArray* array, struct AdbcError* error) {
+  RAISE_ADBC(SqliteConnectionGetObjectsSchema(schema, error));
+
+  struct ArrowError na_error = {0};
+  CHECK_NA_DETAIL(INTERNAL, ArrowArrayInitFromSchema(array, schema, &na_error), &na_error,
+                  error);
+  CHECK_NA(INTERNAL, ArrowArrayStartAppending(array), error);
+
+  struct ArrowArray* catalog_name_col = array->children[0];
+  struct ArrowArray* catalog_db_schemas_col = array->children[1];
+
+  struct ArrowArray* catalog_db_schemas_items = catalog_db_schemas_col->children[0];
+  struct ArrowArray* db_schema_name_col = catalog_db_schemas_items->children[0];
+  struct ArrowArray* db_schema_tables_col = catalog_db_schemas_items->children[1];
+
+  // TODO: support proper filters
+  if (!catalog || strcmp(catalog, "main") == 0) {
+    // Default the primary catalog to "main"
+    // https://www.sqlite.org/cli.html
+    // > The ".databases" command shows a list of all databases open
+    // > in the current connection. There will always be at least
+    // > 2. The first one is "main", the original database opened.
+
+    CHECK_NA(INTERNAL, ArrowArrayAppendString(catalog_name_col, ArrowCharView("main")),
+             error);
+
+    if (depth == ADBC_OBJECT_DEPTH_CATALOGS) {
+      CHECK_NA(INTERNAL, ArrowArrayAppendNull(catalog_db_schemas_col, 1), error);
+    } else if (!db_schema || db_schema == NULL) {
+      // For our purposes, we'll consider SQLite to always have a
+      // single, unnamed schema within each catalog.
+      CHECK_NA(INTERNAL, ArrowArrayAppendNull(db_schema_name_col, 1), error);
+      if (depth == ADBC_OBJECT_DEPTH_DB_SCHEMAS) {
+        CHECK_NA(INTERNAL, ArrowArrayAppendNull(db_schema_tables_col, 1), error);
+      } else {
+        RAISE_ADBC(SqliteConnectionGetTablesImpl(conn, depth, table_name, table_type,
+                                                 column_name, db_schema_tables_col,
+                                                 error));
+      }
+      CHECK_NA(INTERNAL, ArrowArrayFinishElement(catalog_db_schemas_items), error);
+      CHECK_NA(INTERNAL, ArrowArrayFinishElement(catalog_db_schemas_col), error);
+    } else {
+      CHECK_NA(INTERNAL, ArrowArrayFinishElement(catalog_db_schemas_col), error);
+    }
+    CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
+
+    // TODO: implement "temp", other attached databases as catalogs
+  }
+
+  CHECK_NA_DETAIL(INTERNAL, ArrowArrayFinishBuilding(array, &na_error), &na_error, error);
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionGetObjects(struct AdbcConnection* connection, int depth,
+                                          const char* catalog, const char* db_schema,
+                                          const char* table_name, const char** table_type,
+                                          const char* column_name,
+                                          struct ArrowArrayStream* out,
+                                          struct AdbcError* error) {
+  CHECK_CONN_INIT(connection, error);
+  struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+
+  struct ArrowSchema schema = {0};
+  struct ArrowArray array = {0};
+
+  AdbcStatusCode status =
+      SqliteConnectionGetObjectsImpl(conn, depth, catalog, db_schema, table_name,
+                                     table_type, column_name, &schema, &array, error);
+  if (status != ADBC_STATUS_OK) {
+    if (schema.release) schema.release(&schema);
+    if (array.release) array.release(&array);
+    return status;
+  }
+
+  return BatchToArrayStream(&array, &schema, out, error);
+}
+
+AdbcStatusCode SqliteConnectionGetTableSchema(struct AdbcConnection* connection,
+                                              const char* catalog, const char* db_schema,
+                                              const char* table_name,
+                                              struct ArrowSchema* schema,
+                                              struct AdbcError* error) {
+  CHECK_CONN_INIT(connection, error);
+  struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+  if (catalog != NULL && strlen(catalog) > 0) {
+    // TODO: map 'catalog' to SQLite attached database
+    memset(schema, 0, sizeof(*schema));
+    return ADBC_STATUS_OK;
+  } else if (db_schema != NULL && strlen(db_schema) > 0) {
+    // SQLite does not support schemas
+    memset(schema, 0, sizeof(*schema));
+    return ADBC_STATUS_OK;
+  } else if (table_name == NULL) {
+    SetError(error, "AdbcConnectionGetTableSchema: must provide table_name");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+
+  struct StringBuilder query = {0};
+  StringBuilderInit(&query, /*initial_size=*/64);
+  StringBuilderAppend(&query, "SELECT * FROM ");
+  StringBuilderAppend(&query, table_name);
+
+  sqlite3_stmt* stmt = NULL;
+  int rc =
+      sqlite3_prepare_v2(conn->conn, query.buffer, query.size, &stmt, /*pzTail=*/NULL);
+  StringBuilderReset(&query);
+  if (rc != SQLITE_OK) {
+    SetError(error, "Failed to prepare query: %s", sqlite3_errmsg(conn->conn));
+    return ADBC_STATUS_INTERNAL;
+  }
+
+  struct ArrowArrayStream stream = {0};
+  AdbcStatusCode status = AdbcSqliteExportReader(conn->conn, stmt, /*binder=*/NULL,
+                                                 /*batch_size=*/64, &stream, error);
+  if (status == ADBC_STATUS_OK) {
+    int code = stream.get_schema(&stream, schema);
+    if (code != 0) {
+      SetError(error, "Failed to get schema: (%d) %s", code, strerror(code));
+      status = ADBC_STATUS_IO;
+    }
+  }
+  if (stream.release) {
+    stream.release(&stream);
+  }
+  (void)sqlite3_finalize(stmt);
+  return status;
+}
+
+AdbcStatusCode SqliteConnectionGetTableTypesImpl(struct ArrowSchema* schema,
+                                                 struct ArrowArray* array,
+                                                 struct AdbcError* error) {
+  CHECK_NA(INTERNAL, ArrowSchemaInit(schema, NANOARROW_TYPE_STRUCT), error);
+  CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema, /*num_columns=*/1), error);
+  CHECK_NA(INTERNAL, ArrowSchemaInit(schema->children[0], NANOARROW_TYPE_STRING), error);
+  CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[0], "table_type"), error);
+  schema->children[0]->flags &= ~ARROW_FLAG_NULLABLE;
+
+  CHECK_NA(INTERNAL, ArrowArrayInitFromSchema(array, schema, NULL), error);
+  CHECK_NA(INTERNAL, ArrowArrayStartAppending(array), error);
+
+  CHECK_NA(INTERNAL, ArrowArrayAppendString(array->children[0], ArrowCharView("table")),
+           error);
+  CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
+  CHECK_NA(INTERNAL, ArrowArrayAppendString(array->children[0], ArrowCharView("view")),
+           error);
+  CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
+
+  CHECK_NA(INTERNAL, ArrowArrayFinishBuilding(array, NULL), error);
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionGetTableTypes(struct AdbcConnection* connection,
+                                             struct ArrowArrayStream* out,
+                                             struct AdbcError* error) {
+  CHECK_CONN_INIT(connection, error);
+  struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+
+  struct ArrowSchema schema = {0};
+  struct ArrowArray array = {0};
+
+  AdbcStatusCode status = SqliteConnectionGetTableTypesImpl(&schema, &array, error);
+  if (status != ADBC_STATUS_OK) {
+    if (schema.release) schema.release(&schema);
+    if (array.release) array.release(&array);
+    return status;
+  }
+  return BatchToArrayStream(&array, &schema, out, error);
+}
+
+AdbcStatusCode SqliteConnectionReadPartition(struct AdbcConnection* connection,
+                                             const uint8_t* serialized_partition,
+                                             size_t serialized_length,
+                                             struct ArrowArrayStream* out,
+                                             struct AdbcError* error) {
+  CHECK_CONN_INIT(connection, error);
+  struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteConnectionCommit(struct AdbcConnection* connection,
+                                      struct AdbcError* error) {
+  CHECK_CONN_INIT(connection, error);
+  struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+  if (!conn->active_transaction) {
+    SetError(error, "No active transaction, cannot commit");
+    return ADBC_STATUS_INVALID_STATE;
+  }
+
+  AdbcStatusCode status = ExecuteQuery(conn, "COMMIT", error);
+  if (status != ADBC_STATUS_OK) return status;
+  return ExecuteQuery(conn, "BEGIN", error);
+}
+
+AdbcStatusCode SqliteConnectionRollback(struct AdbcConnection* connection,
+                                        struct AdbcError* error) {
+  CHECK_CONN_INIT(connection, error);
+  struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+  if (!conn->active_transaction) {
+    SetError(error, "No active transaction, cannot rollback");
+    return ADBC_STATUS_INVALID_STATE;
+  }
+
+  AdbcStatusCode status = ExecuteQuery(conn, "ROLLBACK", error);
+  if (status != ADBC_STATUS_OK) return status;
+  return ExecuteQuery(conn, "BEGIN", error);
+}
+
+AdbcStatusCode SqliteStatementNew(struct AdbcConnection* connection,
+                                  struct AdbcStatement* statement,
+                                  struct AdbcError* error) {
+  CHECK_CONN_INIT(connection, error);
+  struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+  if (statement->private_data) {
+    SetError(error, "AdbcStatementNew: statement already allocated");
+    return ADBC_STATUS_INVALID_STATE;
+  } else if (!conn->conn) {
+    SetError(error, "AdbcStatementNew: connection is not initialized");
+    return ADBC_STATUS_INVALID_STATE;
+  }
+
+  statement->private_data = malloc(sizeof(struct SqliteStatement));
+  memset(statement->private_data, 0, sizeof(struct SqliteStatement));
+  struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+  stmt->conn = conn->conn;
+
+  // Default options
+  stmt->batch_size = 1024;
+
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteStatementRelease(struct AdbcStatement* statement,
+                                      struct AdbcError* error) {
+  CHECK_STMT_INIT(statement, error);
+  struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+
+  int rc = SQLITE_OK;
+  if (stmt->stmt) {
+    rc = sqlite3_finalize(stmt->stmt);
+  }
+  if (stmt->query) free(stmt->query);
+  AdbcSqliteBinderRelease(&stmt->binder);
+  if (stmt->target_table) free(stmt->target_table);
+  if (rc != SQLITE_OK) {
+    SetError(error, "AdbcStatementRelease: statement failed to finalize: (%d) %s", rc,
+             sqlite3_errmsg(stmt->conn));
+  }
+  free(statement->private_data);
+  statement->private_data = NULL;
+
+  return rc == SQLITE_OK ? ADBC_STATUS_OK : ADBC_STATUS_IO;
+}
+
+AdbcStatusCode SqliteStatementPrepare(struct AdbcStatement* statement,
+                                      struct AdbcError* error) {
+  CHECK_STMT_INIT(statement, error);
+  struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+
+  if (!stmt->query) {
+    SetError(error, "Must SetSqlQuery before ExecuteQuery or Prepare");
+    return ADBC_STATUS_INVALID_STATE;
+  }
+  if (stmt->prepared == 0) {
+    if (stmt->stmt) {
+      int rc = sqlite3_finalize(stmt->stmt);
+      stmt->stmt = NULL;
+      if (rc != SQLITE_OK) {
+        SetError(error, "Failed to finalize previous statement: (%d) %s", rc,
+                 sqlite3_errmsg(stmt->conn));
+        return ADBC_STATUS_IO;
+      }
+    }
+
+    int rc =
+        sqlite3_prepare_v2(stmt->conn, stmt->query, (int)stmt->query_len, &stmt->stmt,
+                           /*pzTail=*/NULL);
+    if (rc != SQLITE_OK) {
+      SetError(error, "Failed to prepare query: %s\nQuery:%s", sqlite3_errmsg(stmt->conn),
+               stmt->query);
+      (void)sqlite3_finalize(stmt->stmt);
+      stmt->stmt = NULL;
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+    stmt->prepared = 1;
+  }
+
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteStatementInitIngest(struct SqliteStatement* stmt,
+                                         sqlite3_stmt** insert_statement,
+                                         struct AdbcError* error) {
+  AdbcStatusCode code = ADBC_STATUS_OK;
+
+  // Create statements for CREATE TABLE / INSERT
+  struct StringBuilder create_query = {0};
+  struct StringBuilder insert_query = {0};
+  StringBuilderInit(&create_query, 256);
+  StringBuilderInit(&insert_query, 256);
+
+  StringBuilderAppend(&create_query, "CREATE TABLE ");
+  StringBuilderAppend(&create_query, stmt->target_table);
+  StringBuilderAppend(&create_query, " (");
+
+  StringBuilderAppend(&insert_query, "INSERT INTO ");
+  StringBuilderAppend(&insert_query, stmt->target_table);
+  StringBuilderAppend(&insert_query, " VALUES (");
+
+  for (int i = 0; i < stmt->binder.schema.n_children; i++) {
+    if (i > 0) StringBuilderAppend(&create_query, ", ");
+    // XXX: should escape the column name too
+    StringBuilderAppend(&create_query, stmt->binder.schema.children[i]->name);
+
+    if (i > 0) StringBuilderAppend(&insert_query, ", ");
+    StringBuilderAppend(&insert_query, "?");
+  }
+  StringBuilderAppend(&create_query, ")");
+  StringBuilderAppend(&insert_query, ")");
+
+  sqlite3_stmt* create = NULL;
+  if (!stmt->append) {
+    // Create table
+    int rc = sqlite3_prepare_v2(stmt->conn, create_query.buffer, (int)create_query.size,
+                                &create, /*pzTail=*/NULL);
+    if (rc == SQLITE_OK) {
+      rc = sqlite3_step(create);
+    }
+
+    if (rc != SQLITE_OK && rc != SQLITE_DONE) {
+      SetError(error, "Failed to create table: %s (executed '%s')",
+               sqlite3_errmsg(stmt->conn), create_query.buffer);
+      code = ADBC_STATUS_INTERNAL;
+    }
+  }
+
+  if (code == ADBC_STATUS_OK) {
+    int rc = sqlite3_prepare_v2(stmt->conn, insert_query.buffer, (int)insert_query.size,
+                                insert_statement, /*pzTail=*/NULL);
+    if (rc != SQLITE_OK) {
+      SetError(error, "Failed to prepare statement: %s (executed '%s')",
+               sqlite3_errmsg(stmt->conn), insert_query.buffer);
+      code = ADBC_STATUS_INTERNAL;
+    }
+  }
+
+  sqlite3_finalize(create);
+  StringBuilderReset(&create_query);
+  StringBuilderReset(&insert_query);
+  return code;
+}
+
+AdbcStatusCode SqliteStatementExecuteIngest(struct SqliteStatement* stmt,
+                                            int64_t* rows_affected,
+                                            struct AdbcError* error) {
+  if (!stmt->binder.schema.release) {
+    SetError(error, "Must Bind() before bulk ingestion");
+    return ADBC_STATUS_INVALID_STATE;
+  }
+
+  sqlite3_stmt* insert = NULL;
+  AdbcStatusCode status = SqliteStatementInitIngest(stmt, &insert, error);
+
+  int64_t row_count = 0;
+  if (status == ADBC_STATUS_OK) {
+    while (1) {
+      char finished = 0;
+      status =
+          AdbcSqliteBinderBindNext(&stmt->binder, stmt->conn, insert, &finished, error);
+      if (status != ADBC_STATUS_OK || finished) break;
+
+      int rc = 0;
+      do {
+        rc = sqlite3_step(insert);
+      } while (rc == SQLITE_ROW);
+      if (rc != SQLITE_DONE) {
+        SetError(error, "Failed to execute statement: %s", sqlite3_errmsg(stmt->conn));
+        status = ADBC_STATUS_INTERNAL;
+        break;
+      }
+      row_count++;
+    }
+  }
+
+  if (rows_affected) *rows_affected = row_count;
+  if (insert) sqlite3_finalize(insert);
+  AdbcSqliteBinderRelease(&stmt->binder);
+  return status;
+}
+
+AdbcStatusCode SqliteStatementExecuteQuery(struct AdbcStatement* statement,
+                                           struct ArrowArrayStream* out,
+                                           int64_t* rows_affected,
+                                           struct AdbcError* error) {
+  CHECK_STMT_INIT(statement, error);
+  struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+
+  if (stmt->target_table) {
+    return SqliteStatementExecuteIngest(stmt, rows_affected, error);
+  }
+
+  AdbcStatusCode status = SqliteStatementPrepare(statement, error);
+  if (status != ADBC_STATUS_OK) return status;
+
+  if (stmt->binder.schema.release) {
+    int64_t expected = sqlite3_bind_parameter_count(stmt->stmt);
+    int64_t actual = stmt->binder.schema.n_children;
+    if (actual != expected) {
+      SetError(error, "Parameter count mismatch: expected %" PRId64 " but found %" PRId64,
+               expected, actual);
+      return ADBC_STATUS_INVALID_STATE;
+    }
+  }
+
+  if (!out) {
+    // Update
+    sqlite3_mutex_enter(sqlite3_db_mutex(stmt->conn));
+
+    AdbcStatusCode status = ADBC_STATUS_OK;
+    int64_t rows = 0;
+
+    while (1) {
+      if (stmt->binder.schema.release) {
+        char finished = 0;
+        status = AdbcSqliteBinderBindNext(&stmt->binder, stmt->conn, stmt->stmt,
+                                          &finished, error);
+        if (status != ADBC_STATUS_OK || finished) {
+          break;
+        }
+      }
+
+      while (sqlite3_step(stmt->stmt) == SQLITE_ROW) {
+        rows++;
+      }
+      if (!stmt->binder.schema.release) break;
+    }
+
+    if (sqlite3_reset(stmt->stmt) != SQLITE_OK) {
+      status = ADBC_STATUS_IO;
+      const char* msg = sqlite3_errmsg(stmt->conn);
+      SetError(error, "Failed to execute query: %s",
+               (msg == NULL) ? "(unknown error)" : msg);
+    }
+
+    sqlite3_mutex_leave(sqlite3_db_mutex(stmt->conn));
+
+    AdbcSqliteBinderRelease(&stmt->binder);
+    if (rows_affected) *rows_affected = rows;
+    return status;
+  }
+
+  // Query
+  if (rows_affected) *rows_affected = -1;
+  struct AdbcSqliteBinder* binder = stmt->binder.schema.release ? &stmt->binder : NULL;
+  return AdbcSqliteExportReader(stmt->conn, stmt->stmt, binder, stmt->batch_size, out,
+                                error);
+}
+
+AdbcStatusCode SqliteStatementSetSqlQuery(struct AdbcStatement* statement,
+                                          const char* query, struct AdbcError* error) {
+  CHECK_STMT_INIT(statement, error);
+  struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+
+  if (stmt->query) {
+    free(stmt->query);
+    stmt->query = NULL;
+  }
+  if (stmt->target_table) {
+    free(stmt->target_table);
+    stmt->target_table = NULL;
+  }
+  size_t len = strlen(query) + 1;
+  stmt->query = malloc(len);
+  stmt->query_len = len;
+  stmt->prepared = 0;
+  strncpy(stmt->query, query, len);
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteStatementSetSubstraitPlan(struct AdbcStatement* statement,
+                                               const uint8_t* plan, size_t length,
+                                               struct AdbcError* error) {
+  CHECK_STMT_INIT(statement, error);
+  SetError(error, "Substrait is not supported");
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteStatementBind(struct AdbcStatement* statement,
+                                   struct ArrowArray* values, struct ArrowSchema* schema,
+                                   struct AdbcError* error) {
+  CHECK_STMT_INIT(statement, error);
+  struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+  return AdbcSqliteBinderSetArray(&stmt->binder, values, schema, error);
+}
+
+AdbcStatusCode SqliteStatementBindStream(struct AdbcStatement* statement,
+                                         struct ArrowArrayStream* stream,
+                                         struct AdbcError* error) {
+  CHECK_STMT_INIT(statement, error);
+  struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+  return AdbcSqliteBinderSetArrayStream(&stmt->binder, stream, error);
+}
+
+AdbcStatusCode SqliteStatementGetParameterSchema(struct AdbcStatement* statement,
+                                                 struct ArrowSchema* schema,
+                                                 struct AdbcError* error) {
+  AdbcStatusCode status = SqliteStatementPrepare(statement, error);
+  if (status != ADBC_STATUS_OK) return status;
+
+  struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+  int num_params = sqlite3_bind_parameter_count(stmt->stmt);
+  if (num_params < 0) {
+    // Should not happen
+    SetError(error, "SQLite returned negative parameter count");
+    return ADBC_STATUS_INTERNAL;
+  }
+
+  CHECK_NA(INTERNAL, ArrowSchemaInit(schema, NANOARROW_TYPE_STRUCT), error);
+  CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema, num_params), error);
+  char buffer[11];
+  for (int i = 0; i < num_params; i++) {
+    const char* name = sqlite3_bind_parameter_name(stmt->stmt, i + 1);
+    if (name == NULL) {
+      snprintf(buffer, sizeof(buffer), "%d", i);
+      name = buffer;
+    }
+    CHECK_NA(INTERNAL, ArrowSchemaInit(schema->children[i], NANOARROW_TYPE_NA), error);
+    CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[i], name), error);
+  }
+
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteStatementSetOption(struct AdbcStatement* statement, const char* key,
+                                        const char* value, struct AdbcError* error) {
+  CHECK_STMT_INIT(statement, error);
+  struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+
+  if (strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
+    if (stmt->query) {
+      free(stmt->query);
+      stmt->query = NULL;
+    }
+    if (stmt->target_table) {
+      free(stmt->target_table);
+      stmt->target_table = NULL;
+    }
+
+    size_t len = strlen(value) + 1;
+    stmt->target_table = (char*)malloc(len);
+    strncpy(stmt->target_table, value, len);
+    return ADBC_STATUS_OK;
+  } else if (strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
+    if (strcmp(value, ADBC_INGEST_OPTION_MODE_APPEND) == 0) {
+      stmt->append = 1;
+    } else if (strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) {
+      stmt->append = 0;
+    } else {
+      SetError(error, "Invalid statement option value %s=%s", key, value);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+    return ADBC_STATUS_OK;
+  } else if (strcmp(key, kStatementOptionBatchRows) == 0) {
+    char* end = NULL;
+    long batch_size = strtol(value, &end, /*base=*/10);  // NOLINT(runtime/int)
+    if (errno != 0) {
+      SetError(error, "Invalid statement option value %s=%s (out of range)", key, value);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    } else if (batch_size <= 0) {
+      SetError(error,
+               "Invalid statement option value %s=%s (value is non-positive or invalid)",
+               key, value);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    } else if (batch_size > (long)INT_MAX) {  // NOLINT(runtime/int)
+      SetError(error,
+               "Invalid statement option value %s=%s (value is out of range of int)", key,
+               value);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+    stmt->batch_size = (int)batch_size;
+    return ADBC_STATUS_OK;
+  }
+  SetError(error, "Unknown statement option %s=%s", key, value);
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteStatementExecutePartitions(struct AdbcStatement* statement,
+                                                struct ArrowSchema* schema,
+                                                struct AdbcPartitions* partitions,
+                                                int64_t* rows_affected,
+                                                struct AdbcError* error) {
+  CHECK_STMT_INIT(statement, error);
+  SetError(error, "Partitioned result sets are not supported");
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}  // NOLINT(whitespace/indent)
+
+AdbcStatusCode SqliteDriverInit(int version, void* raw_driver, struct AdbcError* error) {
+  if (version != ADBC_VERSION_1_0_0) {
+    SetError(error, "Only version %d supported, got %d", ADBC_VERSION_1_0_0, version);
+    return ADBC_STATUS_NOT_IMPLEMENTED;
+  }
+
+  struct AdbcDriver* driver = (struct AdbcDriver*)raw_driver;
+  memset(driver, 0, sizeof(*driver));
+  driver->DatabaseInit = SqliteDatabaseInit;
+  driver->DatabaseNew = SqliteDatabaseNew;
+  driver->DatabaseRelease = SqliteDatabaseRelease;
+  driver->DatabaseSetOption = SqliteDatabaseSetOption;
+
+  driver->ConnectionCommit = SqliteConnectionCommit;
+  driver->ConnectionGetInfo = SqliteConnectionGetInfo;
+  driver->ConnectionGetObjects = SqliteConnectionGetObjects;
+  driver->ConnectionGetTableSchema = SqliteConnectionGetTableSchema;
+  driver->ConnectionGetTableTypes = SqliteConnectionGetTableTypes;
+  driver->ConnectionInit = SqliteConnectionInit;
+  driver->ConnectionNew = SqliteConnectionNew;
+  driver->ConnectionReadPartition = SqliteConnectionReadPartition;
+  driver->ConnectionRelease = SqliteConnectionRelease;
+  driver->ConnectionRollback = SqliteConnectionRollback;
+  driver->ConnectionSetOption = SqliteConnectionSetOption;
+
+  driver->StatementBind = SqliteStatementBind;
+  driver->StatementBindStream = SqliteStatementBindStream;
+  driver->StatementExecuteQuery = SqliteStatementExecuteQuery;
+  driver->StatementGetParameterSchema = SqliteStatementGetParameterSchema;
+  driver->StatementNew = SqliteStatementNew;
+  driver->StatementPrepare = SqliteStatementPrepare;
+  driver->StatementRelease = SqliteStatementRelease;
+  driver->StatementSetOption = SqliteStatementSetOption;
+  driver->StatementSetSqlQuery = SqliteStatementSetSqlQuery;
+  return ADBC_STATUS_OK;
+}
+
+// Public names
+
+AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
+  return SqliteDatabaseNew(database, error);
+}
+
+AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char* key,
+                                     const char* value, struct AdbcError* error) {
+  return SqliteDatabaseSetOption(database, key, value, error);
+}
+
+AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) {
+  return SqliteDatabaseInit(database, error);
+}
+
+AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
+                                   struct AdbcError* error) {
+  return SqliteDatabaseRelease(database, error);
+}
+
+AdbcStatusCode AdbcConnectionNew(struct AdbcConnection* connection,
+                                 struct AdbcError* error) {
+  return SqliteConnectionNew(connection, error);
+}
+
+AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
+                                       const char* value, struct AdbcError* error) {
+  return SqliteConnectionSetOption(connection, key, value, error);
+}
+
+AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
+                                  struct AdbcDatabase* database,
+                                  struct AdbcError* error) {
+  return SqliteConnectionInit(connection, database, error);
+}
+
+AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
+                                     struct AdbcError* error) {
+  return SqliteConnectionRelease(connection, error);
+}
+
+AdbcStatusCode AdbcConnectionGetInfo(struct AdbcConnection* connection,
+                                     uint32_t* info_codes, size_t info_codes_length,
+                                     struct ArrowArrayStream* out,
+                                     struct AdbcError* error) {
+  return SqliteConnectionGetInfo(connection, info_codes, info_codes_length, out, error);
+}
+
+AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int depth,
+                                        const char* catalog, const char* db_schema,
+                                        const char* table_name, const char** table_type,
+                                        const char* column_name,
+                                        struct ArrowArrayStream* out,
+                                        struct AdbcError* error) {
+  return SqliteConnectionGetObjects(connection, depth, catalog, db_schema, table_name,
+                                    table_type, column_name, out, error);
+}
+
+AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
+                                            const char* catalog, const char* db_schema,
+                                            const char* table_name,
+                                            struct ArrowSchema* schema,
+                                            struct AdbcError* error) {
+  return SqliteConnectionGetTableSchema(connection, catalog, db_schema, table_name,
+                                        schema, error);
+}
+
+AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
+                                           struct ArrowArrayStream* out,
+                                           struct AdbcError* error) {
+  return SqliteConnectionGetTableTypes(connection, out, error);
+}
+
+AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,
+                                           const uint8_t* serialized_partition,
+                                           size_t serialized_length,
+                                           struct ArrowArrayStream* out,
+                                           struct AdbcError* error) {
+  return SqliteConnectionReadPartition(connection, serialized_partition,
+                                       serialized_length, out, error);
+}
+
+AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
+                                    struct AdbcError* error) {
+  return SqliteConnectionCommit(connection, error);
+}
+
+AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
+                                      struct AdbcError* error) {
+  return SqliteConnectionRollback(connection, error);
+}
+
+AdbcStatusCode AdbcStatementNew(struct AdbcConnection* connection,
+                                struct AdbcStatement* statement,
+                                struct AdbcError* error) {
+  return SqliteStatementNew(connection, statement, error);
+}
+
+AdbcStatusCode AdbcStatementRelease(struct AdbcStatement* statement,
+                                    struct AdbcError* error) {
+  return SqliteStatementRelease(statement, error);
+}
+
+AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
+                                         struct ArrowArrayStream* out,
+                                         int64_t* rows_affected,
+                                         struct AdbcError* error) {
+  return SqliteStatementExecuteQuery(statement, out, rows_affected, error);
+}
+
+AdbcStatusCode AdbcStatementPrepare(struct AdbcStatement* statement,
+                                    struct AdbcError* error) {
+  return SqliteStatementPrepare(statement, error);
+}
+
+AdbcStatusCode AdbcStatementSetSqlQuery(struct AdbcStatement* statement,
+                                        const char* query, struct AdbcError* error) {
+  return SqliteStatementSetSqlQuery(statement, query, error);
+}
+
+AdbcStatusCode AdbcStatementSetSubstraitPlan(struct AdbcStatement* statement,
+                                             const uint8_t* plan, size_t length,
+                                             struct AdbcError* error) {
+  return SqliteStatementSetSubstraitPlan(statement, plan, length, error);
+}
+
+AdbcStatusCode AdbcStatementBind(struct AdbcStatement* statement,
+                                 struct ArrowArray* values, struct ArrowSchema* schema,
+                                 struct AdbcError* error) {
+  return SqliteStatementBind(statement, values, schema, error);
+}
+
+AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
+                                       struct ArrowArrayStream* stream,
+                                       struct AdbcError* error) {
+  return SqliteStatementBindStream(statement, stream, error);
+}
+
+AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement,
+                                               struct ArrowSchema* schema,
+                                               struct AdbcError* error) {
+  return SqliteStatementGetParameterSchema(statement, schema, error);
+}
+
+AdbcStatusCode AdbcStatementSetOption(struct AdbcStatement* statement, const char* key,
+                                      const char* value, struct AdbcError* error) {
+  return SqliteStatementSetOption(statement, key, value, error);
+}
+
+AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement,
+                                              struct ArrowSchema* schema,
+                                              struct AdbcPartitions* partitions,
+                                              int64_t* rows_affected,
+                                              struct AdbcError* error) {
+  return SqliteStatementExecutePartitions(statement, schema, partitions, rows_affected,
+                                          error);
+}  // NOLINT(whitespace/indent)
+// due to https://github.com/cpplint/cpplint/pull/189
+
+AdbcStatusCode AdbcDriverInit(int version, void* driver, struct AdbcError* error) {
+  return SqliteDriverInit(version, driver, error);
+}
diff --git a/c/driver/sqlite/sqlite.cc b/c/driver/sqlite/sqlite.cc
deleted file mode 100644
index fac667c..0000000
--- a/c/driver/sqlite/sqlite.cc
+++ /dev/null
@@ -1,1827 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <sqlite3.h>
-
-#include <algorithm>
-#include <cstring>
-#include <memory>
-#include <mutex>
-#include <string>
-#include <unordered_map>
-#include <unordered_set>
-
-#include <arrow/builder.h>
-#include <arrow/c/bridge.h>
-#include <arrow/record_batch.h>
-#include <arrow/status.h>
-#include <arrow/table.h>
-#include <arrow/util/config.h>
-#include <arrow/util/logging.h>
-#include <arrow/util/string_builder.h>
-
-#include "adbc.h"
-#include "arrow/type_fwd.h"
-#include "driver/util.h"
-
-namespace {
-
-using arrow::Status;
-
-void ReleaseError(struct AdbcError* error) {
-  delete[] error->message;
-  error->message = nullptr;
-}
-
-template <typename... Args>
-void SetError(struct AdbcError* error, Args&&... args) {
-  if (!error) return;
-  std::string message =
-      arrow::util::StringBuilder("[SQLite3] ", std::forward<Args>(args)...);
-  if (error->message) {
-    message.reserve(message.size() + 1 + std::strlen(error->message));
-    message.append(1, '\n');
-    message.append(error->message);
-    delete[] error->message;
-  }
-  error->message = new char[message.size() + 1];
-  message.copy(error->message, message.size());
-  error->message[message.size()] = '\0';
-  error->release = ReleaseError;
-}
-
-void SetError(sqlite3* db, const std::string& source, struct AdbcError* error) {
-  return SetError(error, source, ": ", sqlite3_errmsg(db));
-}
-
-AdbcStatusCode CheckRc(sqlite3* db, int rc, const char* context,
-                       struct AdbcError* error) {
-  if (rc != SQLITE_OK) {
-    SetError(db, context, error);
-    return ADBC_STATUS_IO;
-  }
-  return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode CheckRc(sqlite3* db, sqlite3_stmt* stmt, int rc, const char* context,
-                       struct AdbcError* error) {
-  if (rc != SQLITE_OK) {
-    SetError(db, context, error);
-    rc = sqlite3_finalize(stmt);
-    if (rc != SQLITE_OK) {
-      SetError(db, "sqlite3_finalize", error);
-    }
-    return ADBC_STATUS_IO;
-  }
-  return ADBC_STATUS_OK;
-}
-
-template <typename CallbackFn>
-AdbcStatusCode DoQuery(sqlite3* db, sqlite3_stmt* stmt, struct AdbcError* error,
-                       CallbackFn&& callback) {
-  auto status = std::move(callback)();
-  std::ignore = CheckRc(db, stmt, sqlite3_finalize(stmt), "sqlite3_finalize", error);
-  return status;
-}
-
-template <typename CallbackFn>
-AdbcStatusCode DoQuery(sqlite3* db, const char* query, struct AdbcError* error,
-                       CallbackFn&& callback) {
-  sqlite3_stmt* stmt;
-  int rc = sqlite3_prepare_v2(db, query, std::strlen(query), &stmt, /*pzTail=*/nullptr);
-  if (rc != SQLITE_OK) return CheckRc(db, stmt, rc, "sqlite3_prepare_v2", error);
-  auto status = std::move(callback)(stmt);
-  std::ignore = CheckRc(db, stmt, sqlite3_finalize(stmt), "sqlite3_finalize", error);
-  return status;
-}
-
-arrow::Status ToArrowStatus(AdbcStatusCode code, struct AdbcError* error) {
-  if (code == ADBC_STATUS_OK) return Status::OK();
-  // TODO:
-  return Status::UnknownError(code);
-}
-
-AdbcStatusCode FromArrowStatus(const Status& status, struct AdbcError* error) {
-  if (status.ok()) return ADBC_STATUS_OK;
-  SetError(error, status);
-  // TODO: map Arrow codes to ADBC codes
-  return ADBC_STATUS_INTERNAL;
-}
-
-AdbcStatusCode ExportDataToStream(std::shared_ptr<arrow::Schema> schema,
-                                  arrow::RecordBatchVector batches,
-                                  struct ArrowArrayStream* stream,
-                                  struct AdbcError* error) {
-  std::shared_ptr<arrow::RecordBatchReader> reader;
-  auto status = arrow::RecordBatchReader::Make(std::move(batches), std::move(schema))
-                    .Value(&reader);
-  ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
-  status = arrow::ExportRecordBatchReader(std::move(reader), stream);
-  ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
-  return ADBC_STATUS_OK;
-}
-
-std::shared_ptr<arrow::Schema> StatementToSchema(sqlite3_stmt* stmt) {
-  // TODO: this is fundamentally the wrong way to go about
-  // this. instead, we need to act like the CSV/JSON readers: sample
-  // several rows and dynamically update the column type as we go.
-  const int num_columns = sqlite3_column_count(stmt);
-  arrow::FieldVector fields(num_columns);
-  for (int i = 0; i < num_columns; i++) {
-    const char* column_name = sqlite3_column_name(stmt, i);
-    const int column_type = sqlite3_column_type(stmt, i);
-    std::shared_ptr<arrow::DataType> arrow_type = nullptr;
-    switch (column_type) {
-      case SQLITE_INTEGER:
-        arrow_type = arrow::int64();
-        break;
-      case SQLITE_FLOAT:
-        arrow_type = arrow::float64();
-        break;
-      case SQLITE_BLOB:
-        arrow_type = arrow::binary();
-        break;
-      case SQLITE_TEXT:
-        arrow_type = arrow::utf8();
-        break;
-      case SQLITE_NULL:
-      default:
-        arrow_type = arrow::null();
-        break;
-    }
-    fields[i] = arrow::field(column_name, std::move(arrow_type));
-  }
-  return arrow::schema(std::move(fields));
-}
-
-class SqliteDatabaseImpl {
- public:
-  SqliteDatabaseImpl() : db_(nullptr), connection_count_(0) {}
-
-  AdbcStatusCode Connect(sqlite3** db, struct AdbcError* error) {
-    std::lock_guard<std::mutex> guard(mutex_);
-    if (!db_) {
-      SetError(error, "Database not yet initialized, call AdbcDatabaseInit");
-      return ADBC_STATUS_INVALID_STATE;
-    }
-    // Create a new connection
-    if (database_uri_ == ":memory:") {
-      // unless the special ":memory:" filename is used
-      *db = db_;
-    } else {
-      int rc =
-          sqlite3_open_v2(database_uri_.c_str(), db,
-                          SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
-                          /*zVfs=*/nullptr);
-      ADBC_RETURN_NOT_OK(CheckRc(*db, nullptr, rc, "sqlite3_open_v2", error));
-    }
-    ++connection_count_;
-    return ADBC_STATUS_OK;
-  }
-
-  AdbcStatusCode Init(struct AdbcError* error) {
-    if (db_) {
-      SetError(error, "Database already initialized");
-      return ADBC_STATUS_INVALID_STATE;
-    }
-    database_uri_ = "file:adbc_sqlite_driver?mode=memory&cache=shared";
-    auto it = options_.find("filename");
-    if (it != options_.end()) {
-      database_uri_ = it->second;
-    }
-
-    int rc = sqlite3_open_v2(database_uri_.c_str(), &db_,
-                             SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
-                             /*zVfs=*/nullptr);
-    ADBC_RETURN_NOT_OK(CheckRc(db_, nullptr, rc, "sqlite3_open_v2", error));
-    options_.clear();
-    return ADBC_STATUS_OK;
-  }
-
-  AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error) {
-    if (db_) {
-      SetError(error, "Database already initialized");
-      return ADBC_STATUS_INVALID_STATE;
-    }
-    options_[key] = value;
-    return ADBC_STATUS_OK;
-  }
-
-  AdbcStatusCode Disconnect(sqlite3* db, struct AdbcError* error) {
-    std::lock_guard<std::mutex> guard(mutex_);
-    if (--connection_count_ < 0) {
-      SetError(error, "Connection count underflow");
-      return ADBC_STATUS_INVALID_STATE;
-    }
-    // Close the database unless :memory:
-    if (database_uri_ != ":memory:") {
-      if (sqlite3_close(db) != SQLITE_OK) {
-        if (db) SetError(db, "sqlite3_close", error);
-        return ADBC_STATUS_IO;
-      }
-    }
-    return ADBC_STATUS_OK;
-  }
-
-  AdbcStatusCode Release(struct AdbcError* error) {
-    std::lock_guard<std::mutex> guard(mutex_);
-
-    if (connection_count_ > 0) {
-      SetError(error, "Cannot release database with ", connection_count_,
-               " open connections");
-      return ADBC_STATUS_INVALID_STATE;
-    }
-
-    auto status = sqlite3_close(db_);
-    if (status != SQLITE_OK) {
-      if (db_) SetError(db_, "sqlite3_close", error);
-      return ADBC_STATUS_IO;
-    }
-    return ADBC_STATUS_OK;
-  }
-
- private:
-  sqlite3* db_;
-  int connection_count_;
-  std::string database_uri_;
-  std::unordered_map<std::string, std::string> options_;
-  std::mutex mutex_;
-};
-
-class SqliteConnectionImpl {
- public:
-  SqliteConnectionImpl() : database_(nullptr), db_(nullptr), autocommit_(true) {}
-
-  sqlite3* db() const { return db_; }
-
-  AdbcStatusCode GetInfo(uint32_t* info_codes, size_t info_codes_length,
-                         struct ArrowArrayStream* stream, struct AdbcError* error) {
-    static std::shared_ptr<arrow::Schema> kInfoSchema = arrow::schema({
-        arrow::field("info_name", arrow::uint32(), /*nullable=*/false),
-        arrow::field(
-            "info_value",
-            arrow::dense_union({
-                arrow::field("string_value", arrow::utf8()),
-                arrow::field("bool_value", arrow::boolean()),
-                arrow::field("int64_value", arrow::int64()),
-                arrow::field("int32_bitmask", arrow::int32()),
-                arrow::field("string_list", arrow::list(arrow::utf8())),
-                arrow::field("int32_to_int32_list_map",
-                             arrow::map(arrow::int32(), arrow::list(arrow::int32()))),
-            })),
-    });
-    static int kStringValueCode = 0;
-
-    static std::vector<uint32_t> kSupported = {
-        ADBC_INFO_VENDOR_NAME,    ADBC_INFO_VENDOR_VERSION,       ADBC_INFO_DRIVER_NAME,
-        ADBC_INFO_DRIVER_VERSION, ADBC_INFO_DRIVER_ARROW_VERSION,
-    };
-
-    if (!info_codes) {
-      info_codes = kSupported.data();
-      info_codes_length = kSupported.size();
-    }
-
-    arrow::UInt32Builder info_name;
-    std::unique_ptr<arrow::ArrayBuilder> info_value_builder;
-    ADBC_RETURN_NOT_OK(
-        FromArrowStatus(MakeBuilder(arrow::default_memory_pool(),
-                                    kInfoSchema->field(1)->type(), &info_value_builder),
-                        error));
-    arrow::DenseUnionBuilder* info_value =
-        static_cast<arrow::DenseUnionBuilder*>(info_value_builder.get());
-    arrow::StringBuilder* info_string =
-        static_cast<arrow::StringBuilder*>(info_value->child_builder(0).get());
-
-    for (size_t i = 0; i < info_codes_length; i++) {
-      switch (info_codes[i]) {
-        case ADBC_INFO_VENDOR_NAME:
-          ADBC_RETURN_NOT_OK(FromArrowStatus(info_name.Append(info_codes[i]), error));
-          ADBC_RETURN_NOT_OK(
-              FromArrowStatus(info_value->Append(kStringValueCode), error));
-          ADBC_RETURN_NOT_OK(FromArrowStatus(info_string->Append("SQLite3"), error));
-          break;
-        case ADBC_INFO_VENDOR_VERSION:
-          ADBC_RETURN_NOT_OK(FromArrowStatus(info_name.Append(info_codes[i]), error));
-          ADBC_RETURN_NOT_OK(
-              FromArrowStatus(info_value->Append(kStringValueCode), error));
-          ADBC_RETURN_NOT_OK(
-              FromArrowStatus(info_string->Append(sqlite3_libversion()), error));
-          break;
-        case ADBC_INFO_DRIVER_NAME:
-          ADBC_RETURN_NOT_OK(FromArrowStatus(info_name.Append(info_codes[i]), error));
-          ADBC_RETURN_NOT_OK(
-              FromArrowStatus(info_value->Append(kStringValueCode), error));
-          ADBC_RETURN_NOT_OK(
-              FromArrowStatus(info_string->Append("ADBC C SQLite3"), error));
-          break;
-        case ADBC_INFO_DRIVER_VERSION:
-          // TODO: set up CMake to embed version info
-          ADBC_RETURN_NOT_OK(FromArrowStatus(info_name.Append(info_codes[i]), error));
-          ADBC_RETURN_NOT_OK(
-              FromArrowStatus(info_value->Append(kStringValueCode), error));
-          ADBC_RETURN_NOT_OK(FromArrowStatus(info_string->Append("0.0.1"), error));
-          break;
-        case ADBC_INFO_DRIVER_ARROW_VERSION:
-          ADBC_RETURN_NOT_OK(FromArrowStatus(info_name.Append(info_codes[i]), error));
-          ADBC_RETURN_NOT_OK(
-              FromArrowStatus(info_value->Append(kStringValueCode), error));
-          ADBC_RETURN_NOT_OK(FromArrowStatus(
-              info_string->Append("Arrow/C++ " ARROW_VERSION_STRING), error));
-          break;
-        default:
-          // Unrecognized
-          break;
-      }
-    }
-
-    arrow::ArrayVector arrays(2);
-    ADBC_RETURN_NOT_OK(FromArrowStatus(info_name.Finish(&arrays[0]), error));
-    ADBC_RETURN_NOT_OK(FromArrowStatus(info_value->Finish(&arrays[1]), error));
-    const int64_t rows = arrays[0]->length();
-    return ExportDataToStream(
-        kInfoSchema,
-        {
-            arrow::RecordBatch::Make(kInfoSchema, rows, std::move(arrays)),
-        },
-        stream, error);
-  }
-
-  AdbcStatusCode GetObjects(int depth, const char* catalog, const char* db_schema,
-                            const char* table_name, const char** table_type,
-                            const char* column_name, struct ArrowArrayStream* stream,
-                            struct AdbcError* error) {
-    static std::shared_ptr<arrow::DataType> kColumnSchema = arrow::struct_({
-        arrow::field("column_name", arrow::utf8(), /*nullable=*/false),
-        arrow::field("ordinal_position", arrow::int32()),
-        arrow::field("remarks", arrow::utf8()),
-        arrow::field("xdbc_data_type", arrow::int16()),
-        arrow::field("xdbc_type_name", arrow::utf8()),
-        arrow::field("xdbc_column_size", arrow::int32()),
-        arrow::field("xdbc_decimal_digits", arrow::int16()),
-        arrow::field("xdbc_num_prec_radix", arrow::int16()),
-        arrow::field("xdbc_nullable", arrow::int16()),
-        arrow::field("xdbc_column_def", arrow::utf8()),
-        arrow::field("xdbc_sql_data_type", arrow::int16()),
-        arrow::field("xdbc_datetime_sub", arrow::int16()),
-        arrow::field("xdbc_char_octet_length", arrow::int32()),
-        arrow::field("xdbc_is_nullable", arrow::utf8()),
-        arrow::field("xdbc_scope_catalog", arrow::utf8()),
-        arrow::field("xdbc_scope_schema", arrow::utf8()),
-        arrow::field("xdbc_scope_table", arrow::utf8()),
-        arrow::field("xdbc_is_autoincrement", arrow::boolean()),
-        arrow::field("xdbc_is_generatedcolumn", arrow::boolean()),
-    });
-    static std::shared_ptr<arrow::DataType> kUsageSchema = arrow::struct_({
-        arrow::field("fk_catalog", arrow::utf8()),
-        arrow::field("fk_db_schema", arrow::utf8()),
-        arrow::field("fk_table", arrow::utf8()),
-        arrow::field("fk_column_name", arrow::utf8()),
-    });
-    static std::shared_ptr<arrow::DataType> kConstraintSchema = arrow::struct_({
-        arrow::field("constraint_name", arrow::utf8()),
-        arrow::field("constraint_type", arrow::utf8(), /*nullable=*/false),
-        arrow::field("constraint_column_names", arrow::list(arrow::utf8()),
-                     /*nullable=*/false),
-        arrow::field("constraint_column_usage", arrow::list(kUsageSchema)),
-    });
-    static std::shared_ptr<arrow::DataType> kTableSchema = arrow::struct_({
-        arrow::field("table_name", arrow::utf8(), /*nullable=*/false),
-        arrow::field("table_type", arrow::utf8(), /*nullable=*/false),
-        arrow::field("table_columns", arrow::list(kColumnSchema)),
-        arrow::field("table_constraints", arrow::list(kConstraintSchema)),
-    });
-    static std::shared_ptr<arrow::DataType> kDbSchemaSchema = arrow::struct_({
-        arrow::field("db_schema_name", arrow::utf8()),
-        arrow::field("db_schema_tables", arrow::list(kTableSchema)),
-    });
-    static std::shared_ptr<arrow::Schema> kCatalogSchema = arrow::schema({
-        arrow::field("catalog_name", arrow::utf8()),
-        arrow::field("catalog_db_schemas", arrow::list(kDbSchemaSchema)),
-    });
-
-    static const char kTableQuery[] =
-        R"(SELECT name, type
-           FROM sqlite_master
-           WHERE name LIKE ? AND type <> "index"
-           ORDER BY name ASC)";
-    static const char kColumnQuery[] =
-        R"(SELECT cid, name
-           FROM pragma_table_info(?)
-           WHERE name LIKE ?
-           ORDER BY cid ASC)";
-    static const char kPrimaryKeyQuery[] =
-        R"(SELECT name
-           FROM pragma_table_info(?)
-           WHERE pk > 0
-           ORDER BY pk ASC)";
-    static const char kForeignKeyQuery[] =
-        R"(SELECT id, seq, "table", "from", "to"
-           FROM pragma_foreign_key_list(?)
-           ORDER BY id, seq ASC)";
-
-    arrow::StringBuilder catalog_name;
-    std::unique_ptr<arrow::ArrayBuilder> catalog_schemas_builder;
-    ADBC_RETURN_NOT_OK(FromArrowStatus(
-        MakeBuilder(arrow::default_memory_pool(), kCatalogSchema->field(1)->type(),
-                    &catalog_schemas_builder),
-        error));
-    auto* catalog_schemas =
-        static_cast<arrow::ListBuilder*>(catalog_schemas_builder.get());
-    auto* catalog_schemas_items =
-        static_cast<arrow::StructBuilder*>(catalog_schemas->value_builder());
-    auto* db_schema_name =
-        static_cast<arrow::StringBuilder*>(catalog_schemas_items->child_builder(0).get());
-    auto* db_schema_tables =
-        static_cast<arrow::ListBuilder*>(catalog_schemas_items->child_builder(1).get());
-    auto* db_schema_tables_items =
-        static_cast<arrow::StructBuilder*>(db_schema_tables->value_builder());
-    auto* table_names = static_cast<arrow::StringBuilder*>(
-        db_schema_tables_items->child_builder(0).get());
-    auto* table_types = static_cast<arrow::StringBuilder*>(
-        db_schema_tables_items->child_builder(1).get());
-    auto* table_columns =
-        static_cast<arrow::ListBuilder*>(db_schema_tables_items->child_builder(2).get());
-    auto* table_columns_items =
-        static_cast<arrow::StructBuilder*>(table_columns->value_builder());
-    auto* column_names =
-        static_cast<arrow::StringBuilder*>(table_columns_items->child_builder(0).get());
-    auto* ordinal_positions =
-        static_cast<arrow::Int32Builder*>(table_columns_items->child_builder(1).get());
-    auto* table_constraints =
-        static_cast<arrow::ListBuilder*>(db_schema_tables_items->child_builder(3).get());
-    auto* table_constraints_items =
-        static_cast<arrow::StructBuilder*>(table_constraints->value_builder());
-    auto* constraint_names = static_cast<arrow::StringBuilder*>(
-        table_constraints_items->child_builder(0).get());
-    auto* constraint_types = static_cast<arrow::StringBuilder*>(
-        table_constraints_items->child_builder(1).get());
-    auto* constraint_column_names =
-        static_cast<arrow::ListBuilder*>(table_constraints_items->child_builder(2).get());
-    auto* constraint_column_names_items =
-        static_cast<arrow::StringBuilder*>(constraint_column_names->value_builder());
-    auto* constraint_column_usage =
-        static_cast<arrow::ListBuilder*>(table_constraints_items->child_builder(3).get());
-    auto* constraint_column_usage_items =
-        static_cast<arrow::StructBuilder*>(constraint_column_usage->value_builder());
-    auto* constraint_column_usage_fk_catalog = static_cast<arrow::StringBuilder*>(
-        constraint_column_usage_items->child_builder(0).get());
-    auto* constraint_column_usage_fk_db_schema = static_cast<arrow::StringBuilder*>(
-        constraint_column_usage_items->child_builder(1).get());
-    auto* constraint_column_usage_fk_table = static_cast<arrow::StringBuilder*>(
-        constraint_column_usage_items->child_builder(2).get());
-    auto* constraint_column_usage_fk_column_name = static_cast<arrow::StringBuilder*>(
-        constraint_column_usage_items->child_builder(3).get());
-
-    // TODO: filter properly, also implement other attached databases
-    if (!catalog || std::strlen(catalog) == 0) {
-      // https://www.sqlite.org/cli.html
-      // > The ".databases" command shows a list of all databases open
-      // > in the current connection. There will always be at least
-      // > 2. The first one is "main", the original database opened.
-      ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_name.Append("main"), error));
-
-      if (depth == ADBC_OBJECT_DEPTH_CATALOGS) {
-        ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->AppendNull(), error));
-      } else if (!db_schema || std::strlen(db_schema) == 0) {
-        ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->Append(), error));
-        ADBC_RETURN_NOT_OK(FromArrowStatus(db_schema_name->AppendNull(), error));
-        if (depth == ADBC_OBJECT_DEPTH_DB_SCHEMAS) {
-          ADBC_RETURN_NOT_OK(FromArrowStatus(db_schema_tables->AppendNull(), error));
-        } else {
-          // Look up tables
-
-          std::unordered_set<std::string> table_type_filter;
-          if (table_type) {
-            while (*table_type) {
-              table_type_filter.insert(*table_type);
-              table_type++;
-            }
-          }
-
-          ADBC_RETURN_NOT_OK(
-              DoQuery(db_, kTableQuery, error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
-                if (table_name) {
-                  ADBC_RETURN_NOT_OK(CheckRc(
-                      db_,
-                      sqlite3_bind_text64(stmt, 1, table_name, std::strlen(table_name),
-                                          SQLITE_STATIC, SQLITE_UTF8),
-                      "sqlite3_bind_text64", error));
-                } else {
-                  ADBC_RETURN_NOT_OK(CheckRc(
-                      db_,
-                      sqlite3_bind_text64(stmt, 1, "%", 1, SQLITE_STATIC, SQLITE_UTF8),
-                      "sqlite3_bind_text64", error));
-                }
-
-                int rc = SQLITE_OK;
-                ADBC_RETURN_NOT_OK(FromArrowStatus(db_schema_tables->Append(), error));
-                while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
-                  const char* cur_table =
-                      reinterpret_cast<const char*>(sqlite3_column_text(stmt, 0));
-                  const char* cur_table_type =
-                      reinterpret_cast<const char*>(sqlite3_column_text(stmt, 1));
-
-                  if (!table_type_filter.empty() &&
-                      table_type_filter.find(cur_table_type) == table_type_filter.end()) {
-                    continue;
-                  }
-
-                  ADBC_RETURN_NOT_OK(
-                      FromArrowStatus(table_names->Append(cur_table), error));
-                  ADBC_RETURN_NOT_OK(
-                      FromArrowStatus(table_types->Append(cur_table_type), error));
-                  if (depth == ADBC_OBJECT_DEPTH_TABLES) {
-                    ADBC_RETURN_NOT_OK(
-                        FromArrowStatus(table_columns->AppendNull(), error));
-                    ADBC_RETURN_NOT_OK(
-                        FromArrowStatus(table_constraints->AppendNull(), error));
-                  } else {
-                    ADBC_RETURN_NOT_OK(FromArrowStatus(table_columns->Append(), error));
-                    ADBC_RETURN_NOT_OK(DoQuery(
-                        db_, kColumnQuery, error,
-                        [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
-                          ADBC_RETURN_NOT_OK(
-                              CheckRc(db_,
-                                      sqlite3_bind_text64(stmt, 1, cur_table,
-                                                          std::strlen(cur_table),
-                                                          SQLITE_STATIC, SQLITE_UTF8),
-                                      "sqlite3_bind_text64", error));
-
-                          if (column_name) {
-                            ADBC_RETURN_NOT_OK(
-                                CheckRc(db_,
-                                        sqlite3_bind_text64(stmt, 2, column_name,
-                                                            std::strlen(column_name),
-                                                            SQLITE_STATIC, SQLITE_UTF8),
-                                        "sqlite3_bind_text64", error));
-                          } else {
-                            ADBC_RETURN_NOT_OK(
-                                CheckRc(db_,
-                                        sqlite3_bind_text64(stmt, 2, "%", 1,
-                                                            SQLITE_STATIC, SQLITE_UTF8),
-                                        "sqlite3_bind_text64", error));
-                          }
-
-                          int rc = SQLITE_OK;
-                          int64_t row_count = 0;
-                          while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
-                            row_count++;
-                            const int32_t cur_ordinal_position =
-                                1 + sqlite3_column_int(stmt, 0);
-                            const char* cur_column_name = reinterpret_cast<const char*>(
-                                sqlite3_column_text(stmt, 1));
-
-                            ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                column_names->Append(cur_column_name), error));
-                            ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                ordinal_positions->Append(cur_ordinal_position), error));
-
-                            ADBC_RETURN_NOT_OK(
-                                FromArrowStatus(table_columns_items->Append(), error));
-                          }
-                          for (int i = 2; i < table_columns_items->num_children(); i++) {
-                            ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                table_columns_items->child_builder(i)->AppendNulls(
-                                    row_count),
-                                error));
-                          }
-                          if (rc != SQLITE_DONE) {
-                            return CheckRc(db_, rc, "sqlite3_step", error);
-                          }
-                          return ADBC_STATUS_OK;
-                        }));
-
-                    // We can get primary key and foreign keys, but not unique (without
-                    // parsing the table definition, at least)
-                    ADBC_RETURN_NOT_OK(
-                        FromArrowStatus(table_constraints->Append(), error));
-                    ADBC_RETURN_NOT_OK(DoQuery(
-                        db_, kPrimaryKeyQuery, error,
-                        [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
-                          ADBC_RETURN_NOT_OK(
-                              CheckRc(db_,
-                                      sqlite3_bind_text64(stmt, 1, cur_table,
-                                                          std::strlen(cur_table),
-                                                          SQLITE_STATIC, SQLITE_UTF8),
-                                      "sqlite3_bind_text64", error));
-
-                          int rc = SQLITE_OK;
-                          bool has_primary_key = false;
-                          while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
-                            if (!has_primary_key) {
-                              ADBC_RETURN_NOT_OK(
-                                  FromArrowStatus(constraint_names->AppendNull(), error));
-                              ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                  constraint_types->Append("PRIMARY KEY"), error));
-                              ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                  constraint_column_names->Append(), error));
-                              ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                  constraint_column_usage->Append(), error));
-                            }
-                            has_primary_key = true;
-                            const char* cur_column_name = reinterpret_cast<const char*>(
-                                sqlite3_column_text(stmt, 0));
-
-                            ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                constraint_column_names_items->Append(cur_column_name),
-                                error));
-                          }
-                          if (has_primary_key) {
-                            ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                table_constraints_items->Append(), error));
-                          }
-                          if (rc != SQLITE_DONE) {
-                            return CheckRc(db_, rc, "sqlite3_step", error);
-                          }
-                          return ADBC_STATUS_OK;
-                        }));
-                    ADBC_RETURN_NOT_OK(DoQuery(
-                        db_, kForeignKeyQuery, error,
-                        [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
-                          ADBC_RETURN_NOT_OK(
-                              CheckRc(db_,
-                                      sqlite3_bind_text64(stmt, 1, cur_table,
-                                                          std::strlen(cur_table),
-                                                          SQLITE_STATIC, SQLITE_UTF8),
-                                      "sqlite3_bind_text64", error));
-
-                          int rc = SQLITE_OK;
-                          int prev_key_id = -1;
-                          while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
-                            const int key_id = sqlite3_column_int(stmt, 0);
-                            const int key_seq = sqlite3_column_int(stmt, 1);
-                            const char* to_table = reinterpret_cast<const char*>(
-                                sqlite3_column_text(stmt, 2));
-                            const char* from_col = reinterpret_cast<const char*>(
-                                sqlite3_column_text(stmt, 3));
-                            const char* to_col = reinterpret_cast<const char*>(
-                                sqlite3_column_text(stmt, 4));
-                            if (key_id != prev_key_id) {
-                              ADBC_RETURN_NOT_OK(
-                                  FromArrowStatus(constraint_names->AppendNull(), error));
-                              ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                  constraint_types->Append("FOREIGN KEY"), error));
-                              ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                  constraint_column_names->Append(), error));
-                              ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                  constraint_column_usage->Append(), error));
-                              if (prev_key_id != -1) {
-                                ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                    table_constraints_items->Append(), error));
-                              }
-                            }
-                            prev_key_id = key_id;
-
-                            ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                constraint_column_names_items->Append(from_col), error));
-                            ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                constraint_column_usage_fk_catalog->AppendNull(), error));
-                            ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                constraint_column_usage_fk_db_schema->AppendNull(),
-                                error));
-                            ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                constraint_column_usage_fk_table->Append(to_table),
-                                error));
-                            ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                constraint_column_usage_fk_column_name->Append(to_col),
-                                error));
-                            ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                constraint_column_usage_items->Append(), error));
-                          }
-                          if (prev_key_id != -1) {
-                            ADBC_RETURN_NOT_OK(FromArrowStatus(
-                                table_constraints_items->Append(), error));
-                          }
-                          if (rc != SQLITE_DONE) {
-                            return CheckRc(db_, rc, "sqlite3_step", error);
-                          }
-                          return ADBC_STATUS_OK;
-                        }));
-                  }
-
-                  ADBC_RETURN_NOT_OK(
-                      FromArrowStatus(db_schema_tables_items->Append(), error));
-                }
-                if (rc != SQLITE_DONE) {
-                  return CheckRc(db_, rc, "sqlite3_step", error);
-                }
-                return ADBC_STATUS_OK;
-              }));
-        }
-        ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas_items->Append(), error));
-      } else {
-        ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->Append(), error));
-      }
-    }
-
-    arrow::ArrayVector arrays(2);
-    ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_name.Finish(&arrays[0]), error));
-    ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->Finish(&arrays[1]), error));
-    const int64_t rows = arrays[0]->length();
-    return ExportDataToStream(
-        kCatalogSchema,
-        {
-            arrow::RecordBatch::Make(kCatalogSchema, rows, std::move(arrays)),
-        },
-        stream, error);
-  }
-
-  AdbcStatusCode GetTableTypes(struct ArrowArrayStream* stream, struct AdbcError* error) {
-    auto schema =
-        arrow::schema({arrow::field("table_type", arrow::utf8(), /*nullable=*/false)});
-
-    arrow::StringBuilder builder;
-    std::shared_ptr<arrow::Array> array;
-    ADBC_RETURN_NOT_OK(FromArrowStatus(builder.Append("table"), error));
-    ADBC_RETURN_NOT_OK(FromArrowStatus(builder.Append("view"), error));
-    ADBC_RETURN_NOT_OK(FromArrowStatus(builder.Finish(&array), error));
-
-    return ExportDataToStream(
-        schema,
-        {
-            arrow::RecordBatch::Make(schema, /*num_rows=*/2, {std::move(array)}),
-        },
-        stream, error);
-  }
-
-  AdbcStatusCode GetTableSchema(const char* catalog, const char* db_schema,
-                                const char* table_name, struct ArrowSchema* schema,
-                                struct AdbcError* error) {
-    if ((catalog && std::strlen(catalog) > 0) ||
-        (db_schema && std::strlen(db_schema) > 0)) {
-      std::memset(schema, 0, sizeof(*schema));
-      SetError(error, "Catalog/schema are not supported");
-      return ADBC_STATUS_NOT_IMPLEMENTED;
-    }
-
-    std::string query = "SELECT * FROM ";
-    char* escaped = sqlite3_mprintf("%w", table_name);
-    if (!escaped) {
-      // Failed to allocate
-      SetError(error, "Could not escape table name (failed to allocate memory)");
-      return ADBC_STATUS_INTERNAL;
-    }
-    query += escaped;
-    sqlite3_free(escaped);
-
-    std::shared_ptr<arrow::Schema> arrow_schema;
-    ADBC_RETURN_NOT_OK(
-        DoQuery(db_, query.c_str(), error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
-          if (sqlite3_step(stmt) == SQLITE_ERROR) {
-            SetError(db_, "sqlite3_step", error);
-            return ADBC_STATUS_IO;
-          }
-          arrow_schema = StatementToSchema(stmt);
-          return ADBC_STATUS_OK;
-        }));
-    return FromArrowStatus(arrow::ExportSchema(*arrow_schema, schema), error);
-  }
-
-  AdbcStatusCode Init(struct AdbcDatabase* database, struct AdbcError* error) {
-    if (!database->private_data) {
-      SetError(error, "database is not initialized");
-      return ADBC_STATUS_INVALID_STATE;
-    }
-    database_ =
-        *reinterpret_cast<std::shared_ptr<SqliteDatabaseImpl>*>(database->private_data);
-    return database_->Connect(&db_, error);
-  }
-
-  AdbcStatusCode Release(struct AdbcError* error) {
-    if (!database_) return ADBC_STATUS_OK;
-    return database_->Disconnect(db_, error);
-  }
-
-  AdbcStatusCode SetAutocommit(bool autocommit, struct AdbcError* error) {
-    if (autocommit == autocommit_) return ADBC_STATUS_OK;
-    autocommit_ = autocommit;
-
-    const char* query = autocommit_ ? "COMMIT" : "BEGIN TRANSACTION";
-    return DoQuery(db_, query, error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
-      return StepStatement(stmt, error);
-    });
-  }
-
-  AdbcStatusCode Commit(struct AdbcError* error) {
-    if (autocommit_) {
-      SetError(error, "Cannot commit when in autocommit mode");
-      return ADBC_STATUS_INVALID_STATE;
-    }
-    ADBC_RETURN_NOT_OK(
-        DoQuery(db_, "COMMIT", error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
-          return StepStatement(stmt, error);
-        }));
-    return DoQuery(db_, "BEGIN TRANSACTION", error,
-                   [&](sqlite3_stmt* stmt) { return StepStatement(stmt, error); });
-  }
-
-  AdbcStatusCode Rollback(struct AdbcError* error) {
-    if (autocommit_) {
-      SetError(error, "Cannot rollback when in autocommit mode");
-      return ADBC_STATUS_INVALID_STATE;
-    }
-    ADBC_RETURN_NOT_OK(
-        DoQuery(db_, "ROLLBACK", error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
-          return StepStatement(stmt, error);
-        }));
-    return DoQuery(db_, "BEGIN TRANSACTION", error,
-                   [&](sqlite3_stmt* stmt) { return StepStatement(stmt, error); });
-  }
-
- private:
-  AdbcStatusCode StepStatement(sqlite3_stmt* stmt, struct AdbcError* error) {
-    int rc = SQLITE_OK;
-    while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
-    }
-    if (rc != SQLITE_DONE) {
-      return CheckRc(db_, rc, "sqlite3_step", error);
-    }
-    return ADBC_STATUS_OK;
-  }
-
-  std::shared_ptr<SqliteDatabaseImpl> database_;
-  sqlite3* db_;
-  bool autocommit_;
-};
-
-AdbcStatusCode BindParameters(sqlite3_stmt* stmt, const arrow::RecordBatch& data,
-                              int64_t row, int* rc, struct AdbcError* error) {
-  int col_index = 1;
-  for (const auto& column : data.columns()) {
-    if (column->IsNull(row)) {
-      *rc = sqlite3_bind_null(stmt, col_index);
-    } else {
-      switch (column->type()->id()) {
-        case arrow::Type::DOUBLE: {
-          *rc = sqlite3_bind_double(
-              stmt, col_index,
-              static_cast<const arrow::DoubleArray&>(*column).Value(row));
-          break;
-        }
-        case arrow::Type::INT64: {
-          *rc = sqlite3_bind_int64(
-              stmt, col_index, static_cast<const arrow::Int64Array&>(*column).Value(row));
-          break;
-        }
-        case arrow::Type::STRING: {
-          const auto& strings = static_cast<const arrow::StringArray&>(*column);
-          *rc =
-              sqlite3_bind_text64(stmt, col_index, strings.Value(row).data(),
-                                  strings.value_length(row), SQLITE_STATIC, SQLITE_UTF8);
-          break;
-        }
-        default:
-          SetError(error, "Binding parameter of type ", *column->type());
-          return ADBC_STATUS_NOT_IMPLEMENTED;
-      }
-    }
-    if (*rc != SQLITE_OK) {
-      SetError(error, "Failed to bind parameters");
-      return ADBC_STATUS_IO;
-    }
-    col_index++;
-  }
-  return ADBC_STATUS_OK;
-}
-
-class SqliteStatementReader : public arrow::RecordBatchReader {
- public:
-  explicit SqliteStatementReader(
-      std::shared_ptr<SqliteConnectionImpl> connection, sqlite3_stmt* stmt,
-      std::shared_ptr<arrow::RecordBatchReader> bind_parameters)
-      : connection_(std::move(connection)),
-        stmt_(stmt),
-        bind_parameters_(std::move(bind_parameters)),
-        schema_(nullptr),
-        next_parameters_(nullptr),
-        bind_index_(0),
-        done_(false),
-        rc_(SQLITE_OK) {}
-
-  AdbcStatusCode Init(struct AdbcError* error) {
-    // TODO: this crashes if the statement is closed while the reader
-    // is still open.
-    // Step the statement and get the schema (SQLite doesn't
-    // necessarily know the schema until it begins to execute it)
-
-    sqlite3* db = connection_->db();
-    Status status;
-    if (bind_parameters_) {
-      status = bind_parameters_->ReadNext(&next_parameters_);
-      ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
-      ADBC_RETURN_NOT_OK(BindNext(&rc_, error));
-      ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc_, "sqlite3_bind", error));
-    }
-    const int num_params = sqlite3_bind_parameter_count(stmt_);
-    if (num_params > 0) {
-      if (!bind_parameters_) {
-        SetError(error, "Statement has parameters, but no parameters given");
-        return ADBC_STATUS_INVALID_STATE;
-      } else if (num_params != bind_parameters_->schema()->num_fields()) {
-        SetError(error, "Statement has ", num_params, " parameters, but ",
-                 bind_parameters_->schema()->num_fields(), " parameters given");
-        return ADBC_STATUS_INVALID_STATE;
-      }
-    } else if (bind_parameters_ && bind_parameters_->schema()->num_fields() > 0) {
-      SetError(error, "Statement has no parameters, but ",
-               bind_parameters_->schema()->num_fields(), " parameters given");
-      return ADBC_STATUS_INVALID_STATE;
-    }
-
-    // XXX: with parameters, inferring the schema from the first
-    // argument is inaccurate (what if one is null?). Is there a way
-    // to hint to SQLite the real type?
-
-    rc_ = sqlite3_step(stmt_);
-    if (rc_ == SQLITE_ERROR) {
-      return CheckRc(db, stmt_, rc_, "sqlite3_step", error);
-    }
-    schema_ = StatementToSchema(stmt_);
-    return ADBC_STATUS_OK;
-  }
-
-  void OverrideSchema(std::shared_ptr<arrow::Schema> schema) {
-    // TODO(ARROW-14705): use UnifySchemas for some sanity checking
-    schema_ = std::move(schema);
-  }
-
-  std::shared_ptr<arrow::Schema> schema() const override {
-    DCHECK(schema_);
-    return schema_;
-  }
-
-  Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override {
-    constexpr int64_t kBatchSize = 1024;
-    if (done_) {
-      *batch = nullptr;
-      return Status::OK();
-    }
-
-    std::vector<std::unique_ptr<arrow::ArrayBuilder>> builders(schema_->num_fields());
-    for (int i = 0; static_cast<size_t>(i) < builders.size(); i++) {
-      // TODO: allow overriding memory pool
-      ARROW_RETURN_NOT_OK(arrow::MakeBuilder(arrow::default_memory_pool(),
-                                             schema_->field(i)->type(), &builders[i]));
-    }
-
-    sqlite3* db = connection_->db();
-
-    int64_t num_rows = 0;
-    for (int64_t row = 0; row < kBatchSize; row++) {
-      if (rc_ != SQLITE_DONE) {
-        for (int col = 0; col < schema_->num_fields(); col++) {
-          if (sqlite3_column_type(stmt_, col) == SQLITE_NULL) {
-            ARROW_RETURN_NOT_OK(builders[col]->AppendNull());
-          } else {
-            const auto& field = schema_->field(col);
-            switch (field->type()->id()) {
-              case arrow::Type::DOUBLE: {
-                const double value = sqlite3_column_double(stmt_, col);
-                ARROW_RETURN_NOT_OK(
-                    dynamic_cast<arrow::DoubleBuilder*>(builders[col].get())
-                        ->Append(value));
-                break;
-              }
-              case arrow::Type::INT64: {
-                const sqlite3_int64 value = sqlite3_column_int64(stmt_, col);
-                ARROW_RETURN_NOT_OK(
-                    dynamic_cast<arrow::Int64Builder*>(builders[col].get())
-                        ->Append(value));
-                break;
-              }
-              case arrow::Type::NA: {
-                ARROW_RETURN_NOT_OK(
-                    dynamic_cast<arrow::NullBuilder*>(builders[col].get())->AppendNull());
-                break;
-              }
-              case arrow::Type::STRING: {
-                const char* value =
-                    reinterpret_cast<const char*>(sqlite3_column_text(stmt_, col));
-                const arrow::util::string_view view(value, std::strlen(value));
-                ARROW_RETURN_NOT_OK(
-                    dynamic_cast<arrow::StringBuilder*>(builders[col].get())
-                        ->Append(value));
-                break;
-              }
-              default:
-                return Status::NotImplemented("[SQLite3] Cannot read field '",
-                                              field->name(), "' of type ",
-                                              field->type()->ToString());
-            }
-          }
-        }
-        num_rows++;
-      }
-
-      if (rc_ == SQLITE_ROW) {
-        rc_ = sqlite3_step(stmt_);
-      }
-      if (rc_ == SQLITE_ROW) {
-        continue;
-      } else if (rc_ == SQLITE_DONE) {
-        if (bind_parameters_ &&
-            (!next_parameters_ || bind_index_ >= next_parameters_->num_rows())) {
-          ARROW_RETURN_NOT_OK(bind_parameters_->ReadNext(&next_parameters_));
-          bind_index_ = 0;
-        }
-
-        if (next_parameters_ && bind_index_ < next_parameters_->num_rows()) {
-          rc_ = sqlite3_reset(stmt_);
-          if (rc_ != SQLITE_OK) {
-            return Status::IOError("[SQLite3] sqlite3_reset: ", sqlite3_errmsg(db));
-          }
-          struct AdbcError error;
-          ARROW_RETURN_NOT_OK(ToArrowStatus(BindNext(&rc_, &error), &error));
-          rc_ = sqlite3_step(stmt_);
-          if (rc_ != SQLITE_ERROR) continue;
-        }
-        done_ = true;
-        next_parameters_.reset();
-        break;
-      }
-      return Status::IOError("[SQLite3] sqlite3_step: ", sqlite3_errmsg(db));
-    }
-
-    if (done_ && num_rows == 0) {
-      *batch = nullptr;
-    } else {
-      arrow::ArrayVector arrays(builders.size());
-      for (size_t i = 0; i < builders.size(); i++) {
-        ARROW_RETURN_NOT_OK(builders[i]->Finish(&arrays[i]));
-      }
-      *batch = arrow::RecordBatch::Make(schema_, num_rows, std::move(arrays));
-    }
-    return Status::OK();
-  }
-
- private:
-  AdbcStatusCode BindNext(int* rc, struct AdbcError* error) {
-    if (!next_parameters_ || bind_index_ >= next_parameters_->num_rows()) {
-      return ADBC_STATUS_OK;
-    }
-    return BindParameters(stmt_, *next_parameters_, bind_index_++, rc, error);
-  }
-
-  std::shared_ptr<SqliteConnectionImpl> connection_;
-  sqlite3_stmt* stmt_;
-  std::shared_ptr<arrow::RecordBatchReader> bind_parameters_;
-
-  std::shared_ptr<arrow::Schema> schema_;
-  std::shared_ptr<arrow::RecordBatch> next_parameters_;
-  int64_t bind_index_;
-  bool done_;
-  int rc_;
-};
-
-class SqliteStatementImpl {
- public:
-  explicit SqliteStatementImpl(std::shared_ptr<SqliteConnectionImpl> connection)
-      : connection_(std::move(connection)), append_(false), stmt_(nullptr) {}
-
-  AdbcStatusCode Close(struct AdbcError* error) {
-    if (stmt_) {
-      const int rc = sqlite3_finalize(stmt_);
-      stmt_ = nullptr;
-      ADBC_RETURN_NOT_OK(
-          CheckRc(connection_->db(), nullptr, rc, "sqlite3_finalize", error));
-    }
-    return ADBC_STATUS_OK;
-  }
-
-  AdbcStatusCode Bind(const std::shared_ptr<SqliteStatementImpl>& self,
-                      struct ArrowArray* values, struct ArrowSchema* schema,
-                      struct AdbcError* error) {
-    std::shared_ptr<arrow::RecordBatch> batch;
-    auto status = arrow::ImportRecordBatch(values, schema).Value(&batch);
-    if (!status.ok()) {
-      SetError(error, status);
-      return ADBC_STATUS_INTERNAL;
-    }
-
-    std::shared_ptr<arrow::Table> table;
-    status = arrow::Table::FromRecordBatches({std::move(batch)}).Value(&table);
-    if (!status.ok()) {
-      SetError(error, status);
-      return ADBC_STATUS_INTERNAL;
-    }
-
-    bind_parameters_.reset(new arrow::TableBatchReader(std::move(table)));
-    return ADBC_STATUS_OK;
-  }
-
-  AdbcStatusCode Bind(const std::shared_ptr<SqliteStatementImpl>& self,
-                      struct ArrowArrayStream* stream, struct AdbcError* error) {
-    auto status = arrow::ImportRecordBatchReader(stream).Value(&bind_parameters_);
-    if (!status.ok()) {
-      SetError(error, status);
-      return ADBC_STATUS_INTERNAL;
-    }
-    return ADBC_STATUS_OK;
-  }
-
-  AdbcStatusCode ExecuteQuery(const std::shared_ptr<SqliteStatementImpl>& self,
-                              struct ArrowArrayStream* out, int64_t* rows_affected,
-                              struct AdbcError* error) {
-    if (!stmt_ && bulk_table_.empty()) {
-      SetError(error, "Cannot execute a statement without a query");
-      return ADBC_STATUS_INVALID_STATE;
-    } else if (!bulk_table_.empty()) {
-      if (out) {
-        SetError(error, "Bulk ingestion does not generate a result set");
-        return ADBC_STATUS_INVALID_ARGUMENT;
-      }
-      return ExecuteBulk(rows_affected, error);
-    }
-
-    sqlite3* db = connection_->db();
-    int rc = SQLITE_OK;
-    if (stmt_) {
-      sqlite3_clear_bindings(stmt_);
-      ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_clear_bindings", error));
-    }
-
-    rc = sqlite3_reset(stmt_);
-    ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_reset", error));
-    auto reader = std::make_shared<SqliteStatementReader>(connection_, stmt_,
-                                                          std::move(bind_parameters_));
-    ADBC_RETURN_NOT_OK(reader->Init(error));
-
-    if (out) {
-      Status status = arrow::ExportRecordBatchReader(std::move(reader), out);
-      ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
-    } else {
-      // XXX: the logic for the reader is heavily intertwined into everything
-      // For now, just initialize and step it for the side effect
-
-      while (true) {
-        std::shared_ptr<arrow::RecordBatch> batch;
-        ADBC_RETURN_NOT_OK(FromArrowStatus(reader->ReadNext(&batch), error));
-        if (!batch) break;
-      }
-      ADBC_RETURN_NOT_OK(FromArrowStatus(reader->Close(), error));
-    }
-
-    if (rows_affected) *rows_affected = -1;
-    return ADBC_STATUS_OK;
-  }
-
-  AdbcStatusCode Prepare(const std::shared_ptr<SqliteStatementImpl>& self,
-                         struct AdbcError* error) {
-    if (stmt_) {
-      // No-op
-      return ADBC_STATUS_OK;
-    } else if (!bulk_table_.empty()) {
-      SetError(error, "Cannot prepare with bulk insert");
-      return ADBC_STATUS_INVALID_STATE;
-    }
-    SetError(error, "Cannot prepare a statement without a query");
-    return ADBC_STATUS_INVALID_STATE;
-  }
-
-  AdbcStatusCode GetParameterSchema(const std::shared_ptr<SqliteStatementImpl>& self,
-                                    struct ArrowSchema* schema, struct AdbcError* error) {
-    if (!stmt_) {
-      SetError(error, "Cannot get parameter schema before preparing");
-      return ADBC_STATUS_INVALID_STATE;
-    }
-
-    const int num_params = sqlite3_bind_parameter_count(stmt_);
-    arrow::FieldVector fields(num_params);
-    for (int i = 0; i < num_params; i++) {
-      const char* name = sqlite3_bind_parameter_name(stmt_, i);
-      fields[i] = arrow::field(name ? name : "", arrow::null());
-    }
-    ADBC_RETURN_NOT_OK(FromArrowStatus(
-        arrow::ExportSchema(arrow::Schema(std::move(fields)), schema), error));
-    return ADBC_STATUS_OK;
-  }
-
-  AdbcStatusCode SetOption(const std::shared_ptr<SqliteStatementImpl>& self,
-                           const char* key, const char* value, struct AdbcError* error) {
-    if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
-      // Bulk ingest
-
-      // Clear previous statement, if any
-      ADBC_RETURN_NOT_OK(Close(error));
-
-      if (std::strlen(value) == 0) return ADBC_STATUS_INVALID_ARGUMENT;
-      bulk_table_ = value;
-      return ADBC_STATUS_OK;
-    } else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
-      if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_APPEND) == 0) {
-        append_ = true;
-      } else if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) {
-        append_ = false;
-      } else {
-        SetError(error, "Unknown value '", value, "' for option: ", key);
-        return ADBC_STATUS_NOT_IMPLEMENTED;
-      }
-      return ADBC_STATUS_OK;
-    }
-    SetError(error, "Unknown option: ", key);
-    return ADBC_STATUS_NOT_IMPLEMENTED;
-  }
-
-  AdbcStatusCode SetSqlQuery(const std::shared_ptr<SqliteStatementImpl>& self,
-                             const char* query, struct AdbcError* error) {
-    bulk_table_.clear();
-    // Clear previous statement, if any
-    ADBC_RETURN_NOT_OK(Close(error));
-
-    sqlite3* db = connection_->db();
-    int rc = sqlite3_prepare_v2(db, query, static_cast<int>(std::strlen(query)), &stmt_,
-                                /*pzTail=*/nullptr);
-    return CheckRc(connection_->db(), stmt_, rc, "sqlite3_prepare_v2", error);
-  }
-
- private:
-  AdbcStatusCode ExecuteBulk(int64_t* rows_affected, struct AdbcError* error) {
-    if (!bind_parameters_) {
-      SetError(error, "Must AdbcStatementBind for bulk insertion");
-      return ADBC_STATUS_INVALID_STATE;
-    }
-
-    sqlite3* db = connection_->db();
-
-    // Create the table
-    if (!append_) {
-      // XXX: not injection-safe
-      std::string query = "CREATE TABLE ";
-      query += bulk_table_;
-      query += " (";
-      const auto& fields = bind_parameters_->schema()->fields();
-      for (int i = 0; i < fields.size(); i++) {
-        if (i > 0) query += ',';
-        query += fields[i]->name();
-      }
-      query += ')';
-
-      if (DoQuery(db, query.c_str(), error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
-            const int rc = sqlite3_step(stmt);
-            if (rc == SQLITE_DONE) return ADBC_STATUS_OK;
-            return CheckRc(db, stmt, rc, "sqlite3_step", error);
-          }) != ADBC_STATUS_OK) {
-        return ADBC_STATUS_ALREADY_EXISTS;
-      }
-    }
-
-    // Insert the rows
-    {
-      std::string query = "INSERT INTO ";
-      query += bulk_table_;
-      query += " VALUES (";
-      const auto& fields = bind_parameters_->schema()->fields();
-      for (int i = 0; i < fields.size(); i++) {
-        if (i > 0) query += ',';
-        query += '?';
-      }
-      query += ')';
-
-      sqlite3_stmt* stmt;
-      int rc = sqlite3_prepare_v2(db, query.c_str(), static_cast<int>(query.size()),
-                                  &stmt, /*pzTail=*/nullptr);
-      if (rc != SQLITE_OK) {
-        // XXX: not a great way to try to figure out the right error
-        AdbcStatusCode code = ADBC_STATUS_ALREADY_EXISTS;
-        if (std::strstr(sqlite3_errmsg(db), "no such table:")) {
-          code = ADBC_STATUS_NOT_FOUND;
-        }
-        // Clean up
-        std::ignore = CheckRc(db, stmt, rc, "sqlite3_prepare_v2", error);
-        return code;
-      }
-
-      ADBC_RETURN_NOT_OK(DoQuery(db, stmt, error, [&]() -> AdbcStatusCode {
-        int rc = SQLITE_OK;
-        int64_t rows = 0;
-        while (true) {
-          std::shared_ptr<arrow::RecordBatch> batch;
-          ADBC_RETURN_NOT_OK(
-              FromArrowStatus(bind_parameters_->Next().Value(&batch), error));
-          if (!batch) break;
-
-          for (int64_t row = 0; row < batch->num_rows(); row++) {
-            ADBC_RETURN_NOT_OK(BindParameters(stmt, *batch, row, &rc, error));
-            ADBC_RETURN_NOT_OK(CheckRc(db, stmt, rc, "sqlite3_bind", error));
-
-            rc = sqlite3_step(stmt);
-            if (rc != SQLITE_DONE) {
-              return CheckRc(db, stmt, rc, "sqlite3_step", error);
-            }
-
-            rc = sqlite3_reset(stmt);
-            ADBC_RETURN_NOT_OK(CheckRc(db, stmt, rc, "sqlite3_reset", error));
-
-            rc = sqlite3_clear_bindings(stmt);
-            ADBC_RETURN_NOT_OK(CheckRc(db, stmt, rc, "sqlite3_clear_bindings", error));
-          }
-          rows += batch->num_rows();
-        }
-        if (rows_affected) *rows_affected = rows;
-        return ADBC_STATUS_OK;
-      }));
-    }
-
-    bind_parameters_.reset();
-    return ADBC_STATUS_OK;
-  }
-
-  std::shared_ptr<SqliteConnectionImpl> connection_;
-
-  // Query state
-
-  // Bulk ingestion
-  // Target of bulk ingestion (rather janky to store state like this, though…)
-  std::string bulk_table_;
-  bool append_;
-
-  // Prepared statements
-  sqlite3_stmt* stmt_;
-  std::shared_ptr<arrow::RecordBatchReader> bind_parameters_;
-};
-
-// ADBC interface implementation - as private functions so that these
-// don't get replaced by the dynamic linker. If we implemented these
-// under the Adbc* names, then DriverInit, the linker may resolve
-// functions to the address of the functions provided by the driver
-// manager instead of our functions.
-//
-// We could also:
-// - Play games with RTLD_DEEPBIND - but this doesn't work with ASan
-// - Use __attribute__((visibility("protected"))) - but this is
-//   apparently poorly supported by some linkers
-// - Play with -Bsymbolic(-functions) - but this has other
-//   consequences and complicates the build setup
-//
-// So in the end some manual effort here was chosen.
-
-AdbcStatusCode SqliteDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
-  auto impl = std::make_shared<SqliteDatabaseImpl>();
-  database->private_data = new std::shared_ptr<SqliteDatabaseImpl>(impl);
-  return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode SqliteDatabaseInit(struct AdbcDatabase* database,
-                                  struct AdbcError* error) {
-  if (!database->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto ptr =
-      reinterpret_cast<std::shared_ptr<SqliteDatabaseImpl>*>(database->private_data);
-  return (*ptr)->Init(error);
-}
-
-AdbcStatusCode SqliteDatabaseSetOption(struct AdbcDatabase* database, const char* key,
-                                       const char* value, struct AdbcError* error) {
-  if (!database || !database->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto ptr =
-      reinterpret_cast<std::shared_ptr<SqliteDatabaseImpl>*>(database->private_data);
-  return (*ptr)->SetOption(key, value, error);
-}
-
-AdbcStatusCode SqliteDatabaseRelease(struct AdbcDatabase* database,
-                                     struct AdbcError* error) {
-  if (!database->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto ptr =
-      reinterpret_cast<std::shared_ptr<SqliteDatabaseImpl>*>(database->private_data);
-  AdbcStatusCode status = (*ptr)->Release(error);
-  delete ptr;
-  database->private_data = nullptr;
-  return status;
-}
-
-AdbcStatusCode SqliteConnectionCommit(struct AdbcConnection* connection,
-                                      struct AdbcError* error) {
-  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto ptr =
-      reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
-  return (*ptr)->Commit(error);
-}
-
-AdbcStatusCode SqliteConnectionGetInfo(struct AdbcConnection* connection,
-                                       uint32_t* info_codes, size_t info_codes_length,
-                                       struct ArrowArrayStream* stream,
-                                       struct AdbcError* error) {
-  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto ptr =
-      reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
-  return (*ptr)->GetInfo(info_codes, info_codes_length, stream, error);
-}
-
-AdbcStatusCode SqliteConnectionGetObjects(
-    struct AdbcConnection* connection, int depth, const char* catalog,
-    const char* db_schema, const char* table_name, const char** table_types,
-    const char* column_name, struct ArrowArrayStream* stream, struct AdbcError* error) {
-  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto ptr =
-      reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
-  return (*ptr)->GetObjects(depth, catalog, db_schema, table_name, table_types,
-                            column_name, stream, error);
-}
-
-AdbcStatusCode SqliteConnectionGetTableSchema(struct AdbcConnection* connection,
-                                              const char* catalog, const char* db_schema,
-                                              const char* table_name,
-                                              struct ArrowSchema* schema,
-                                              struct AdbcError* error) {
-  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto ptr =
-      reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
-  return (*ptr)->GetTableSchema(catalog, db_schema, table_name, schema, error);
-}
-
-AdbcStatusCode SqliteConnectionGetTableTypes(struct AdbcConnection* connection,
-                                             struct ArrowArrayStream* stream,
-                                             struct AdbcError* error) {
-  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto ptr =
-      reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
-  return (*ptr)->GetTableTypes(stream, error);
-}
-
-AdbcStatusCode SqliteConnectionInit(struct AdbcConnection* connection,
-                                    struct AdbcDatabase* database,
-                                    struct AdbcError* error) {
-  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto ptr =
-      reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
-  return (*ptr)->Init(database, error);
-}
-
-AdbcStatusCode SqliteConnectionNew(struct AdbcConnection* connection,
-                                   struct AdbcError* error) {
-  auto impl = std::make_shared<SqliteConnectionImpl>();
-  connection->private_data = new std::shared_ptr<SqliteConnectionImpl>(impl);
-  return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode SqliteConnectionReadPartition(struct AdbcConnection* connection,
-                                             const uint8_t* serialized_partition,
-                                             size_t serialized_length,
-                                             struct ArrowArrayStream* out,
-                                             struct AdbcError* error) {
-  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
-  return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
-                                       struct AdbcError* error) {
-  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto ptr =
-      reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
-  AdbcStatusCode status = (*ptr)->Release(error);
-  delete ptr;
-  connection->private_data = nullptr;
-  return status;
-}
-
-AdbcStatusCode SqliteConnectionRollback(struct AdbcConnection* connection,
-                                        struct AdbcError* error) {
-  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto ptr =
-      reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
-  return (*ptr)->Rollback(error);
-}
-
-AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection,
-                                         const char* key, const char* value,
-                                         struct AdbcError* error) {
-  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto ptr =
-      reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
-
-  if (std::strcmp(key, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
-    bool autocommit = false;
-    if (std::strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
-      autocommit = false;
-    } else if (std::strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
-      autocommit = true;
-    } else {
-      SetError(error, "Invalid option value for autocommit: ", value);
-      return ADBC_STATUS_INVALID_ARGUMENT;
-    }
-    return (*ptr)->SetAutocommit(autocommit, error);
-  } else {
-    SetError(error, "Unknown option");
-  }
-  return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode SqliteStatementBind(struct AdbcStatement* statement,
-                                   struct ArrowArray* values, struct ArrowSchema* schema,
-                                   struct AdbcError* error) {
-  if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto* ptr =
-      reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
-  return (*ptr)->Bind(*ptr, values, schema, error);
-}
-
-AdbcStatusCode SqliteStatementBindStream(struct AdbcStatement* statement,
-                                         struct ArrowArrayStream* stream,
-                                         struct AdbcError* error) {
-  if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto* ptr =
-      reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
-  return (*ptr)->Bind(*ptr, stream, error);
-}
-
-AdbcStatusCode SqliteStatementExecutePartitions(struct AdbcStatement* statement,
-                                                struct ArrowSchema* schema,
-                                                struct AdbcPartitions* partitions,
-                                                int64_t* rows_affected,
-                                                struct AdbcError* error) {
-  if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
-  return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode SqliteStatementExecuteQuery(struct AdbcStatement* statement,
-                                           struct ArrowArrayStream* out,
-                                           int64_t* rows_affected,
-                                           struct AdbcError* error) {
-  if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto* ptr =
-      reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
-  return (*ptr)->ExecuteQuery(*ptr, out, rows_affected, error);
-}
-
-AdbcStatusCode SqliteStatementGetPartitionDesc(struct AdbcStatement* statement,
-                                               size_t index, uint8_t* partition_desc,
-                                               struct AdbcError* error) {
-  return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode SqliteStatementGetPartitionDescSize(struct AdbcStatement* statement,
-                                                   size_t index, size_t* length,
-                                                   struct AdbcError* error) {
-  return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode SqliteStatementGetParameterSchema(struct AdbcStatement* statement,
-                                                 struct ArrowSchema* schema,
-                                                 struct AdbcError* error) {
-  if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto* ptr =
-      reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
-  return (*ptr)->GetParameterSchema(*ptr, schema, error);
-}
-
-AdbcStatusCode SqliteStatementNew(struct AdbcConnection* connection,
-                                  struct AdbcStatement* statement,
-                                  struct AdbcError* error) {
-  auto conn_ptr =
-      reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
-  auto impl = std::make_shared<SqliteStatementImpl>(*conn_ptr);
-  statement->private_data = new std::shared_ptr<SqliteStatementImpl>(impl);
-  return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode SqliteStatementPrepare(struct AdbcStatement* statement,
-                                      struct AdbcError* error) {
-  if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto* ptr =
-      reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
-  return (*ptr)->Prepare(*ptr, error);
-}
-
-AdbcStatusCode SqliteStatementRelease(struct AdbcStatement* statement,
-                                      struct AdbcError* error) {
-  if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto* ptr =
-      reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
-  auto status = (*ptr)->Close(error);
-  delete ptr;
-  statement->private_data = nullptr;
-  return status;
-}
-
-AdbcStatusCode SqliteStatementSetOption(struct AdbcStatement* statement, const char* key,
-                                        const char* value, struct AdbcError* error) {
-  if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto* ptr =
-      reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
-  return (*ptr)->SetOption(*ptr, key, value, error);
-}
-
-AdbcStatusCode SqliteStatementSetSqlQuery(struct AdbcStatement* statement,
-                                          const char* query, struct AdbcError* error) {
-  if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
-  auto* ptr =
-      reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
-  return (*ptr)->SetSqlQuery(*ptr, query, error);
-}
-
-}  // namespace
-
-AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) {
-  return SqliteDatabaseInit(database, error);
-}
-
-AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
-  return SqliteDatabaseNew(database, error);
-}
-
-AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char* key,
-                                     const char* value, struct AdbcError* error) {
-  return SqliteDatabaseSetOption(database, key, value, error);
-}
-
-AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
-                                   struct AdbcError* error) {
-  return SqliteDatabaseRelease(database, error);
-}
-
-AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
-                                    struct AdbcError* error) {
-  return SqliteConnectionCommit(connection, error);
-}
-
-AdbcStatusCode AdbcConnectionGetInfo(struct AdbcConnection* connection,
-                                     uint32_t* info_codes, size_t info_codes_length,
-                                     struct ArrowArrayStream* stream,
-                                     struct AdbcError* error) {
-  return SqliteConnectionGetInfo(connection, info_codes, info_codes_length, stream,
-                                 error);
-}
-
-AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int depth,
-                                        const char* catalog, const char* db_schema,
-                                        const char* table_name, const char** table_types,
-                                        const char* column_name,
-                                        struct ArrowArrayStream* stream,
-                                        struct AdbcError* error) {
-  return SqliteConnectionGetObjects(connection, depth, catalog, db_schema, table_name,
-                                    table_types, column_name, stream, error);
-}
-
-AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
-                                            const char* catalog, const char* db_schema,
-                                            const char* table_name,
-                                            struct ArrowSchema* schema,
-                                            struct AdbcError* error) {
-  return SqliteConnectionGetTableSchema(connection, catalog, db_schema, table_name,
-                                        schema, error);
-}
-
-AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
-                                           struct ArrowArrayStream* stream,
-                                           struct AdbcError* error) {
-  return SqliteConnectionGetTableTypes(connection, stream, error);
-}
-
-AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
-                                  struct AdbcDatabase* database,
-                                  struct AdbcError* error) {
-  return SqliteConnectionInit(connection, database, error);
-}
-
-AdbcStatusCode AdbcConnectionNew(struct AdbcConnection* connection,
-                                 struct AdbcError* error) {
-  return SqliteConnectionNew(connection, error);
-}
-
-AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,
-                                           const uint8_t* serialized_partition,
-                                           size_t serialized_length,
-                                           struct ArrowArrayStream* out,
-                                           struct AdbcError* error) {
-  return SqliteConnectionReadPartition(connection, serialized_partition,
-                                       serialized_length, out, error);
-}
-
-AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
-                                     struct AdbcError* error) {
-  return SqliteConnectionRelease(connection, error);
-}
-
-AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
-                                      struct AdbcError* error) {
-  return SqliteConnectionRollback(connection, error);
-}
-
-AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
-                                       const char* value, struct AdbcError* error) {
-  return SqliteConnectionSetOption(connection, key, value, error);
-}
-
-AdbcStatusCode AdbcStatementBind(struct AdbcStatement* statement,
-                                 struct ArrowArray* values, struct ArrowSchema* schema,
-                                 struct AdbcError* error) {
-  return SqliteStatementBind(statement, values, schema, error);
-}
-
-AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
-                                       struct ArrowArrayStream* stream,
-                                       struct AdbcError* error) {
-  return SqliteStatementBindStream(statement, stream, error);
-}
-
-AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement,
-                                              ArrowSchema* schema,
-                                              struct AdbcPartitions* partitions,
-                                              int64_t* rows_affected,
-                                              struct AdbcError* error) {
-  return SqliteStatementExecutePartitions(statement, schema, partitions, rows_affected,
-                                          error);
-}
-
-AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
-                                         struct ArrowArrayStream* out,
-                                         int64_t* rows_affected,
-                                         struct AdbcError* error) {
-  return SqliteStatementExecuteQuery(statement, out, rows_affected, error);
-}
-
-AdbcStatusCode AdbcStatementGetPartitionDesc(struct AdbcStatement* statement,
-                                             size_t index, uint8_t* partition_desc,
-                                             struct AdbcError* error) {
-  return SqliteStatementGetPartitionDesc(statement, index, partition_desc, error);
-}
-
-AdbcStatusCode AdbcStatementGetPartitionDescSize(struct AdbcStatement* statement,
-                                                 size_t index, size_t* length,
-                                                 struct AdbcError* error) {
-  return SqliteStatementGetPartitionDescSize(statement, index, length, error);
-}
-
-AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement,
-                                               struct ArrowSchema* schema,
-                                               struct AdbcError* error) {
-  return SqliteStatementGetParameterSchema(statement, schema, error);
-}
-
-AdbcStatusCode AdbcStatementNew(struct AdbcConnection* connection,
-                                struct AdbcStatement* statement,
-                                struct AdbcError* error) {
-  return SqliteStatementNew(connection, statement, error);
-}
-
-AdbcStatusCode AdbcStatementPrepare(struct AdbcStatement* statement,
-                                    struct AdbcError* error) {
-  return SqliteStatementPrepare(statement, error);
-}
-
-AdbcStatusCode AdbcStatementRelease(struct AdbcStatement* statement,
-                                    struct AdbcError* error) {
-  return SqliteStatementRelease(statement, error);
-}
-
-AdbcStatusCode AdbcStatementSetOption(struct AdbcStatement* statement, const char* key,
-                                      const char* value, struct AdbcError* error) {
-  return SqliteStatementSetOption(statement, key, value, error);
-}
-
-AdbcStatusCode AdbcStatementSetSqlQuery(struct AdbcStatement* statement,
-                                        const char* query, struct AdbcError* error) {
-  return SqliteStatementSetSqlQuery(statement, query, error);
-}
-
-extern "C" {
-ADBC_EXPORT
-AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* error) {
-  if (version != ADBC_VERSION_1_0_0) return ADBC_STATUS_NOT_IMPLEMENTED;
-
-  auto* driver = reinterpret_cast<struct AdbcDriver*>(raw_driver);
-  std::memset(driver, 0, sizeof(*driver));
-  driver->DatabaseInit = SqliteDatabaseInit;
-  driver->DatabaseNew = SqliteDatabaseNew;
-  driver->DatabaseRelease = SqliteDatabaseRelease;
-  driver->DatabaseSetOption = SqliteDatabaseSetOption;
-
-  driver->ConnectionCommit = SqliteConnectionCommit;
-  driver->ConnectionGetInfo = SqliteConnectionGetInfo;
-  driver->ConnectionGetObjects = SqliteConnectionGetObjects;
-  driver->ConnectionGetTableSchema = SqliteConnectionGetTableSchema;
-  driver->ConnectionGetTableTypes = SqliteConnectionGetTableTypes;
-  driver->ConnectionInit = SqliteConnectionInit;
-  driver->ConnectionNew = SqliteConnectionNew;
-  driver->ConnectionReadPartition = SqliteConnectionReadPartition;
-  driver->ConnectionRelease = SqliteConnectionRelease;
-  driver->ConnectionRollback = SqliteConnectionRollback;
-  driver->ConnectionSetOption = SqliteConnectionSetOption;
-
-  driver->StatementBind = SqliteStatementBind;
-  driver->StatementBindStream = SqliteStatementBindStream;
-  driver->StatementExecutePartitions = SqliteStatementExecutePartitions;
-  driver->StatementExecuteQuery = SqliteStatementExecuteQuery;
-  driver->StatementGetParameterSchema = SqliteStatementGetParameterSchema;
-  driver->StatementNew = SqliteStatementNew;
-  driver->StatementPrepare = SqliteStatementPrepare;
-  driver->StatementRelease = SqliteStatementRelease;
-  driver->StatementSetOption = SqliteStatementSetOption;
-  driver->StatementSetSqlQuery = SqliteStatementSetSqlQuery;
-  return ADBC_STATUS_OK;
-}
-}
diff --git a/c/driver/sqlite/sqlite_test.cc b/c/driver/sqlite/sqlite_test.cc
index ce97e6b..c211170 100644
--- a/c/driver/sqlite/sqlite_test.cc
+++ b/c/driver/sqlite/sqlite_test.cc
@@ -15,225 +15,24 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <cstring>
+#include <limits>
+#include <optional>
 #include <string>
-#include <vector>
+#include <string_view>
 
-#include <gmock/gmock.h>
+#include <adbc.h>
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest-matchers.h>
+#include <gtest/gtest-param-test.h>
 #include <gtest/gtest.h>
+#include <nanoarrow.h>
 
-#include <arrow/c/bridge.h>
-#include <arrow/record_batch.h>
-#include <arrow/table.h>
-#include <arrow/testing/matchers.h>
-
-#include "adbc.h"
-#include "driver/test_util.h"
+#include "statement_reader.h"
 #include "validation/adbc_validation.h"
+#include "validation/adbc_validation_util.h"
 
-// Tests of the SQLite example driver
-
-namespace adbc {
-
-using arrow::PointeesEqual;
-
-using RecordBatchMatcher =
-    decltype(::testing::UnorderedPointwise(PointeesEqual(), arrow::RecordBatchVector{}));
-
-RecordBatchMatcher BatchesAre(const std::shared_ptr<arrow::Schema>& schema,
-                              const std::vector<std::string>& batch_json) {
-  arrow::RecordBatchVector batches;
-  for (const std::string& json : batch_json) {
-    batches.push_back(adbc::RecordBatchFromJSON(schema, json));
-  }
-  return ::testing::UnorderedPointwise(PointeesEqual(), std::move(batches));
-}
-
-class Sqlite : public ::testing::Test {
- public:
-  void SetUp() override {
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseNew(&database, &error));
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcDatabaseSetOption(&database, "filename", ":memory:", &error));
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseInit(&database, &error));
-    ASSERT_NE(database.private_data, nullptr);
-
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&connection, &error));
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection, &database, &error));
-    ASSERT_NE(connection.private_data, nullptr);
-  }
-
-  void TearDown() override {
-    if (error.message) {
-      error.release(&error);
-    }
-
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRelease(&connection, &error));
-    ASSERT_EQ(connection.private_data, nullptr);
-
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseRelease(&database, &error));
-    ASSERT_EQ(database.private_data, nullptr);
-  }
-
- protected:
-  void IngestSampleTable(struct AdbcConnection* connection) {
-    ArrowArray export_table;
-    ArrowSchema export_schema;
-    auto bulk_table =
-        adbc::RecordBatchFromJSON(bulk_schema, R"([[1, "foo"], [2, "bar"]])");
-    ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
-    ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
-
-    AdbcStatement statement;
-    std::memset(&statement, 0, sizeof(statement));
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(connection, &statement, &error));
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
-                                      "bulk_insert", &error));
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
-    int64_t rows_affected = 0;
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error));
-    ASSERT_EQ(bulk_table->num_rows(), rows_affected);
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
-  }
-
-  AdbcDatabase database;
-  AdbcConnection connection;
-  AdbcError error = {};
-
-  std::shared_ptr<arrow::Schema> bulk_schema = arrow::schema(
-      {arrow::field("ints", arrow::int64()), arrow::field("strs", arrow::utf8())});
-
-  std::shared_ptr<arrow::DataType> column_schema = arrow::struct_({
-      arrow::field("column_name", arrow::utf8(), /*nullable=*/false),
-      arrow::field("ordinal_position", arrow::int32()),
-      arrow::field("remarks", arrow::utf8()),
-      arrow::field("xdbc_data_type", arrow::int16()),
-      arrow::field("xdbc_type_name", arrow::utf8()),
-      arrow::field("xdbc_column_size", arrow::int32()),
-      arrow::field("xdbc_decimal_digits", arrow::int16()),
-      arrow::field("xdbc_num_prec_radix", arrow::int16()),
-      arrow::field("xdbc_nullable", arrow::int16()),
-      arrow::field("xdbc_column_def", arrow::utf8()),
-      arrow::field("xdbc_sql_data_type", arrow::int16()),
-      arrow::field("xdbc_datetime_sub", arrow::int16()),
-      arrow::field("xdbc_char_octet_length", arrow::int32()),
-      arrow::field("xdbc_is_nullable", arrow::utf8()),
-      arrow::field("xdbc_scope_catalog", arrow::utf8()),
-      arrow::field("xdbc_scope_schema", arrow::utf8()),
-      arrow::field("xdbc_scope_table", arrow::utf8()),
-      arrow::field("xdbc_is_autoincrement", arrow::boolean()),
-      arrow::field("xdbc_is_generatedcolumn", arrow::boolean()),
-  });
-  std::shared_ptr<arrow::DataType> usage_schema = arrow::struct_({
-      arrow::field("fk_catalog", arrow::utf8()),
-      arrow::field("fk_db_schema", arrow::utf8()),
-      arrow::field("fk_table", arrow::utf8()),
-      arrow::field("fk_column_name", arrow::utf8()),
-  });
-  std::shared_ptr<arrow::DataType> constraint_schema = arrow::struct_({
-      arrow::field("constraint_name", arrow::utf8()),
-      arrow::field("constraint_type", arrow::utf8(), /*nullable=*/false),
-      arrow::field("constraint_column_names", arrow::list(arrow::utf8()),
-                   /*nullable=*/false),
-      arrow::field("constraint_column_usage", arrow::list(usage_schema)),
-  });
-  std::shared_ptr<arrow::DataType> table_schema = arrow::struct_({
-      arrow::field("table_name", arrow::utf8(), /*nullable=*/false),
-      arrow::field("table_type", arrow::utf8(), /*nullable=*/false),
-      arrow::field("table_columns", arrow::list(column_schema)),
-      arrow::field("table_constraints", arrow::list(constraint_schema)),
-  });
-  std::shared_ptr<arrow::DataType> db_schema_schema = arrow::struct_({
-      arrow::field("db_schema_name", arrow::utf8()),
-      arrow::field("db_schema_tables", arrow::list(table_schema)),
-  });
-  std::shared_ptr<arrow::Schema> catalog_schema = arrow::schema({
-      arrow::field("catalog_name", arrow::utf8()),
-      arrow::field("catalog_db_schemas", arrow::list(db_schema_schema)),
-  });
-};
-
-TEST_F(Sqlite, MetadataGetObjectsColumns) {
-  {
-    AdbcStatement statement;
-    std::memset(&statement, 0, sizeof(statement));
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
-
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error,
-        AdbcStatementSetSqlQuery(
-            &statement, "CREATE TABLE parent (a, b, c, PRIMARY KEY(c, b))", &error));
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error));
-
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcStatementSetSqlQuery(&statement, "CREATE TABLE other (a)", &error));
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error));
-
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcStatementSetSqlQuery(
-                   &statement,
-                   "CREATE TABLE child (a, b, c, PRIMARY KEY(a), FOREIGN KEY (c, b) "
-                   "REFERENCES parent (c, b), FOREIGN KEY (a) REFERENCES other(a))",
-                   &error));
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error));
-
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
-  }
-
-  struct ArrowArrayStream stream;
-  std::shared_ptr<arrow::Schema> schema;
-  arrow::RecordBatchVector batches;
-
-  ADBC_ASSERT_OK_WITH_ERROR(
-      error,
-      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr,
-                               nullptr, nullptr, nullptr, &stream, &error));
-  ASSERT_NO_FATAL_FAILURE(ReadStream(&stream, &schema, &batches));
-  EXPECT_THAT(batches,
-              BatchesAre(catalog_schema,
-                         {R"([["main", [{"db_schema_name": null, "db_schema_tables": [
-  {
-    "table_name": "child",
-    "table_type": "table",
-    "table_columns": [
-      ["a", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
-      ["b", 2, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
-      ["c", 3, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
-    ],
-    "table_constraints": [
-      [null, "PRIMARY KEY", ["a"], []],
-      [null, "FOREIGN KEY", ["a"], [[null, null, "other", "a"]]],
-      [null, "FOREIGN KEY", ["c", "b"], [[null, null, "parent", "c"], [null, null, "parent", "b"]]]
-    ]
-  },
-  {
-    "table_name": "other",
-    "table_type": "table",
-    "table_columns": [
-      ["a", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
-    ],
-    "table_constraints": []
-  },
-  {
-    "table_name": "parent",
-    "table_type": "table",
-    "table_columns": [
-      ["a", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
-      ["b", 2, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
-      ["c", 3, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
-    ],
-    "table_constraints": [
-      [null, "PRIMARY KEY", ["c", "b"], []]
-    ]
-  }
-]}]]])"}));
-  batches.clear();
-}
+// -- ADBC Test Suite ------------------------------------------------
 
 class SqliteQuirks : public adbc_validation::DriverQuirks {
  public:
@@ -241,7 +40,7 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
                                struct AdbcError* error) const override {
     // Shared DB required for transaction tests
     return AdbcDatabaseSetOption(
-        database, "filename", "file:Sqlite_Transactions?mode=memory&cache=shared", error);
+        database, "uri", "file:Sqlite_Transactions?mode=memory&cache=shared", error);
   }
 
   std::string BindParameter(int index) const override { return "?"; }
@@ -284,4 +83,430 @@ class SqliteStatementTest : public ::testing::Test,
 };
 ADBCV_TEST_STATEMENT(SqliteStatementTest)
 
-}  // namespace adbc
+// -- SQLite Specific Tests ------------------------------------------
+
+constexpr size_t kInferRows = 16;
+
+using adbc_validation::CompareArray;
+using adbc_validation::Handle;
+using adbc_validation::IsOkErrno;
+using adbc_validation::IsOkStatus;
+
+/// Specific tests of the type-inferring reader
+class SqliteReaderTest : public ::testing::Test {
+ public:
+  void SetUp() override {
+    std::memset(&error, 0, sizeof(error));
+    std::memset(&binder, 0, sizeof(binder));
+    ASSERT_EQ(SQLITE_OK, sqlite3_open_v2(
+                             ":memory:", &db,
+                             SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
+                             /*zVfs=*/nullptr));
+  }
+  void TearDown() override {
+    if (error.release) error.release(&error);
+    AdbcSqliteBinderRelease(&binder);
+    sqlite3_finalize(stmt);
+    sqlite3_close(db);
+  }
+
+  void Exec(const std::string& query) {
+    ASSERT_EQ(SQLITE_OK, sqlite3_prepare_v2(db, query.c_str(), query.size(), &stmt,
+                                            /*pzTail=*/nullptr));
+    ASSERT_EQ(SQLITE_DONE, sqlite3_step(stmt));
+    sqlite3_finalize(stmt);
+    stmt = nullptr;
+  }
+
+  void Bind(struct ArrowArray* batch, struct ArrowSchema* schema) {
+    ASSERT_THAT(AdbcSqliteBinderSetArray(&binder, batch, schema, &error),
+                IsOkStatus(&error));
+  }
+
+  void Bind(struct ArrowArrayStream* stream) {
+    ASSERT_THAT(AdbcSqliteBinderSetArrayStream(&binder, stream, &error),
+                IsOkStatus(&error));
+  }
+
+  void ExecSelect(const std::string& values, size_t infer_rows,
+                  adbc_validation::StreamReader* reader) {
+    ASSERT_NO_FATAL_FAILURE(Exec("CREATE TABLE foo (col)"));
+    ASSERT_NO_FATAL_FAILURE(Exec("INSERT INTO foo VALUES " + values));
+    const std::string query = "SELECT * FROM foo";
+    ASSERT_NO_FATAL_FAILURE(Exec(query, infer_rows, reader));
+    ASSERT_EQ(1, reader->schema->n_children);
+  }
+
+  void Exec(const std::string& query, size_t infer_rows,
+            adbc_validation::StreamReader* reader) {
+    ASSERT_EQ(SQLITE_OK, sqlite3_prepare_v2(db, query.c_str(), query.size(), &stmt,
+                                            /*pzTail=*/nullptr));
+    struct AdbcSqliteBinder* binder =
+        this->binder.schema.release ? &this->binder : nullptr;
+    ASSERT_THAT(AdbcSqliteExportReader(db, stmt, binder, infer_rows,
+                                       &reader->stream.value, &error),
+                IsOkStatus(&error));
+    ASSERT_NO_FATAL_FAILURE(reader->GetSchema());
+  }
+
+ protected:
+  sqlite3* db = nullptr;
+  sqlite3_stmt* stmt = nullptr;
+  struct AdbcError error;
+  struct AdbcSqliteBinder binder;
+};
+
+TEST_F(SqliteReaderTest, IntsNulls) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(ExecSelect("(NULL), (1), (NULL), (-1)", kInferRows, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>(reader.array_view->children[0],
+                                                {std::nullopt, 1, std::nullopt, -1}));
+}
+
+TEST_F(SqliteReaderTest, FloatsNulls) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(
+      ExecSelect("(NULL), (1.0), (NULL), (-1.0), (0.0)", kInferRows, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_DOUBLE, reader.fields[0].data_type);
+
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(CompareArray<double>(
+      reader.array_view->children[0], {std::nullopt, 1.0, std::nullopt, -1.0, 0.0}));
+}
+
+TEST_F(SqliteReaderTest, IntsFloatsNulls) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(
+      ExecSelect("(NULL), (1), (NULL), (-1.0), (0)", kInferRows, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_DOUBLE, reader.fields[0].data_type);
+
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(CompareArray<double>(
+      reader.array_view->children[0], {std::nullopt, 1.0, std::nullopt, -1.0, 0.0}));
+}
+
+TEST_F(SqliteReaderTest, IntsNullsStrsNullsInts) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(ExecSelect(
+      R"((NULL), (1), (NULL), (-1), ("foo"), (NULL), (""), (24))", kInferRows, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_STRING, reader.fields[0].data_type);
+
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(CompareArray<std::string>(
+      reader.array_view->children[0],
+      {std::nullopt, "1", std::nullopt, "-1", "foo", std::nullopt, "", "24"}));
+}
+
+TEST_F(SqliteReaderTest, IntExtremes) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(
+      ExecSelect(R"((NULL), (9223372036854775807), (NULL), (-9223372036854775808))",
+                 kInferRows, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<int64_t>(reader.array_view->children[0],
+                            {std::nullopt, std::numeric_limits<int64_t>::max(),
+                             std::nullopt, std::numeric_limits<int64_t>::min()}));
+}
+
+TEST_F(SqliteReaderTest, IntExtremesStrs) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(ExecSelect(
+      R"((NULL), (9223372036854775807), (-9223372036854775808), (""), (9223372036854775807), (-9223372036854775808))",
+      kInferRows, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_STRING, reader.fields[0].data_type);
+
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(CompareArray<std::string>(reader.array_view->children[0],
+                                                    {
+                                                        std::nullopt,
+                                                        "9223372036854775807",
+                                                        "-9223372036854775808",
+                                                        "",
+                                                        "9223372036854775807",
+                                                        "-9223372036854775808",
+                                                    }));
+}
+
+TEST_F(SqliteReaderTest, FloatExtremes) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(
+      ExecSelect(R"((NULL), (9e999), (NULL), (-9e999))", kInferRows, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_DOUBLE, reader.fields[0].data_type);
+
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(CompareArray<double>(
+      reader.array_view->children[0], {
+                                          std::nullopt,
+                                          std::numeric_limits<double>::infinity(),
+                                          std::nullopt,
+                                          -std::numeric_limits<double>::infinity(),
+                                      }));
+}
+
+TEST_F(SqliteReaderTest, IntsFloatsStrs) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(
+      ExecSelect(R"((1), (1.0), (""), (9e999), (-9e999))", kInferRows, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_STRING, reader.fields[0].data_type);
+
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<std::string>(reader.array_view->children[0],
+                                {"1.000000e+00", "1.000000e+00", "", "inf", "-inf"}));
+}
+
+TEST_F(SqliteReaderTest, InferIntReadInt) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(
+      ExecSelect(R"((1), (NULL), (2), (NULL))", /*infer_rows=*/2, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<int64_t>(reader.array_view->children[0], {1, std::nullopt}));
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<int64_t>(reader.array_view->children[0], {2, std::nullopt}));
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_EQ(nullptr, reader.array->release);
+}
+
+TEST_F(SqliteReaderTest, InferIntRejectFloat) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(
+      ExecSelect(R"((1), (NULL), (2E0), (NULL))", /*infer_rows=*/2, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<int64_t>(reader.array_view->children[0], {1, std::nullopt}));
+
+  ASSERT_THAT(reader.MaybeNext(), ::testing::Not(IsOkErrno()));
+  ASSERT_THAT(reader.stream->get_last_error(&reader.stream.value),
+              ::testing::HasSubstr(
+                  "[SQLite] Type mismatch in column 0: expected INT64 but got DOUBLE"));
+}
+
+TEST_F(SqliteReaderTest, InferIntRejectStr) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(
+      ExecSelect(R"((1), (NULL), (""), (NULL))", /*infer_rows=*/2, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<int64_t>(reader.array_view->children[0], {1, std::nullopt}));
+
+  ASSERT_THAT(reader.MaybeNext(), ::testing::Not(IsOkErrno()));
+  ASSERT_THAT(
+      reader.stream->get_last_error(&reader.stream.value),
+      ::testing::HasSubstr(
+          "[SQLite] Type mismatch in column 0: expected INT64 but got STRING/BINARY"));
+}
+
+TEST_F(SqliteReaderTest, InferFloatReadIntFloat) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(
+      ExecSelect(R"((1E0), (NULL), (2E0), (3), (NULL))", /*infer_rows=*/2, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_DOUBLE, reader.fields[0].data_type);
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<double>(reader.array_view->children[0], {1.0, std::nullopt}));
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<double>(reader.array_view->children[0], {2.0, 3.0}));
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<double>(reader.array_view->children[0], {std::nullopt}));
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_EQ(nullptr, reader.array->release);
+}
+
+TEST_F(SqliteReaderTest, InferFloatRejectStr) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(ExecSelect(R"((1E0), (NULL), (2E0), (3), (""), (NULL))",
+                                     /*infer_rows=*/2, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_DOUBLE, reader.fields[0].data_type);
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<double>(reader.array_view->children[0], {1.0, std::nullopt}));
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<double>(reader.array_view->children[0], {2.0, 3.0}));
+
+  ASSERT_THAT(reader.MaybeNext(), ::testing::Not(IsOkErrno()));
+  ASSERT_THAT(
+      reader.stream->get_last_error(&reader.stream.value),
+      ::testing::HasSubstr(
+          "[SQLite] Type mismatch in column 0: expected DOUBLE but got STRING/BINARY"));
+}
+
+TEST_F(SqliteReaderTest, InferStrReadAll) {
+  adbc_validation::StreamReader reader;
+  ASSERT_NO_FATAL_FAILURE(ExecSelect(R"((""), (NULL), (2), (3E0), ("foo"), (NULL))",
+                                     /*infer_rows=*/2, &reader));
+  ASSERT_EQ(NANOARROW_TYPE_STRING, reader.fields[0].data_type);
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<std::string>(reader.array_view->children[0], {"", std::nullopt}));
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<std::string>(reader.array_view->children[0], {"2", "3.0"}));
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<std::string>(reader.array_view->children[0], {"foo", std::nullopt}));
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_EQ(nullptr, reader.array->release);
+}
+
+TEST_F(SqliteReaderTest, InferOneParam) {
+  adbc_validation::StreamReader reader;
+  Handle<struct ArrowSchema> schema;
+  Handle<struct ArrowArray> batch;
+
+  ASSERT_THAT(adbc_validation::MakeSchema(&schema.value, {{"", NANOARROW_TYPE_INT64}}),
+              IsOkErrno());
+  ASSERT_THAT(
+      adbc_validation::MakeBatch<int64_t>(&schema.value, &batch.value, /*error=*/nullptr,
+                                          {std::nullopt, 2, 4, -1}),
+      IsOkErrno());
+
+  ASSERT_NO_FATAL_FAILURE(Bind(&batch.value, &schema.value));
+  ASSERT_NO_FATAL_FAILURE(Exec("SELECT ?", /*infer_rows=*/2, &reader));
+
+  ASSERT_EQ(1, reader.schema->n_children);
+  ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<int64_t>(reader.array_view->children[0], {std::nullopt, 2}));
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>(reader.array_view->children[0], {4, -1}));
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_EQ(nullptr, reader.array->release);
+}
+
+TEST_F(SqliteReaderTest, InferOneParamStream) {
+  adbc_validation::StreamReader reader;
+  Handle<struct ArrowArrayStream> stream;
+  Handle<struct ArrowSchema> schema;
+  std::vector<struct ArrowArray> batches(3);
+
+  ASSERT_THAT(adbc_validation::MakeSchema(&schema.value, {{"", NANOARROW_TYPE_INT64}}),
+              IsOkErrno());
+  ASSERT_THAT(adbc_validation::MakeBatch<int64_t>(&schema.value, &batches[0],
+                                                  /*error=*/nullptr, {std::nullopt, 1}),
+              IsOkErrno());
+  ASSERT_THAT(adbc_validation::MakeBatch<int64_t>(&schema.value, &batches[1],
+                                                  /*error=*/nullptr, {2, 3}),
+              IsOkErrno());
+  ASSERT_THAT(adbc_validation::MakeBatch<int64_t>(&schema.value, &batches[2],
+                                                  /*error=*/nullptr, {4, std::nullopt}),
+              IsOkErrno());
+  adbc_validation::MakeStream(&stream.value, &schema.value, std::move(batches));
+
+  ASSERT_NO_FATAL_FAILURE(Bind(&stream.value));
+  ASSERT_NO_FATAL_FAILURE(Exec("SELECT ?", /*infer_rows=*/3, &reader));
+
+  ASSERT_EQ(1, reader.schema->n_children);
+  ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<int64_t>(reader.array_view->children[0], {std::nullopt, 1, 2}));
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<int64_t>(reader.array_view->children[0], {3, 4, std::nullopt}));
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_EQ(nullptr, reader.array->release);
+}
+
+TEST_F(SqliteReaderTest, InferTypedParams) {
+  adbc_validation::StreamReader reader;
+  Handle<struct ArrowSchema> schema;
+  Handle<struct ArrowArray> batch;
+
+  ASSERT_NO_FATAL_FAILURE(Exec("CREATE TABLE foo (idx, value)"));
+  ASSERT_NO_FATAL_FAILURE(
+      Exec(R"(INSERT INTO foo VALUES (0, "foo"), (1, NULL), (2, 4), (3, 1E2))"));
+
+  ASSERT_THAT(adbc_validation::MakeSchema(&schema.value, {{"", NANOARROW_TYPE_INT64}}),
+              IsOkErrno());
+  ASSERT_THAT(adbc_validation::MakeBatch<int64_t>(&schema.value, &batch.value,
+                                                  /*error=*/nullptr, {1, 2, 3, 0}),
+              IsOkErrno());
+
+  ASSERT_NO_FATAL_FAILURE(Bind(&batch.value, &schema.value));
+  ASSERT_NO_FATAL_FAILURE(
+      Exec("SELECT value FROM foo WHERE idx = ?", /*infer_rows=*/2, &reader));
+  ASSERT_EQ(1, reader.schema->n_children);
+  ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+
+  ASSERT_NO_FATAL_FAILURE(reader.Next());
+  ASSERT_NO_FATAL_FAILURE(
+      CompareArray<int64_t>(reader.array_view->children[0], {std::nullopt, 4}));
+  ASSERT_THAT(reader.MaybeNext(), ::testing::Not(IsOkErrno()));
+  ASSERT_THAT(reader.stream->get_last_error(&reader.stream.value),
+              ::testing::HasSubstr(
+                  "[SQLite] Type mismatch in column 0: expected INT64 but got DOUBLE"));
+}
+
+template <typename CType>
+class SqliteNumericParamTest : public SqliteReaderTest,
+                               public ::testing::WithParamInterface<ArrowType> {
+ public:
+  void Test(ArrowType expected_type) {
+    adbc_validation::StreamReader reader;
+    Handle<struct ArrowSchema> schema;
+    Handle<struct ArrowArray> batch;
+
+    ASSERT_THAT(adbc_validation::MakeSchema(&schema.value, {{"", GetParam()}}),
+                IsOkErrno());
+    ASSERT_THAT(adbc_validation::MakeBatch<CType>(&schema.value, &batch.value,
+                                                  /*error=*/nullptr,
+                                                  {std::nullopt, 0, 1, 2, 4, 8}),
+                IsOkErrno());
+
+    ASSERT_NO_FATAL_FAILURE(Bind(&batch.value, &schema.value));
+    ASSERT_NO_FATAL_FAILURE(Exec("SELECT ?", /*infer_rows=*/2, &reader));
+
+    ASSERT_EQ(1, reader.schema->n_children);
+    ASSERT_EQ(expected_type, reader.fields[0].data_type);
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_NO_FATAL_FAILURE(
+        CompareArray<CType>(reader.array_view->children[0], {std::nullopt, 0}));
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_NO_FATAL_FAILURE(CompareArray<CType>(reader.array_view->children[0], {1, 2}));
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_NO_FATAL_FAILURE(CompareArray<CType>(reader.array_view->children[0], {4, 8}));
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_EQ(nullptr, reader.array->release);
+  }
+};
+
+class SqliteIntParamTest : public SqliteNumericParamTest<int64_t> {};
+
+TEST_P(SqliteIntParamTest, BindInt) {
+  ASSERT_NO_FATAL_FAILURE(Test(NANOARROW_TYPE_INT64));
+}
+
+INSTANTIATE_TEST_SUITE_P(IntTypes, SqliteIntParamTest,
+                         ::testing::Values(NANOARROW_TYPE_UINT8, NANOARROW_TYPE_UINT16,
+                                           NANOARROW_TYPE_UINT32, NANOARROW_TYPE_UINT64,
+                                           NANOARROW_TYPE_INT8, NANOARROW_TYPE_INT16,
+                                           NANOARROW_TYPE_INT32, NANOARROW_TYPE_INT64));
+
+class SqliteFloatParamTest : public SqliteNumericParamTest<double> {};
+
+TEST_P(SqliteFloatParamTest, BindFloat) {
+  ASSERT_NO_FATAL_FAILURE(Test(NANOARROW_TYPE_DOUBLE));
+}
+
+INSTANTIATE_TEST_SUITE_P(FloatTypes, SqliteFloatParamTest,
+                         ::testing::Values(
+                             // XXX: AppendDouble currently doesn't really work with
+                             // floats (FLT_MIN/FLT_MAX isn't the right thing)
+
+                             // NANOARROW_TYPE_FLOAT,
+                             NANOARROW_TYPE_DOUBLE));
diff --git a/c/driver/sqlite/statement_reader.c b/c/driver/sqlite/statement_reader.c
new file mode 100644
index 0000000..9f8813d
--- /dev/null
+++ b/c/driver/sqlite/statement_reader.c
@@ -0,0 +1,896 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "statement_reader.h"
+
+#include <inttypes.h>
+#include <math.h>
+#include <stdio.h>
+
+#include <adbc.h>
+#include <nanoarrow.h>
+#include <sqlite3.h>
+
+#include "utils.h"
+
+AdbcStatusCode AdbcSqliteBinderSet(struct AdbcSqliteBinder* binder,
+                                   struct AdbcError* error) {
+  int status = binder->params.get_schema(&binder->params, &binder->schema);
+  if (status != 0) {
+    const char* message = binder->params.get_last_error(&binder->params);
+    if (!message) message = "(unknown error)";
+    SetError(error, "Failed to get parameter schema: (%d) %s: %s", status,
+             strerror(status), message);
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+
+  struct ArrowError arrow_error = {0};
+  status = ArrowArrayViewInitFromSchema(&binder->batch, &binder->schema, &arrow_error);
+  if (status != 0) {
+    SetError(error, "Failed to initialize array view: (%d) %s: %s", status,
+             strerror(status), arrow_error.message);
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+
+  if (binder->batch.storage_type != NANOARROW_TYPE_STRUCT) {
+    SetError(error, "Bind parameters do not have root type STRUCT");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+
+  binder->types =
+      (enum ArrowType*)malloc(binder->schema.n_children * sizeof(enum ArrowType));
+
+  struct ArrowSchemaView view = {0};
+  for (int i = 0; i < binder->schema.n_children; i++) {
+    status = ArrowSchemaViewInit(&view, binder->schema.children[i], &arrow_error);
+    if (status != 0) {
+      SetError(error, "Failed to parse schema for column %d: %s (%d): %s", i,
+               strerror(status), status, arrow_error.message);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+
+    if (view.data_type == NANOARROW_TYPE_UNINITIALIZED) {
+      SetError(error, "Column %d has UNINITIALIZED type", i);
+      return ADBC_STATUS_INTERNAL;
+    }
+    binder->types[i] = view.data_type;
+  }
+
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode AdbcSqliteBinderSetArray(struct AdbcSqliteBinder* binder,
+                                        struct ArrowArray* values,
+                                        struct ArrowSchema* schema,
+                                        struct AdbcError* error) {
+  AdbcSqliteBinderRelease(binder);
+  RAISE_ADBC(BatchToArrayStream(values, schema, &binder->params, error));
+  return AdbcSqliteBinderSet(binder, error);
+}  // NOLINT(whitespace/indent)
+AdbcStatusCode AdbcSqliteBinderSetArrayStream(struct AdbcSqliteBinder* binder,
+                                              struct ArrowArrayStream* values,
+                                              struct AdbcError* error) {
+  AdbcSqliteBinderRelease(binder);
+  binder->params = *values;
+  memset(values, 0, sizeof(*values));
+  return AdbcSqliteBinderSet(binder, error);
+}
+AdbcStatusCode AdbcSqliteBinderBindNext(struct AdbcSqliteBinder* binder, sqlite3* conn,
+                                        sqlite3_stmt* stmt, char* finished,
+                                        struct AdbcError* error) {
+  struct ArrowError arrow_error = {0};
+  int status = 0;
+  while (!binder->array.release || binder->next_row >= binder->array.length) {
+    if (binder->array.release) {
+      ArrowArrayViewReset(&binder->batch);
+      binder->array.release(&binder->array);
+
+      status =
+          ArrowArrayViewInitFromSchema(&binder->batch, &binder->schema, &arrow_error);
+      if (status != 0) {
+        SetError(error, "Failed to initialize array view: (%d) %s: %s", status,
+                 strerror(status), arrow_error.message);
+        return ADBC_STATUS_INTERNAL;
+      }
+    }
+
+    status = binder->params.get_next(&binder->params, &binder->array);
+    if (status != 0) {
+      const char* message = binder->params.get_last_error(&binder->params);
+      if (!message) message = "(unknown error)";
+      SetError(error, "Failed to get next parameter batch: (%d) %s: %s", status,
+               strerror(status), message);
+      return ADBC_STATUS_IO;
+    }
+
+    if (!binder->array.release) {
+      *finished = 1;
+      AdbcSqliteBinderRelease(binder);
+      return ADBC_STATUS_OK;
+    }
+
+    status = ArrowArrayViewSetArray(&binder->batch, &binder->array, &arrow_error);
+    if (status != 0) {
+      SetError(error, "Failed to initialize array view: (%d) %s: %s", status,
+               strerror(status), arrow_error.message);
+      return ADBC_STATUS_INTERNAL;
+    }
+
+    binder->next_row = 0;
+  }
+
+  if (sqlite3_reset(stmt) != SQLITE_OK) {
+    SetError(error, "Failed to reset statement: %s", sqlite3_errmsg(conn));
+    return ADBC_STATUS_INTERNAL;
+  }
+  if (sqlite3_clear_bindings(stmt) != SQLITE_OK) {
+    SetError(error, "Failed to clear statement bindings: %s", sqlite3_errmsg(conn));
+    return ADBC_STATUS_INTERNAL;
+  }
+
+  for (int col = 0; col < binder->schema.n_children; col++) {
+    if (ArrowArrayViewIsNull(binder->batch.children[col], binder->next_row)) {
+      status = sqlite3_bind_null(stmt, col + 1);
+    } else {
+      switch (binder->types[col]) {
+        case NANOARROW_TYPE_BINARY:
+        case NANOARROW_TYPE_LARGE_BINARY: {
+          struct ArrowBufferView value =
+              ArrowArrayViewGetBytesUnsafe(binder->batch.children[col], binder->next_row);
+          status = sqlite3_bind_text(stmt, col + 1, value.data.as_char, value.n_bytes,
+                                     SQLITE_STATIC);
+          break;
+        }
+        case NANOARROW_TYPE_UINT8:
+        case NANOARROW_TYPE_UINT16:
+        case NANOARROW_TYPE_UINT32:
+        case NANOARROW_TYPE_UINT64: {
+          uint64_t value =
+              ArrowArrayViewGetUIntUnsafe(binder->batch.children[col], binder->next_row);
+          if (value > INT64_MAX) {
+            SetError(error,
+                     "Column %d has unsigned integer value %" PRIu64
+                     "out of range of int64_t",
+                     col, value);
+            return ADBC_STATUS_INVALID_ARGUMENT;
+          }
+          status = sqlite3_bind_int64(stmt, col + 1, (int64_t)value);
+          break;
+        }
+        case NANOARROW_TYPE_INT8:
+        case NANOARROW_TYPE_INT16:
+        case NANOARROW_TYPE_INT32:
+        case NANOARROW_TYPE_INT64: {
+          int64_t value =
+              ArrowArrayViewGetIntUnsafe(binder->batch.children[col], binder->next_row);
+          status = sqlite3_bind_int64(stmt, col + 1, value);
+          break;
+        }
+        case NANOARROW_TYPE_FLOAT:
+        case NANOARROW_TYPE_DOUBLE: {
+          int64_t value = ArrowArrayViewGetDoubleUnsafe(binder->batch.children[col],
+                                                        binder->next_row);
+          status = sqlite3_bind_double(stmt, col + 1, value);
+          break;
+        }
+        case NANOARROW_TYPE_STRING:
+        case NANOARROW_TYPE_LARGE_STRING: {
+          struct ArrowBufferView value =
+              ArrowArrayViewGetBytesUnsafe(binder->batch.children[col], binder->next_row);
+          status = sqlite3_bind_text(stmt, col + 1, value.data.as_char, value.n_bytes,
+                                     SQLITE_STATIC);
+          break;
+        }
+        default:
+          SetError(error, "Column %d has unsupported type %s", col,
+                   ArrowTypeString(binder->types[col]));
+          return ADBC_STATUS_NOT_IMPLEMENTED;
+      }
+    }
+
+    if (status != SQLITE_OK) {
+      SetError(error, "Failed to clear statement bindings: %s", sqlite3_errmsg(conn));
+      return ADBC_STATUS_INTERNAL;
+    }
+  }
+
+  binder->next_row++;
+  *finished = 0;
+  return ADBC_STATUS_OK;
+}
+
+void AdbcSqliteBinderRelease(struct AdbcSqliteBinder* binder) {
+  if (binder->schema.release) {
+    binder->schema.release(&binder->schema);
+  }
+  if (binder->params.release) {
+    binder->params.release(&binder->params);
+  }
+  if (binder->types) {
+    free(binder->types);
+  }
+  if (binder->array.release) {
+    binder->array.release(&binder->array);
+  }
+  ArrowArrayViewReset(&binder->batch);
+  memset(binder, 0, sizeof(*binder));
+}
+
+struct StatementReader {
+  sqlite3* db;
+  sqlite3_stmt* stmt;
+  enum ArrowType* types;
+  struct ArrowSchema schema;
+  struct ArrowArray initial_batch;
+  struct AdbcSqliteBinder* binder;
+  struct ArrowError error;
+  char done;
+  int batch_size;
+};
+
+const char* StatementReaderGetLastError(struct ArrowArrayStream* self) {
+  if (!self->release || !self->private_data) {
+    return NULL;
+  }
+
+  struct StatementReader* reader = (struct StatementReader*)self->private_data;
+  return reader->error.message;
+}
+
+void StatementReaderSetError(struct StatementReader* reader) {
+  const char* msg = sqlite3_errmsg(reader->db);
+  strncpy(reader->error.message, msg, sizeof(reader->error.message));
+  reader->error.message[sizeof(reader->error.message) - 1] = '\0';
+}
+
+int StatementReaderGetOneValue(struct StatementReader* reader, int col,
+                               struct ArrowArray* out) {
+  int sqlite_type = sqlite3_column_type(reader->stmt, col);
+
+  if (sqlite_type == SQLITE_NULL) {
+    return ArrowArrayAppendNull(out, 1);
+  }
+
+  switch (reader->types[col]) {
+    case NANOARROW_TYPE_INT64: {
+      switch (sqlite_type) {
+        case SQLITE_INTEGER: {
+          int64_t value = sqlite3_column_int64(reader->stmt, col);
+          return ArrowArrayAppendInt(out, value);
+        }
+        case SQLITE_FLOAT: {
+          // TODO: behavior needs to be configurable
+          snprintf(reader->error.message, sizeof(reader->error.message),
+                   "[SQLite] Type mismatch in column %d: expected INT64 but got DOUBLE",
+                   col);
+          return EIO;
+        }
+        case SQLITE_TEXT:
+        case SQLITE_BLOB: {
+          snprintf(
+              reader->error.message, sizeof(reader->error.message),
+              "[SQLite] Type mismatch in column %d: expected INT64 but got STRING/BINARY",
+              col);
+          return EIO;
+        }
+        default: {
+          snprintf(reader->error.message, sizeof(reader->error.message),
+                   "[SQLite] Type mismatch in column %d: expected INT64 but got unknown "
+                   "type %d",
+                   col, sqlite_type);
+          return ENOTSUP;
+        }
+      }
+      break;
+    }
+
+    case NANOARROW_TYPE_DOUBLE: {
+      switch (sqlite_type) {
+        case SQLITE_INTEGER:
+        case SQLITE_FLOAT: {
+          // Let SQLite convert
+          double value = sqlite3_column_double(reader->stmt, col);
+          return ArrowArrayAppendDouble(out, value);
+        }
+        case SQLITE_TEXT:
+        case SQLITE_BLOB: {
+          snprintf(reader->error.message, sizeof(reader->error.message),
+                   "[SQLite] Type mismatch in column %d: expected DOUBLE but got "
+                   "STRING/BINARY",
+                   col);
+          return EIO;
+        }
+        default: {
+          snprintf(reader->error.message, sizeof(reader->error.message),
+                   "[SQLite] Type mismatch in column %d: expected DOUBLE but got unknown "
+                   "type %d",
+                   col, sqlite_type);
+          return ENOTSUP;
+        }
+      }
+      break;
+    }
+
+    case NANOARROW_TYPE_STRING: {
+      switch (sqlite_type) {
+        case SQLITE_INTEGER:
+        case SQLITE_FLOAT:
+        case SQLITE_TEXT:
+        case SQLITE_BLOB: {
+          // Let SQLite convert
+          struct ArrowStringView value = {
+              .data = (const char*)sqlite3_column_text(reader->stmt, col),
+              .n_bytes = sqlite3_column_bytes(reader->stmt, col),
+          };
+          return ArrowArrayAppendString(out, value);
+        }
+        default: {
+          snprintf(reader->error.message, sizeof(reader->error.message),
+                   "[SQLite] Type mismatch in column %d: expected STRING but got unknown "
+                   "type %d",
+                   col, sqlite_type);
+          return ENOTSUP;
+        }
+      }
+      break;
+    }
+
+    default: {
+      snprintf(reader->error.message, sizeof(reader->error.message),
+               "[SQLite] Internal error: unknown inferred column type %d",
+               reader->types[col]);
+      return ENOTSUP;
+    }
+  }
+
+  return 0;
+}
+
+int StatementReaderGetNext(struct ArrowArrayStream* self, struct ArrowArray* out) {
+  if (!self->release || !self->private_data) {
+    return EINVAL;
+  }
+
+  struct StatementReader* reader = (struct StatementReader*)self->private_data;
+  if (reader->initial_batch.release != NULL) {
+    memcpy(out, &reader->initial_batch, sizeof(*out));
+    memset(&reader->initial_batch, 0, sizeof(reader->initial_batch));
+    return 0;
+  } else if (reader->done) {
+    out->release = NULL;
+    return 0;
+  }
+
+  RAISE_NA(ArrowArrayInitFromSchema(out, &reader->schema, &reader->error));
+  RAISE_NA(ArrowArrayStartAppending(out));
+  int64_t batch_size = 0;
+  int status = 0;
+
+  sqlite3_mutex_enter(sqlite3_db_mutex(reader->db));
+  while (batch_size < reader->batch_size) {
+    if (reader->binder) {
+      char finished = 0;
+      struct AdbcError error = {0};
+      AdbcStatusCode status = AdbcSqliteBinderBindNext(reader->binder, reader->db,
+                                                       reader->stmt, &finished, &error);
+      if (status != ADBC_STATUS_OK) {
+        reader->done = 1;
+        status = EIO;
+        if (error.release) {
+          strncpy(reader->error.message, error.message, sizeof(reader->error.message));
+          reader->error.message[sizeof(reader->error.message) - 1] = '\0';
+          error.release(&error);
+        }
+        break;
+      } else if (finished) {
+        reader->done = 1;
+        break;
+      }
+    }
+
+    int rc = sqlite3_step(reader->stmt);
+    if (rc == SQLITE_DONE) {
+      reader->done = 1;
+      break;
+    } else if (rc == SQLITE_ERROR) {
+      reader->done = 1;
+      status = EIO;
+      StatementReaderSetError(reader);
+      break;
+    }
+
+    for (int col = 0; col < reader->schema.n_children; col++) {
+      status = StatementReaderGetOneValue(reader, col, out->children[col]);
+      if (status != 0) break;
+    }
+
+    if (status != 0) break;
+    batch_size++;
+  }
+  if (status == 0) {
+    out->length = batch_size;
+    for (int i = 0; i < reader->schema.n_children; i++) {
+      status = ArrowArrayFinishBuilding(out->children[i], &reader->error);
+      if (status != 0) break;
+    }
+
+    // If we didn't read any rows, the reader is exhausted - don't generate a spurious
+    // batch
+    if (batch_size == 0) out->release(out);
+  }
+
+  sqlite3_mutex_leave(sqlite3_db_mutex(reader->db));
+  return status;
+}
+
+int StatementReaderGetSchema(struct ArrowArrayStream* self, struct ArrowSchema* out) {
+  if (!self->release || !self->private_data) {
+    return EINVAL;
+  }
+
+  struct StatementReader* reader = (struct StatementReader*)self->private_data;
+  return ArrowSchemaDeepCopy(&reader->schema, out);
+}
+
+void StatementReaderRelease(struct ArrowArrayStream* self) {
+  if (self->private_data) {
+    struct StatementReader* reader = (struct StatementReader*)self->private_data;
+    if (reader->schema.release) {
+      reader->schema.release(&reader->schema);
+    }
+    if (reader->initial_batch.release) {
+      reader->initial_batch.release(&reader->initial_batch);
+    }
+    if (reader->types) {
+      free(reader->types);
+    }
+    if (reader->binder) {
+      AdbcSqliteBinderRelease(reader->binder);
+    }
+
+    free(self->private_data);
+  }
+  self->private_data = NULL;
+  self->release = NULL;
+  self->get_last_error = NULL;
+  self->get_next = NULL;
+  self->get_schema = NULL;
+}
+
+// -- Type inferring reader ------------------------------------------
+//
+// Every column starts as INT64. A certain number of rows is read, and
+// the column is upcast if a different type is read.  Afterwards,
+// 'compatible' values are upcast (int64 <: double <: string) and
+// 'incompatible' values will raise an error.
+//
+// Notes:
+// - The upcasting may be different when done during the inference
+//   stage vs the reading stage, because in the former, we have to do
+//   the upcasting, and in the latter, we let SQLite handle it.
+// - We don't use unions since those are a pain to work with.
+//
+// Improvements/to-dos:
+// - SQLITE_BLOB type values are not handled
+// - Columns where no non-NULL value is seen should be typed as STRING
+//   for maximum flexibility for later values
+// - Make this more flexible (e.g. choose whether to attempt to cast
+//   incompatible values, or insert a null, instead of erroring)
+// - Add the option to try to use SQLite's designated type for a
+//   column, instead of inferring the type (where possible).
+
+/// Initialize buffers for the first (type-inferred) batch of data.
+/// Use raw buffers since the types may change.
+AdbcStatusCode StatementReaderInitializeInfer(int num_columns, size_t infer_rows,
+                                              struct ArrowBitmap* validity,
+                                              struct ArrowBuffer* data,
+                                              struct ArrowBuffer* binary,
+                                              enum ArrowType* current_type,
+                                              struct AdbcError* error) {
+  for (int i = 0; i < num_columns; i++) {
+    ArrowBitmapInit(&validity[i]);
+    CHECK_NA(INTERNAL, ArrowBitmapReserve(&validity[i], infer_rows), error);
+    ArrowBufferInit(&data[i]);
+    CHECK_NA(INTERNAL, ArrowBufferReserve(&data[i], infer_rows * sizeof(int64_t)), error);
+    memset(&binary[i], 0, sizeof(struct ArrowBuffer));
+    current_type[i] = NANOARROW_TYPE_INT64;
+  }
+  return ADBC_STATUS_OK;
+}  // NOLINT(whitespace/indent)
+
+/// Finalize the first (type-inferred) batch of data.
+AdbcStatusCode StatementReaderInferFinalize(
+    sqlite3_stmt* stmt, int num_columns, int64_t num_rows, struct StatementReader* reader,
+    struct ArrowBitmap* validity, struct ArrowBuffer* data, struct ArrowBuffer* binary,
+    enum ArrowType* current_type, struct AdbcError* error) {
+  CHECK_NA(INTERNAL, ArrowSchemaInit(&reader->schema, NANOARROW_TYPE_STRUCT), error);
+  CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(&reader->schema, num_columns), error);
+  for (int col = 0; col < num_columns; col++) {
+    struct ArrowSchema* field = reader->schema.children[col];
+    const char* name = sqlite3_column_name(stmt, col);
+    CHECK_NA(INTERNAL, ArrowSchemaInit(field, current_type[col]), error);
+    CHECK_NA(INTERNAL, ArrowSchemaSetName(field, name), error);
+  }
+  CHECK_NA(INTERNAL,
+           ArrowArrayInitFromSchema(&reader->initial_batch, &reader->schema, NULL),
+           error);
+
+  // Do validation up front, so that we either move 0 buffers or all buffers
+  for (int col = 0; col < num_columns; col++) {
+    if (current_type[col] == NANOARROW_TYPE_STRING ||
+        current_type[col] == NANOARROW_TYPE_BINARY) {
+      if (binary[col].data == NULL) {
+        SetError(error, "INTERNAL: column has binary-like type but no backing buffer");
+        return ADBC_STATUS_INTERNAL;
+      }
+    }
+    reader->initial_batch.children[col]->length = num_rows;
+  }
+
+  reader->initial_batch.length = num_rows;
+
+  for (int col = 0; col < num_columns; col++) {
+    struct ArrowArray* arr = reader->initial_batch.children[col];
+    ArrowArraySetValidityBitmap(arr, &validity[col]);
+    // XXX: ignore return code since we've hardcoded the buffer index
+    (void)ArrowArraySetBuffer(arr, 1, &data[col]);
+    if (current_type[col] == NANOARROW_TYPE_STRING ||
+        current_type[col] == NANOARROW_TYPE_BINARY) {
+      (void)ArrowArraySetBuffer(arr, 2, &binary[col]);
+    }
+    arr->length = num_rows;
+  }
+  return ADBC_STATUS_OK;
+}
+
+// Convert an int64 typed column to double.
+AdbcStatusCode StatementReaderUpcastInt64ToDouble(struct ArrowBuffer* data,
+                                                  struct AdbcError* error) {
+  struct ArrowBuffer doubles;
+  ArrowBufferInit(&doubles);
+  CHECK_NA(INTERNAL, ArrowBufferReserve(&doubles, data->capacity_bytes), error);
+
+  size_t num_elements = data->size_bytes / sizeof(int64_t);
+  const int64_t* elements = (const int64_t*)data->data;
+  for (size_t i = 0; i < num_elements; i++) {
+    double value = elements[i];
+    ArrowBufferAppendUnsafe(&doubles, &value, sizeof(double));
+  }
+  ArrowBufferReset(data);
+  ArrowBufferMove(&doubles, data);
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode StatementReaderAppendInt64ToBinary(struct ArrowBuffer* offsets,
+                                                  struct ArrowBuffer* binary,
+                                                  int64_t value, int32_t* offset,
+                                                  struct AdbcError* error) {
+  // Make sure we have at least 21 bytes available (19 digits + sign + null)
+  // Presumably this is enough, but manpage for snprintf makes no guarantees
+  // about whether locale may affect this, so check for truncation regardless
+  static const size_t kReserve = 21;
+  size_t buffer_size = kReserve;
+  CHECK_NA(INTERNAL, ArrowBufferReserve(binary, buffer_size), error);
+  char* output = (char*)(binary->data + binary->size_bytes);
+  int written = 0;
+  while (1) {
+    written = snprintf(output, buffer_size, "%" PRId64, value);
+    if (written >= buffer_size) {
+      // Truncated, resize and try again
+      // Check for overflow - presumably this can never happen...?
+      if (UINT_MAX - buffer_size < buffer_size) {
+        SetError(error, "Overflow when upcasting double to string");
+        return ADBC_STATUS_INTERNAL;
+      }
+      CHECK_NA(INTERNAL, ArrowBufferReserve(binary, buffer_size), error);
+      buffer_size += buffer_size;
+      continue;
+    }
+    break;
+  }
+  *offset += written;
+  binary->size_bytes += written;
+  ArrowBufferAppendUnsafe(offsets, offset, sizeof(int32_t));
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode StatementReaderAppendDoubleToBinary(struct ArrowBuffer* offsets,
+                                                   struct ArrowBuffer* binary,
+                                                   double value, int32_t* offset,
+                                                   struct AdbcError* error) {
+  static const size_t kReserve = 64;
+  size_t buffer_size = kReserve;
+  CHECK_NA(INTERNAL, ArrowBufferReserve(binary, buffer_size), error);
+  char* output = (char*)(binary->data + binary->size_bytes);
+  int written = 0;
+  while (1) {
+    written = snprintf(output, buffer_size, "%e", value);
+    if (written >= buffer_size) {
+      // Truncated, resize and try again
+      // Check for overflow - presumably this can never happen...?
+      if (UINT_MAX - buffer_size < buffer_size) {
+        SetError(error, "Overflow when upcasting double to string");
+        return ADBC_STATUS_INTERNAL;
+      }
+      CHECK_NA(INTERNAL, ArrowBufferReserve(binary, buffer_size), error);
+      buffer_size += buffer_size;
+      continue;
+    }
+    break;
+  }
+  *offset += written;
+  binary->size_bytes += written;
+  ArrowBufferAppendUnsafe(offsets, offset, sizeof(int32_t));
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode StatementReaderUpcastInt64ToBinary(struct ArrowBuffer* data,
+                                                  struct ArrowBuffer* binary,
+                                                  struct AdbcError* error) {
+  struct ArrowBuffer offsets;
+  ArrowBufferInit(&offsets);
+  ArrowBufferInit(binary);
+  CHECK_NA(INTERNAL, ArrowBufferReserve(&offsets, data->capacity_bytes), error);
+  CHECK_NA(INTERNAL, ArrowBufferReserve(binary, data->capacity_bytes), error);
+
+  size_t num_elements = data->size_bytes / sizeof(int64_t);
+  const int64_t* elements = (const int64_t*)data->data;
+
+  int32_t offset = 0;
+  ArrowBufferAppendUnsafe(&offsets, &offset, sizeof(int32_t));
+  for (size_t i = 0; i < num_elements; i++) {
+    AdbcStatusCode status =
+        StatementReaderAppendInt64ToBinary(&offsets, binary, elements[i], &offset, error);
+    if (status != ADBC_STATUS_OK) return status;
+  }
+  ArrowBufferReset(data);
+  ArrowBufferMove(&offsets, data);
+  return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode StatementReaderUpcastDoubleToBinary(struct ArrowBuffer* data,
+                                                   struct ArrowBuffer* binary,
+                                                   struct AdbcError* error) {
+  struct ArrowBuffer offsets;
+  ArrowBufferInit(&offsets);
+  ArrowBufferInit(binary);
+  CHECK_NA(INTERNAL, ArrowBufferReserve(&offsets, data->capacity_bytes), error);
+  CHECK_NA(INTERNAL, ArrowBufferReserve(binary, data->capacity_bytes), error);
+
+  size_t num_elements = data->size_bytes / sizeof(double);
+  const double* elements = (const double*)data->data;
+
+  int32_t offset = 0;
+  ArrowBufferAppendUnsafe(&offsets, &offset, sizeof(int32_t));
+  for (size_t i = 0; i < num_elements; i++) {
+    AdbcStatusCode status = StatementReaderAppendDoubleToBinary(
+        &offsets, binary, elements[i], &offset, error);
+    if (status != ADBC_STATUS_OK) return status;
+  }
+  ArrowBufferReset(data);
+  ArrowBufferMove(&offsets, data);
+  return ADBC_STATUS_OK;
+}
+
+/// Append a single value to a single column.
+AdbcStatusCode StatementReaderInferOneValue(
+    sqlite3_stmt* stmt, int col, struct ArrowBitmap* validity, struct ArrowBuffer* data,
+    struct ArrowBuffer* binary, enum ArrowType* current_type, struct AdbcError* error) {
+  // TODO: static_assert sizeof(int64) == sizeof(double)
+
+  int sqlite_type = sqlite3_column_type(stmt, col);
+  switch (sqlite_type) {
+    case SQLITE_NULL: {
+      ArrowBitmapAppendUnsafe(validity, /*set=*/0, /*length=*/1);
+      switch (*current_type) {
+        case NANOARROW_TYPE_INT64: {
+          int64_t value = 0;
+          ArrowBufferAppendUnsafe(data, &value, sizeof(int64_t));
+          break;
+        }
+        case NANOARROW_TYPE_DOUBLE: {
+          double value = 0.0;
+          ArrowBufferAppendUnsafe(data, &value, sizeof(int64_t));
+          break;
+        }
+        case NANOARROW_TYPE_STRING:
+        case NANOARROW_TYPE_BINARY: {
+          const int32_t offset = ((int32_t*)data->data)[data->size_bytes / 4 - 1];
+          CHECK_NA(INTERNAL, ArrowBufferAppend(data, &offset, sizeof(offset)), error);
+          return ADBC_STATUS_OK;
+        }
+        default:
+          return ADBC_STATUS_INTERNAL;
+      }
+      break;
+    }
+    case SQLITE_INTEGER: {
+      ArrowBitmapAppendUnsafe(validity, /*set=*/1, /*length=*/1);
+
+      switch (*current_type) {
+        case NANOARROW_TYPE_INT64: {
+          int64_t value = sqlite3_column_int64(stmt, col);
+          ArrowBufferAppendUnsafe(data, &value, sizeof(int64_t));
+          break;
+        }
+        case NANOARROW_TYPE_DOUBLE: {
+          double value = sqlite3_column_double(stmt, col);
+          ArrowBufferAppendUnsafe(data, &value, sizeof(int64_t));
+          break;
+        }
+        case NANOARROW_TYPE_STRING:
+        case NANOARROW_TYPE_BINARY: {
+          int32_t offset = ((int32_t*)data->data)[data->size_bytes / 4 - 1];
+          return StatementReaderAppendInt64ToBinary(
+              data, binary, sqlite3_column_int64(stmt, col), &offset, error);
+        }
+        default:
+          return ADBC_STATUS_INTERNAL;
+      }
+      break;
+    }
+    case SQLITE_FLOAT: {
+      ArrowBitmapAppendUnsafe(validity, /*set=*/1, /*length=*/1);
+
+      switch (*current_type) {
+        case NANOARROW_TYPE_INT64: {
+          AdbcStatusCode status = StatementReaderUpcastInt64ToDouble(data, error);
+          if (status != ADBC_STATUS_OK) return status;
+          *current_type = NANOARROW_TYPE_DOUBLE;
+          double value = sqlite3_column_double(stmt, col);
+          ArrowBufferAppendUnsafe(data, &value, sizeof(double));
+          break;
+        }
+        case NANOARROW_TYPE_DOUBLE: {
+          double value = sqlite3_column_double(stmt, col);
+          ArrowBufferAppendUnsafe(data, &value, sizeof(double));
+          break;
+        }
+        case NANOARROW_TYPE_STRING:
+        case NANOARROW_TYPE_BINARY: {
+          int32_t offset = ((int32_t*)data->data)[data->size_bytes / 4 - 1];
+          return StatementReaderAppendDoubleToBinary(
+              data, binary, sqlite3_column_double(stmt, col), &offset, error);
+        }
+        default:
+          return ADBC_STATUS_INTERNAL;
+      }
+      break;
+    }
+    case SQLITE_TEXT: {
+      ArrowBitmapAppendUnsafe(validity, /*set=*/1, /*length=*/1);
+
+      switch (*current_type) {
+        case NANOARROW_TYPE_INT64: {
+          AdbcStatusCode status = StatementReaderUpcastInt64ToBinary(data, binary, error);
+          if (status != ADBC_STATUS_OK) return status;
+          *current_type = NANOARROW_TYPE_STRING;
+          break;
+        }
+        case NANOARROW_TYPE_DOUBLE: {
+          AdbcStatusCode status =
+              StatementReaderUpcastDoubleToBinary(data, binary, error);
+          if (status != ADBC_STATUS_OK) return status;
+          *current_type = NANOARROW_TYPE_STRING;
+          break;
+        }
+        case NANOARROW_TYPE_STRING:
+        case NANOARROW_TYPE_BINARY:
+          break;
+        default:
+          return ADBC_STATUS_INTERNAL;
+      }
+
+      const unsigned char* value = sqlite3_column_text(stmt, col);
+      const int size = sqlite3_column_bytes(stmt, col);
+      const int32_t offset = ((int32_t*)data->data)[data->size_bytes / 4 - 1] + size;
+      CHECK_NA(INTERNAL, ArrowBufferAppend(binary, value, size), error);
+      CHECK_NA(INTERNAL, ArrowBufferAppend(data, &offset, sizeof(offset)), error);
+      break;
+    }
+    case SQLITE_BLOB:
+    default: {
+      return ADBC_STATUS_IO;
+    }
+  }
+  return ADBC_STATUS_OK;
+}  // NOLINT(whitespace/indent)
+
+AdbcStatusCode AdbcSqliteExportReader(sqlite3* db, sqlite3_stmt* stmt,
+                                      struct AdbcSqliteBinder* binder, size_t batch_size,
+                                      struct ArrowArrayStream* stream,
+                                      struct AdbcError* error) {
+  struct StatementReader* reader = malloc(sizeof(struct StatementReader));
+  memset(reader, 0, sizeof(struct StatementReader));
+  reader->db = db;
+  reader->stmt = stmt;
+  reader->batch_size = batch_size;
+
+  stream->private_data = reader;
+  stream->release = StatementReaderRelease;
+  stream->get_last_error = StatementReaderGetLastError;
+  stream->get_next = StatementReaderGetNext;
+  stream->get_schema = StatementReaderGetSchema;
+
+  sqlite3_mutex_enter(sqlite3_db_mutex(db));
+
+  const int num_columns = sqlite3_column_count(stmt);
+  struct ArrowBitmap* validity = malloc(num_columns * sizeof(struct ArrowBitmap));
+  struct ArrowBuffer* data = malloc(num_columns * sizeof(struct ArrowBuffer));
+  struct ArrowBuffer* binary = malloc(num_columns * sizeof(struct ArrowBuffer));
+  enum ArrowType* current_type = malloc(num_columns * sizeof(enum ArrowType));
+
+  AdbcStatusCode status = StatementReaderInitializeInfer(
+      num_columns, batch_size, validity, data, binary, current_type, error);
+  if (status == ADBC_STATUS_OK) {
+    int64_t num_rows = 0;
+    while (num_rows < batch_size) {
+      if (binder) {
+        char finished = 0;
+        status = AdbcSqliteBinderBindNext(binder, db, stmt, &finished, error);
+        if (status != ADBC_STATUS_OK) break;
+        if (finished) {
+          reader->done = 1;
+          break;
+        }
+      }
+
+      int rc = sqlite3_step(stmt);
+      if (rc == SQLITE_DONE) {
+        reader->done = 1;
+        break;
+      } else if (rc == SQLITE_ERROR) {
+        status = ADBC_STATUS_IO;
+        break;
+      }
+
+      for (int col = 0; col < num_columns; col++) {
+        status = StatementReaderInferOneValue(stmt, col, &validity[col], &data[col],
+                                              &binary[col], &current_type[col], error);
+        if (status != ADBC_STATUS_OK) break;
+      }
+      if (status != ADBC_STATUS_OK) break;
+      num_rows++;
+    }
+
+    if (status == ADBC_STATUS_OK) {
+      status = StatementReaderInferFinalize(stmt, num_columns, num_rows, reader, validity,
+                                            data, binary, current_type, error);
+    }
+  }
+
+  if (status != ADBC_STATUS_OK) {
+    // Free the individual buffers
+    // This is OK, since InferFinalize either moves all buffers or no buffers
+    for (int i = 0; i < num_columns; i++) {
+      ArrowBitmapReset(&validity[i]);
+      ArrowBufferReset(&data[i]);
+      ArrowBufferReset(&binary[i]);
+    }
+    free(current_type);
+  } else {
+    reader->types = current_type;
+    reader->binder = binder;
+  }
+
+  free(data);
+  free(validity);
+  free(binary);
+
+  sqlite3_mutex_leave(sqlite3_db_mutex(db));
+  return status;
+}  // NOLINT(whitespace/indent)
diff --git a/c/driver/sqlite/statement_reader.h b/c/driver/sqlite/statement_reader.h
new file mode 100644
index 0000000..3b1c425
--- /dev/null
+++ b/c/driver/sqlite/statement_reader.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Utilities for executing SQLite statements.
+
+#pragma once
+
+#include <adbc.h>
+#include <nanoarrow.h>
+#include <sqlite3.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/// \brief Helper to manage binding data to a SQLite statement.
+struct AdbcSqliteBinder {
+  // State
+  struct ArrowSchema schema;
+  struct ArrowArrayStream params;
+  enum ArrowType* types;
+
+  // Scratch space
+  struct ArrowArray array;
+  struct ArrowArrayView batch;
+  int64_t next_row;
+};
+
+AdbcStatusCode AdbcSqliteBinderSetArray(struct AdbcSqliteBinder* binder,
+                                        struct ArrowArray* values,
+                                        struct ArrowSchema* schema,
+                                        struct AdbcError* error);
+AdbcStatusCode AdbcSqliteBinderSetArrayStream(struct AdbcSqliteBinder* binder,
+                                              struct ArrowArrayStream* values,
+                                              struct AdbcError* error);
+AdbcStatusCode AdbcSqliteBinderBindNext(struct AdbcSqliteBinder* binder, sqlite3* conn,
+                                        sqlite3_stmt* stmt, char* finished,
+                                        struct AdbcError* error);
+void AdbcSqliteBinderRelease(struct AdbcSqliteBinder* binder);
+
+/// \brief Initialize an ArrowArrayStream from a sqlite3_stmt.
+/// \param[in] db The SQLite connection.
+/// \param[in] stmt The SQLite statement.
+/// \param[in] binder Query parameters to bind, if provided.
+/// \param[in] infer_rows How many rows to read to infer the Arrow schema.
+/// \param[out] stream The stream to export to.
+/// \param[out] error Error details, if needed.
+AdbcStatusCode AdbcSqliteExportReader(sqlite3* db, sqlite3_stmt* stmt,
+                                      struct AdbcSqliteBinder* binder, size_t batch_size,
+                                      struct ArrowArrayStream* stream,
+                                      struct AdbcError* error);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/c/driver/sqlite/types.h b/c/driver/sqlite/types.h
new file mode 100644
index 0000000..cd46f4f
--- /dev/null
+++ b/c/driver/sqlite/types.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <adbc.h>
+#include <sqlite3.h>
+
+#include "statement_reader.h"
+
+struct SqliteDatabase {
+  sqlite3* db;
+  char* uri;
+  size_t connection_count;
+};
+
+struct SqliteConnection {
+  sqlite3* conn;
+  char active_transaction;
+};
+
+struct SqliteStatement {
+  sqlite3* conn;
+
+  // -- Query state -----------------------------------------
+
+  sqlite3_stmt* stmt;
+  char prepared;
+  char* query;
+  size_t query_len;
+
+  // -- Bind state ------------------------------------------
+  struct AdbcSqliteBinder binder;
+
+  // -- Ingest state ----------------------------------------
+  char* target_table;
+  char append;
+
+  // -- Query options ---------------------------------------
+  int batch_size;
+};
diff --git a/c/driver/sqlite/utils.c b/c/driver/sqlite/utils.c
new file mode 100644
index 0000000..f85ec0d
--- /dev/null
+++ b/c/driver/sqlite/utils.c
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "utils.h"
+
+#include <errno.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <nanoarrow.h>
+
+static size_t kErrorBufferSize = 256;
+static char kErrorPrefix[] = "[SQLite] ";
+
+void ReleaseError(struct AdbcError* error) {
+  free(error->message);
+  error->message = NULL;
+  error->release = NULL;
+}
+
+void SetError(struct AdbcError* error, const char* format, ...) {
+  if (!error) return;
+  if (error->release) {
+    // TODO: combine the errors if possible
+    error->release(error);
+  }
+  error->message = malloc(kErrorBufferSize);
+  if (!error->message) return;
+
+  error->release = &ReleaseError;
+
+  memcpy(error->message, kErrorPrefix, sizeof(kErrorPrefix));
+
+  va_list args;
+  va_start(args, format);
+  vsnprintf(error->message + sizeof(kErrorPrefix) - 1,
+            kErrorBufferSize - sizeof(kErrorPrefix) + 1, format, args);
+  va_end(args);
+}
+
+struct SingleBatchArrayStream {
+  struct ArrowSchema schema;
+  struct ArrowArray batch;
+};
+const char* SingleBatchArrayStreamGetLastError(struct ArrowArrayStream* stream) {
+  return NULL;
+}
+int SingleBatchArrayStreamGetNext(struct ArrowArrayStream* stream,
+                                  struct ArrowArray* batch) {
+  if (!stream || !stream->private_data) return EINVAL;
+  struct SingleBatchArrayStream* impl =
+      (struct SingleBatchArrayStream*)stream->private_data;
+
+  memcpy(batch, &impl->batch, sizeof(*batch));
+  memset(&impl->batch, 0, sizeof(*batch));
+  return 0;
+}
+int SingleBatchArrayStreamGetSchema(struct ArrowArrayStream* stream,
+                                    struct ArrowSchema* schema) {
+  if (!stream || !stream->private_data) return EINVAL;
+  struct SingleBatchArrayStream* impl =
+      (struct SingleBatchArrayStream*)stream->private_data;
+
+  return ArrowSchemaDeepCopy(&impl->schema, schema);
+}
+void SingleBatchArrayStreamRelease(struct ArrowArrayStream* stream) {
+  if (!stream || !stream->private_data) return;
+  struct SingleBatchArrayStream* impl =
+      (struct SingleBatchArrayStream*)stream->private_data;
+  impl->schema.release(&impl->schema);
+  if (impl->batch.release) impl->batch.release(&impl->batch);
+  free(impl);
+
+  memset(stream, 0, sizeof(*stream));
+}
+
+AdbcStatusCode BatchToArrayStream(struct ArrowArray* values, struct ArrowSchema* schema,
+                                  struct ArrowArrayStream* stream,
+                                  struct AdbcError* error) {
+  if (!values->release) {
+    SetError(error, "ArrowArray is not initialized");
+    return ADBC_STATUS_INTERNAL;
+  } else if (!schema->release) {
+    SetError(error, "ArrowSchema is not initialized");
+    return ADBC_STATUS_INTERNAL;
+  } else if (stream->release) {
+    SetError(error, "ArrowArrayStream is already initialized");
+    return ADBC_STATUS_INTERNAL;
+  }
+
+  struct SingleBatchArrayStream* impl =
+      (struct SingleBatchArrayStream*)malloc(sizeof(*impl));
+  memcpy(&impl->schema, schema, sizeof(*schema));
+  memcpy(&impl->batch, values, sizeof(*values));
+  memset(schema, 0, sizeof(*schema));
+  memset(values, 0, sizeof(*values));
+  stream->private_data = impl;
+  stream->get_last_error = SingleBatchArrayStreamGetLastError;
+  stream->get_next = SingleBatchArrayStreamGetNext;
+  stream->get_schema = SingleBatchArrayStreamGetSchema;
+  stream->release = SingleBatchArrayStreamRelease;
+
+  return ADBC_STATUS_OK;
+}
+
+void StringBuilderInit(struct StringBuilder* builder, size_t initial_size) {
+  builder->buffer = (char*)malloc(initial_size);
+  builder->size = 0;
+  builder->capacity = initial_size;
+}
+void StringBuilderAppend(struct StringBuilder* builder, const char* value) {
+  size_t length = strlen(value);
+  size_t new_size = builder->size + length;
+  if (new_size > builder->capacity) {
+    size_t new_capacity = builder->size + length - builder->capacity;
+    if (builder->size == 0) new_capacity++;
+
+    builder->buffer = realloc(builder->buffer, new_capacity);
+    builder->capacity = new_capacity;
+  }
+  strncpy(builder->buffer + builder->size, value, length);
+  builder->buffer[new_size] = '\0';
+  builder->size = new_size;
+}
+void StringBuilderReset(struct StringBuilder* builder) {
+  if (builder->buffer) {
+    free(builder->buffer);
+  }
+  memset(builder, 0, sizeof(*builder));
+}
diff --git a/c/driver/sqlite/utils.h b/c/driver/sqlite/utils.h
new file mode 100644
index 0000000..5d65d88
--- /dev/null
+++ b/c/driver/sqlite/utils.h
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <adbc.h>
+
+/// Set error details using a format string.
+void SetError(struct AdbcError* error, const char* format, ...)
+    __attribute__((format(printf, 2, 3)));
+
+/// Wrap a single batch as a stream.
+AdbcStatusCode BatchToArrayStream(struct ArrowArray* values, struct ArrowSchema* schema,
+                                  struct ArrowArrayStream* stream,
+                                  struct AdbcError* error);
+
+struct StringBuilder {
+  char* buffer;
+  // Not including null terminator
+  size_t size;
+  size_t capacity;
+};
+void StringBuilderInit(struct StringBuilder* builder, size_t initial_size);
+void StringBuilderAppend(struct StringBuilder* builder, const char* value);
+void StringBuilderReset(struct StringBuilder* builder);
+
+/// Check an NanoArrow status code.
+#define CHECK_NA(CODE, EXPR, ERROR)                                                    \
+  do {                                                                                 \
+    ArrowErrorCode arrow_error_code = (EXPR);                                          \
+    if (arrow_error_code != 0) {                                                       \
+      SetError(ERROR, "%s failed: (%d) %s\nDetail: %s:%d %s", #EXPR, arrow_error_code, \
+               strerror(arrow_error_code), __FILE__, __LINE__, __FUNCTION__);          \
+      return ADBC_STATUS_##CODE;                                                       \
+    }                                                                                  \
+  } while (0)
+
+/// Check an NanoArrow status code.
+#define CHECK_NA_DETAIL(CODE, EXPR, NA_ERROR, ERROR)                              \
+  do {                                                                            \
+    ArrowErrorCode arrow_error_code = (EXPR);                                     \
+    if (arrow_error_code != 0) {                                                  \
+      SetError(ERROR, "%s failed: (%d) %s: %s\nDetail: %s:%d %s", #EXPR,          \
+               arrow_error_code, strerror(arrow_error_code), (NA_ERROR)->message, \
+               __FILE__, __LINE__, __FUNCTION__);                                 \
+      return ADBC_STATUS_##CODE;                                                  \
+    }                                                                             \
+  } while (0)
+
+/// Check a generic status.
+#define RAISE(CODE, EXPR, ERRMSG, ERROR)                                          \
+  do {                                                                            \
+    if (!(EXPR)) {                                                                \
+      SetError(ERROR, "%s failed: %s\nDetail: %s:%d %s", #EXPR, ERRMSG, __FILE__, \
+               __LINE__, __FUNCTION__);                                           \
+      return ADBC_STATUS_##CODE;                                                  \
+    }                                                                             \
+  } while (0)
+
+/// Check an NanoArrow status code.
+#define RAISE_NA(EXPR)                                  \
+  do {                                                  \
+    ArrowErrorCode arrow_error_code = (EXPR);           \
+    if (arrow_error_code != 0) return arrow_error_code; \
+  } while (0)
+
+/// Check an ADBC status code.
+#define RAISE_ADBC(EXPR)                                             \
+  do {                                                               \
+    AdbcStatusCode adbc_status_code = (EXPR);                        \
+    if (adbc_status_code != ADBC_STATUS_OK) return adbc_status_code; \
+  } while (0)
diff --git a/c/driver_manager/adbc_driver_manager.cc b/c/driver_manager/adbc_driver_manager.cc
index b73d935..80b0358 100644
--- a/c/driver_manager/adbc_driver_manager.cc
+++ b/c/driver_manager/adbc_driver_manager.cc
@@ -316,11 +316,17 @@ AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError*
     status = database->private_driver->DatabaseSetOption(database, option.first.c_str(),
                                                          option.second.c_str(), error);
     if (status != ADBC_STATUS_OK) {
+      delete args;
+      // Release the database
+      std::ignore = database->private_driver->DatabaseRelease(database, error);
       if (database->private_driver->release) {
         database->private_driver->release(database->private_driver, error);
       }
       delete database->private_driver;
       database->private_driver = nullptr;
+      // Should be redundant, but ensure that AdbcDatabaseRelease
+      // below doesn't think that it contains a TempDatabase
+      database->private_data = nullptr;
       return status;
     }
   }
@@ -411,6 +417,7 @@ AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
     return ADBC_STATUS_INVALID_STATE;
   }
   TempConnection* args = reinterpret_cast<TempConnection*>(connection->private_data);
+  connection->private_data = nullptr;
   std::unordered_map<std::string, std::string> options = std::move(args->options);
   delete args;
 
@@ -714,13 +721,13 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,
   }
 
   void* load_handle = dlsym(handle, entrypoint);
-  init_func = reinterpret_cast<AdbcDriverInitFunc>(load_handle);
-  if (!init_func) {
+  if (!load_handle) {
     std::string message = "dlsym() failed: ";
     message += dlerror();
     SetError(error, message);
     return ADBC_STATUS_INTERNAL;
   }
+  init_func = reinterpret_cast<AdbcDriverInitFunc>(load_handle);
 
 #endif  // defined(_WIN32)
 
diff --git a/c/driver_manager/adbc_driver_manager_test.cc b/c/driver_manager/adbc_driver_manager_test.cc
index 67762fa..23975e3 100644
--- a/c/driver_manager/adbc_driver_manager_test.cc
+++ b/c/driver_manager/adbc_driver_manager_test.cc
@@ -26,19 +26,23 @@
 #include "adbc_driver_manager.h"
 #include "driver/test_util.h"
 #include "validation/adbc_validation.h"
+#include "validation/adbc_validation_util.h"
 
 // Tests of the SQLite example driver, except using the driver manager
 
 namespace adbc {
 
+using adbc_validation::IsOkStatus;
+
 class DriverManager : public ::testing::Test {
  public:
   void SetUp() override {
     std::memset(&driver, 0, sizeof(driver));
     std::memset(&error, 0, sizeof(error));
 
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcLoadDriver("adbc_driver_sqlite", NULL,
-                                                    ADBC_VERSION_1_0_0, &driver, &error));
+    ASSERT_THAT(AdbcLoadDriver("adbc_driver_sqlite", nullptr, ADBC_VERSION_1_0_0, &driver,
+                               &error),
+                IsOkStatus(&error));
   }
 
   void TearDown() override {
@@ -92,7 +96,7 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
       return res;
     }
     return AdbcDatabaseSetOption(
-        database, "filename", "file:Sqlite_Transactions?mode=memory&cache=shared", error);
+        database, "uri", "file:Sqlite_Transactions?mode=memory&cache=shared", error);
   }
 
   std::string BindParameter(int index) const override { return "?"; }
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index 0378966..ab4861c 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -45,160 +45,12 @@ namespace {
 
 /// Assertion helpers
 
-#define ADBCV_CONCAT(a, b) a##b
-#define ADBCV_NAME(a, b) ADBCV_CONCAT(a, b)
-
 #define CHECK_OK(EXPR)                                              \
   do {                                                              \
     if (auto adbc_status = (EXPR); adbc_status != ADBC_STATUS_OK) { \
       return adbc_status;                                           \
     }                                                               \
   } while (false)
-
-#define CHECK_ERRNO_IMPL(ERRNO, EXPR)   \
-  if (int ERRNO = (EXPR); ERRNO != 0) { \
-    return ERRNO;                       \
-  }
-#define CHECK_ERRNO(EXPR) CHECK_ERRNO_IMPL(ADBCV_NAME(errno_, __COUNTER__), EXPR)
-
-int MakeSchema(struct ArrowSchema* schema,
-               const std::vector<std::pair<std::string, ArrowType>>& fields) {
-  CHECK_ERRNO(ArrowSchemaInit(schema, NANOARROW_TYPE_STRUCT));
-  CHECK_ERRNO(ArrowSchemaAllocateChildren(schema, fields.size()));
-  size_t i = 0;
-  for (const std::pair<std::string, ArrowType>& field : fields) {
-    CHECK_ERRNO(ArrowSchemaInit(schema->children[i], field.second));
-    CHECK_ERRNO(ArrowSchemaSetName(schema->children[i], field.first.c_str()));
-    i++;
-  }
-  return 0;
-}
-
-template <typename T>
-int MakeArray(struct ArrowArray* parent, struct ArrowArray* array,
-              const std::vector<std::optional<T>>& values) {
-  for (const auto& v : values) {
-    if (v.has_value()) {
-      if constexpr (std::is_same<T, int64_t>::value) {
-        CHECK_ERRNO(ArrowArrayAppendInt(array, *v));
-      } else if constexpr (std::is_same<T, std::string>::value) {
-        struct ArrowStringView view;
-        view.data = v->c_str();
-        view.n_bytes = v->size();
-        CHECK_ERRNO(ArrowArrayAppendString(array, view));
-      } else {
-        static_assert(!sizeof(T), "Not yet implemented");
-        return ENOTSUP;
-      }
-    } else {
-      CHECK_ERRNO(ArrowArrayAppendNull(array, 1));
-    }
-  }
-  return 0;
-}
-
-template <typename First>
-int MakeBatchImpl(struct ArrowArray* batch, size_t i, struct ArrowError* error,
-                  const std::vector<std::optional<First>>& first) {
-  return MakeArray<First>(batch, batch->children[i], first);
-}
-
-template <typename First, typename... Rest>
-int MakeBatchImpl(struct ArrowArray* batch, size_t i, struct ArrowError* error,
-                  const std::vector<std::optional<First>>& first,
-                  const std::vector<std::optional<Rest>>&... rest) {
-  CHECK_ERRNO(MakeArray<First>(batch, batch->children[i], first));
-  return MakeBatchImpl(batch, i + 1, error, rest...);
-}
-
-template <typename... T>
-int MakeBatch(struct ArrowArray* batch, struct ArrowError* error,
-              const std::vector<std::optional<T>>&... columns) {
-  CHECK_ERRNO(ArrowArrayStartAppending(batch));
-  CHECK_ERRNO(MakeBatchImpl(batch, 0, error, columns...));
-  for (size_t i = 0; i < static_cast<size_t>(batch->n_children); i++) {
-    if (batch->length > 0 && batch->children[i]->length != batch->length) {
-      ADD_FAILURE() << "Column lengths are inconsistent: column " << i << " has length "
-                    << batch->children[i]->length;
-      return EINVAL;
-    }
-    batch->length = batch->children[i]->length;
-  }
-  return ArrowArrayFinishBuilding(batch, error);
-}
-
-template <typename... T>
-int MakeBatch(struct ArrowSchema* schema, struct ArrowArray* batch,
-              struct ArrowError* error,
-              const std::vector<std::pair<std::string, ArrowType>>& fields,
-              const std::vector<std::optional<T>>&... columns) {
-  CHECK_ERRNO(MakeSchema(schema, fields));
-  CHECK_ERRNO(ArrowArrayInitFromSchema(batch, schema, error));
-  return MakeBatch(batch, error, columns...);
-}
-
-class ConstantArrayStream {
- public:
-  explicit ConstantArrayStream(struct ArrowSchema* schema,
-                               std::vector<struct ArrowArray> batches)
-      : batches_(std::move(batches)), next_index_(0) {
-    schema_ = *schema;
-    std::memset(schema, 0, sizeof(*schema));
-  }
-
-  static const char* GetLastError(struct ArrowArrayStream* stream) { return nullptr; }
-
-  static int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) {
-    if (!stream || !stream->private_data || !out) return EINVAL;
-    auto* self = reinterpret_cast<ConstantArrayStream*>(stream->private_data);
-    if (self->next_index_ >= self->batches_.size()) {
-      out->release = nullptr;
-      return 0;
-    }
-
-    *out = self->batches_[self->next_index_];
-    std::memset(&self->batches_[self->next_index_], 0, sizeof(struct ArrowArray));
-
-    self->next_index_++;
-    return 0;
-  }
-
-  static int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) {
-    if (!stream || !stream->private_data || !out) return EINVAL;
-    auto* self = reinterpret_cast<ConstantArrayStream*>(stream->private_data);
-    return ArrowSchemaDeepCopy(&self->schema_, out);
-  }
-
-  static void Release(struct ArrowArrayStream* stream) {
-    if (!stream->private_data) return;
-    auto* self = reinterpret_cast<ConstantArrayStream*>(stream->private_data);
-
-    self->schema_.release(&self->schema_);
-    for (size_t i = 0; i < self->batches_.size(); i++) {
-      if (self->batches_[i].release) {
-        self->batches_[i].release(&self->batches_[i]);
-      }
-    }
-
-    delete self;
-    std::memset(stream, 0, sizeof(*stream));
-  }
-
- private:
-  struct ArrowSchema schema_;
-  std::vector<struct ArrowArray> batches_;
-  size_t next_index_;
-};
-
-void MakeStream(struct ArrowArrayStream* stream, struct ArrowSchema* schema,
-                std::vector<struct ArrowArray> batches) {
-  stream->get_last_error = &ConstantArrayStream::GetLastError;
-  stream->get_next = &ConstantArrayStream::GetNext;
-  stream->get_schema = &ConstantArrayStream::GetSchema;
-  stream->release = &ConstantArrayStream::Release;
-  stream->private_data = new ConstantArrayStream(schema, std::move(batches));
-}
-
 }  // namespace
 
 //------------------------------------------------------------
@@ -342,10 +194,12 @@ void IngestSampleTable(struct AdbcConnection* connection, struct AdbcError* erro
   Handle<struct ArrowSchema> schema;
   Handle<struct ArrowArray> array;
   struct ArrowError na_error;
-  ASSERT_THAT((MakeBatch<int64_t, std::string>(
-                  &schema.value, &array.value, &na_error,
-                  {{"int64s", NANOARROW_TYPE_INT64}, {"strings", NANOARROW_TYPE_STRING}},
-                  {42, -42, std::nullopt}, {"foo", std::nullopt, ""})),
+  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64},
+                                         {"strings", NANOARROW_TYPE_STRING}}),
+              IsOkErrno());
+  ASSERT_THAT((MakeBatch<int64_t, std::string>(&schema.value, &array.value, &na_error,
+                                               {42, -42, std::nullopt},
+                                               {"foo", std::nullopt, ""})),
               IsOkErrno());
 
   Handle<struct AdbcStatement> statement;
@@ -513,8 +367,8 @@ void CheckGetObjectsSchema(struct ArrowSchema* schema) {
       CompareSchema(usage_schema, {
                                       {"fk_catalog", NANOARROW_TYPE_STRING, NULLABLE},
                                       {"fk_db_schema", NANOARROW_TYPE_STRING, NULLABLE},
-                                      {"fk_table", NANOARROW_TYPE_STRING, NULLABLE},
-                                      {"fk_column_name", NANOARROW_TYPE_STRING, NULLABLE},
+                                      {"fk_table", NANOARROW_TYPE_STRING, NOT_NULL},
+                                      {"fk_column_name", NANOARROW_TYPE_STRING, NOT_NULL},
                                   }));
 }
 
@@ -912,14 +766,14 @@ void StatementTest::SetUpTest() {
 }
 
 void StatementTest::TearDownTest() {
+  if (statement.private_data) {
+    EXPECT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
+  }
+  EXPECT_THAT(AdbcConnectionRelease(&connection, &error), IsOkStatus(&error));
+  EXPECT_THAT(AdbcDatabaseRelease(&database, &error), IsOkStatus(&error));
   if (error.release) {
     error.release(&error);
   }
-  if (statement.private_data) {
-    ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
-  }
-  ASSERT_THAT(AdbcConnectionRelease(&connection, &error), IsOkStatus(&error));
-  ASSERT_THAT(AdbcDatabaseRelease(&database, &error), IsOkStatus(&error));
 }
 
 void StatementTest::TestNewInit() {
@@ -952,9 +806,9 @@ void StatementTest::TestSqlIngestInts() {
   Handle<struct ArrowSchema> schema;
   Handle<struct ArrowArray> array;
   struct ArrowError na_error;
+  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
   ASSERT_THAT(
-      MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
-                         {{"int64s", NANOARROW_TYPE_INT64}}, {42, -42, std::nullopt}),
+      MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42, -42, std::nullopt}),
       IsOkErrno());
 
   ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
@@ -1004,8 +858,8 @@ void StatementTest::TestSqlIngestAppend() {
   Handle<struct ArrowSchema> schema;
   Handle<struct ArrowArray> array;
   struct ArrowError na_error;
-  ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
-                                 {{"int64s", NANOARROW_TYPE_INT64}}, {42}),
+  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+  ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42}),
               IsOkErrno());
 
   ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
@@ -1023,9 +877,10 @@ void StatementTest::TestSqlIngestAppend() {
   // Now append
 
   // Re-initialize since Bind() should take ownership of data
-  ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
-                                 {{"int64s", NANOARROW_TYPE_INT64}}, {-42, std::nullopt}),
-              IsOkErrno());
+  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+  ASSERT_THAT(
+      MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}),
+      IsOkErrno());
 
   ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
                                      "bulk_ingest", &error),
@@ -1094,9 +949,10 @@ void StatementTest::TestSqlIngestErrors() {
   ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
                                      ADBC_INGEST_OPTION_MODE_APPEND, &error),
               IsOkStatus(&error));
-  ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
-                                 {{"int64s", NANOARROW_TYPE_INT64}}, {-42, std::nullopt}),
-              IsOkErrno(&na_error));
+  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+  ASSERT_THAT(
+      MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}),
+      IsOkErrno(&na_error));
   ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
               IsOkStatus(&error));
 
@@ -1108,18 +964,20 @@ void StatementTest::TestSqlIngestErrors() {
   ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
                                      ADBC_INGEST_OPTION_MODE_CREATE, &error),
               IsOkStatus(&error));
-  ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
-                                 {{"int64s", NANOARROW_TYPE_INT64}}, {-42, std::nullopt}),
-              IsOkErrno(&na_error));
+  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+  ASSERT_THAT(
+      MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}),
+      IsOkErrno(&na_error));
   ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
               IsOkStatus(&error));
   ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
               IsOkStatus(&error));
 
   // ...then try to overwrite it
-  ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
-                                 {{"int64s", NANOARROW_TYPE_INT64}}, {-42, std::nullopt}),
-              IsOkErrno(&na_error));
+  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+  ASSERT_THAT(
+      MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}),
+      IsOkErrno(&na_error));
   ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
               IsOkStatus(&error));
   ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
@@ -1127,10 +985,11 @@ void StatementTest::TestSqlIngestErrors() {
   if (error.release) error.release(&error);
 
   // ...then try to append an incompatible schema
+  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64},
+                                         {"coltwo", NANOARROW_TYPE_INT64}}),
+              IsOkErrno());
   ASSERT_THAT(
-      (MakeBatch<int64_t, int64_t>(
-          &schema.value, &array.value, &na_error,
-          {{"int64s", NANOARROW_TYPE_INT64}, {"coltwo", NANOARROW_TYPE_INT64}}, {}, {})),
+      (MakeBatch<int64_t, int64_t>(&schema.value, &array.value, &na_error, {}, {})),
       IsOkErrno(&na_error));
 
   ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
@@ -1149,10 +1008,10 @@ void StatementTest::TestSqlIngestMultipleConnections() {
   Handle<struct ArrowSchema> schema;
   Handle<struct ArrowArray> array;
   struct ArrowError na_error;
-  ASSERT_THAT(
-      (MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
-                          {{"int64s", NANOARROW_TYPE_INT64}}, {42, -42, std::nullopt})),
-      IsOkErrno());
+  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+  ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+                                  {42, -42, std::nullopt})),
+              IsOkErrno());
 
   ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
   ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
@@ -1168,7 +1027,7 @@ void StatementTest::TestSqlIngestMultipleConnections() {
   ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
 
   {
-    struct AdbcConnection connection2;
+    struct AdbcConnection connection2 = {};
     ASSERT_THAT(AdbcConnectionNew(&connection2, &error), IsOkStatus(&error));
     ASSERT_THAT(AdbcConnectionInit(&connection2, &database, &error), IsOkStatus(&error));
     ASSERT_THAT(AdbcStatementNew(&connection2, &statement, &error), IsOkStatus(&error));
@@ -1331,10 +1190,12 @@ void StatementTest::TestSqlPrepareSelectParams() {
   Handle<struct ArrowSchema> schema;
   Handle<struct ArrowArray> array;
   struct ArrowError na_error;
-  ASSERT_THAT((MakeBatch<int64_t, std::string>(
-                  &schema.value, &array.value, &na_error,
-                  {{"int64s", NANOARROW_TYPE_INT64}, {"strings", NANOARROW_TYPE_STRING}},
-                  {42, -42, std::nullopt}, {"", std::nullopt, "bar"})),
+  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64},
+                                         {"strings", NANOARROW_TYPE_STRING}}),
+              IsOkErrno());
+  ASSERT_THAT((MakeBatch<int64_t, std::string>(&schema.value, &array.value, &na_error,
+                                               {42, -42, std::nullopt},
+                                               {"", std::nullopt, "bar"})),
               IsOkErrno());
   ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
               IsOkStatus(&error));
@@ -1386,10 +1247,10 @@ void StatementTest::TestSqlPrepareUpdate() {
   ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
                                      "bulk_ingest", &error),
               IsOkStatus(&error));
-  ASSERT_THAT(
-      (MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
-                          {{"int64s", NANOARROW_TYPE_INT64}}, {42, -42, std::nullopt})),
-      IsOkErrno());
+  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+  ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+                                  {42, -42, std::nullopt})),
+              IsOkErrno());
   ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
               IsOkStatus(&error));
 
@@ -1404,10 +1265,10 @@ void StatementTest::TestSqlPrepareUpdate() {
   ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error));
 
   // Bind and execute
-  ASSERT_THAT(
-      (MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
-                          {{"int64s", NANOARROW_TYPE_INT64}}, {42, -42, std::nullopt})),
-      IsOkErrno());
+  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+  ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+                                  {42, -42, std::nullopt})),
+              IsOkErrno());
   ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
               IsOkStatus(&error));
   ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
@@ -1453,8 +1314,7 @@ void StatementTest::TestSqlPrepareUpdateStream() {
               IsOkStatus(&error));
   struct ArrowError na_error;
 
-  const std::vector<std::pair<std::string, ArrowType>> fields = {
-      {"ints", NANOARROW_TYPE_INT64}};
+  const std::vector<SchemaField> fields = {{"ints", NANOARROW_TYPE_INT64}};
 
   // Create table
   {
@@ -1464,8 +1324,9 @@ void StatementTest::TestSqlPrepareUpdateStream() {
     ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
                                        "bulk_ingest", &error),
                 IsOkStatus(&error));
-    ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, fields, {})),
-                IsOkErrno());
+    ASSERT_THAT(MakeSchema(&schema.value, fields), IsOkErrno());
+    ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {})),
+                IsOkErrno(&na_error));
     ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
                 IsOkStatus(&error));
     ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
@@ -1473,21 +1334,18 @@ void StatementTest::TestSqlPrepareUpdateStream() {
   }
 
   // Generate stream
-  struct ArrowArrayStream stream;
-  struct ArrowSchema schema;
+  Handle<struct ArrowArrayStream> stream;
+  Handle<struct ArrowSchema> schema;
   std::vector<struct ArrowArray> batches(2);
 
-  ASSERT_NO_FATAL_FAILURE(MakeSchema(&schema, fields));
-  ASSERT_THAT(ArrowArrayInitFromSchema(&batches[0], &schema, &na_error),
+  ASSERT_THAT(MakeSchema(&schema.value, fields), IsOkErrno());
+  ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &batches[0], &na_error,
+                                  {1, 2, std::nullopt, 3})),
               IsOkErrno(&na_error));
-  ASSERT_THAT((MakeBatch<int64_t>(&batches[0], &na_error, {1, 2, std::nullopt, 3})),
-              IsOkErrno(&na_error));
-  ASSERT_THAT(ArrowArrayInitFromSchema(&batches[1], &schema, &na_error),
-              IsOkErrno(&na_error));
-  ASSERT_THAT(MakeBatch<int64_t>(&batches[1], &na_error, {std::nullopt, 3}),
-              IsOkErrno(&na_error));
-
-  ASSERT_NO_FATAL_FAILURE(MakeStream(&stream, &schema, std::move(batches)));
+  ASSERT_THAT(
+      MakeBatch<int64_t>(&schema.value, &batches[1], &na_error, {std::nullopt, 3}),
+      IsOkErrno(&na_error));
+  MakeStream(&stream.value, &schema.value, std::move(batches));
 
   // Prepare
   std::string query =
@@ -1497,7 +1355,8 @@ void StatementTest::TestSqlPrepareUpdateStream() {
   ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error));
 
   // Bind and execute
-  ASSERT_THAT(AdbcStatementBindStream(&statement, &stream, &error), IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementBindStream(&statement, &stream.value, &error),
+              IsOkStatus(&error));
   ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
               IsOkStatus(&error));
 
@@ -1561,10 +1420,10 @@ void StatementTest::TestSqlPrepareErrorParamCountMismatch() {
   ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query.c_str(), &error),
               IsOkStatus(&error));
   ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error));
-  ASSERT_THAT(
-      (MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
-                          {{"int64s", NANOARROW_TYPE_INT64}}, {42, -42, std::nullopt})),
-      IsOkErrno());
+  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+  ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+                                  {42, -42, std::nullopt})),
+              IsOkErrno());
 
   ASSERT_THAT(
       ([&]() -> AdbcStatusCode {
@@ -1859,10 +1718,6 @@ void StatementTest::TestResultInvalidation() {
   ASSERT_NO_FATAL_FAILURE(reader1.Next());
 }
 
-#undef ADBCV_CONCAT
-#undef ADBCV_NAME
-#undef CHECK_ERRNO_IMPL
-#undef CHECK_ERRNO
 #undef NOT_NULL
 #undef NULLABLE
 }  // namespace adbc_validation
diff --git a/c/validation/adbc_validation_util.cc b/c/validation/adbc_validation_util.cc
index 215b69a..4a17199 100644
--- a/c/validation/adbc_validation_util.cc
+++ b/c/validation/adbc_validation_util.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "adbc_validation_util.h"
+#include <adbc.h>
 
 #include "adbc_validation.h"
 
@@ -117,6 +118,8 @@ bool IsAdbcStatusCode::MatchAndExplain(AdbcStatusCode actual, std::ostream* os)
         if (error_->message) *os << "\nError message: " << error_->message;
         if (error_->sqlstate[0]) *os << "\nSQLSTATE: " << error_->sqlstate;
         if (error_->vendor_code) *os << "\nVendor code: " << error_->vendor_code;
+
+        if (error_->release) error_->release(error_);
       }
     }
     return false;
@@ -140,6 +143,92 @@ void IsAdbcStatusCode::DescribeNegationTo(std::ostream* os) const {
   return IsAdbcStatusCode(code, error);
 }
 
+#define CHECK_ERRNO(EXPR)                             \
+  do {                                                \
+    if (int adbcv_errno = (EXPR); adbcv_errno != 0) { \
+      return adbcv_errno;                             \
+    }                                                 \
+  } while (false);
+
+int MakeSchema(struct ArrowSchema* schema, const std::vector<SchemaField>& fields) {
+  CHECK_ERRNO(ArrowSchemaInit(schema, NANOARROW_TYPE_STRUCT));
+  CHECK_ERRNO(ArrowSchemaAllocateChildren(schema, fields.size()));
+  size_t i = 0;
+  for (const SchemaField& field : fields) {
+    CHECK_ERRNO(ArrowSchemaInit(schema->children[i], field.type));
+    CHECK_ERRNO(ArrowSchemaSetName(schema->children[i], field.name.c_str()));
+    if (!field.nullable) {
+      schema->children[i]->flags &= ~ARROW_FLAG_NULLABLE;
+    }
+    i++;
+  }
+  return 0;
+}
+
+#undef CHECK_ERRNO
+
+class ConstantArrayStream {
+ public:
+  explicit ConstantArrayStream(struct ArrowSchema* schema,
+                               std::vector<struct ArrowArray> batches)
+      : batches_(std::move(batches)), next_index_(0) {
+    schema_ = *schema;
+    std::memset(schema, 0, sizeof(*schema));
+  }
+
+  static const char* GetLastError(struct ArrowArrayStream* stream) { return nullptr; }
+
+  static int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) {
+    if (!stream || !stream->private_data || !out) return EINVAL;
+    auto* self = reinterpret_cast<ConstantArrayStream*>(stream->private_data);
+    if (self->next_index_ >= self->batches_.size()) {
+      out->release = nullptr;
+      return 0;
+    }
+
+    *out = self->batches_[self->next_index_];
+    std::memset(&self->batches_[self->next_index_], 0, sizeof(struct ArrowArray));
+
+    self->next_index_++;
+    return 0;
+  }
+
+  static int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) {
+    if (!stream || !stream->private_data || !out) return EINVAL;
+    auto* self = reinterpret_cast<ConstantArrayStream*>(stream->private_data);
+    return ArrowSchemaDeepCopy(&self->schema_, out);
+  }
+
+  static void Release(struct ArrowArrayStream* stream) {
+    if (!stream->private_data) return;
+    auto* self = reinterpret_cast<ConstantArrayStream*>(stream->private_data);
+
+    self->schema_.release(&self->schema_);
+    for (size_t i = 0; i < self->batches_.size(); i++) {
+      if (self->batches_[i].release) {
+        self->batches_[i].release(&self->batches_[i]);
+      }
+    }
+
+    delete self;
+    std::memset(stream, 0, sizeof(*stream));
+  }
+
+ private:
+  struct ArrowSchema schema_;
+  std::vector<struct ArrowArray> batches_;
+  size_t next_index_;
+};
+
+void MakeStream(struct ArrowArrayStream* stream, struct ArrowSchema* schema,
+                std::vector<struct ArrowArray> batches) {
+  stream->get_last_error = &ConstantArrayStream::GetLastError;
+  stream->get_next = &ConstantArrayStream::GetNext;
+  stream->get_schema = &ConstantArrayStream::GetSchema;
+  stream->release = &ConstantArrayStream::Release;
+  stream->private_data = new ConstantArrayStream(schema, std::move(batches));
+}
+
 void CompareSchema(
     struct ArrowSchema* schema,
     const std::vector<std::tuple<std::optional<std::string>, ArrowType, bool>>& fields) {
diff --git a/c/validation/adbc_validation_util.h b/c/validation/adbc_validation_util.h
index f4ce5e8..b9a11a3 100644
--- a/c/validation/adbc_validation_util.h
+++ b/c/validation/adbc_validation_util.h
@@ -24,6 +24,7 @@
 #include <string>
 #include <tuple>
 #include <type_traits>
+#include <utility>
 #include <vector>
 
 #include <adbc.h>
@@ -189,6 +190,106 @@ struct StreamReader {
   }
 };
 
+struct SchemaField {
+  std::string name;
+  ArrowType type = NANOARROW_TYPE_UNINITIALIZED;
+  bool nullable = true;
+
+  SchemaField(std::string name, ArrowType type, bool nullable)
+      : name(std::move(name)), type(type), nullable(nullable) {}
+
+  SchemaField(std::string name, ArrowType type)
+      : SchemaField(std::move(name), type, /*nullable=*/true) {}
+};
+
+/// \brief Make a schema from a vector of (name, type, nullable) tuples.
+int MakeSchema(struct ArrowSchema* schema, const std::vector<SchemaField>& fields);
+
+/// \brief Make an array from a column of C types.
+template <typename T>
+int MakeArray(struct ArrowArray* parent, struct ArrowArray* array,
+              const std::vector<std::optional<T>>& values) {
+  for (const auto& v : values) {
+    if (v.has_value()) {
+      if constexpr (std::is_same<T, int64_t>::value) {
+        if (int errno_res = ArrowArrayAppendInt(array, *v); errno_res != 0) {
+          return errno_res;
+        }
+      } else if constexpr (std::is_same<T, double>::value) {
+        if (int errno_res = ArrowArrayAppendDouble(array, *v); errno_res != 0) {
+          return errno_res;
+        }
+      } else if constexpr (std::is_same<T, std::string>::value) {
+        struct ArrowStringView view;
+        view.data = v->c_str();
+        view.n_bytes = v->size();
+        if (int errno_res = ArrowArrayAppendString(array, view); errno_res != 0) {
+          return errno_res;
+        }
+      } else {
+        static_assert(!sizeof(T), "Not yet implemented");
+        return ENOTSUP;
+      }
+    } else {
+      if (int errno_res = ArrowArrayAppendNull(array, 1); errno_res != 0) {
+        return errno_res;
+      }
+    }
+  }
+  return 0;
+}
+
+template <typename First>
+int MakeBatchImpl(struct ArrowArray* batch, size_t i, struct ArrowError* error,
+                  const std::vector<std::optional<First>>& first) {
+  return MakeArray<First>(batch, batch->children[i], first);
+}
+
+template <typename First, typename... Rest>
+int MakeBatchImpl(struct ArrowArray* batch, size_t i, struct ArrowError* error,
+                  const std::vector<std::optional<First>>& first,
+                  const std::vector<std::optional<Rest>>&... rest) {
+  if (int errno_res = MakeArray<First>(batch, batch->children[i], first);
+      errno_res != 0) {
+    return errno_res;
+  }
+  return MakeBatchImpl(batch, i + 1, error, rest...);
+}
+
+/// \brief Make a batch from columns of C types.
+template <typename... T>
+int MakeBatch(struct ArrowArray* batch, struct ArrowError* error,
+              const std::vector<std::optional<T>>&... columns) {
+  if (int errno_res = ArrowArrayStartAppending(batch); errno_res != 0) {
+    return errno_res;
+  }
+  if (int errno_res = MakeBatchImpl(batch, 0, error, columns...); errno_res != 0) {
+    return errno_res;
+  }
+  for (size_t i = 0; i < static_cast<size_t>(batch->n_children); i++) {
+    if (batch->length > 0 && batch->children[i]->length != batch->length) {
+      ADD_FAILURE() << "Column lengths are inconsistent: column " << i << " has length "
+                    << batch->children[i]->length;
+      return EINVAL;
+    }
+    batch->length = batch->children[i]->length;
+  }
+  return ArrowArrayFinishBuilding(batch, error);
+}
+
+template <typename... T>
+int MakeBatch(struct ArrowSchema* schema, struct ArrowArray* batch,
+              struct ArrowError* error, const std::vector<std::optional<T>>&... columns) {
+  if (int errno_res = ArrowArrayInitFromSchema(batch, schema, error); errno_res != 0) {
+    return errno_res;
+  }
+  return MakeBatch(batch, error, columns...);
+}
+
+/// \brief Make a stream from a list of batches.
+void MakeStream(struct ArrowArrayStream* stream, struct ArrowSchema* schema,
+                std::vector<struct ArrowArray> batches);
+
 /// \brief Compare an array for equality against a vector of values.
 template <typename T>
 void CompareArray(struct ArrowArrayView* array,
diff --git a/c/vendor/nanoarrow/nanoarrow.c b/c/vendor/nanoarrow/nanoarrow.c
index d73322d..ce8e70f 100644
--- a/c/vendor/nanoarrow/nanoarrow.c
+++ b/c/vendor/nanoarrow/nanoarrow.c
@@ -622,6 +622,8 @@ int ArrowSchemaDeepCopy(struct ArrowSchema* schema, struct ArrowSchema* schema_o
     return result;
   }
 
+  schema_out->flags = schema->flags;
+
   result = ArrowSchemaSetName(schema_out, schema->name);
   if (result != NANOARROW_OK) {
     schema_out->release(schema_out);
@@ -1719,13 +1721,13 @@ static ArrowErrorCode ArrowArraySetStorageType(struct ArrowArray* array,
 
     case NANOARROW_TYPE_FIXED_SIZE_LIST:
     case NANOARROW_TYPE_STRUCT:
-    case NANOARROW_TYPE_MAP:
     case NANOARROW_TYPE_SPARSE_UNION:
       array->n_buffers = 1;
       break;
 
     case NANOARROW_TYPE_LIST:
     case NANOARROW_TYPE_LARGE_LIST:
+    case NANOARROW_TYPE_MAP:
     case NANOARROW_TYPE_BOOL:
     case NANOARROW_TYPE_UINT8:
     case NANOARROW_TYPE_INT8:
@@ -2293,9 +2295,12 @@ ArrowErrorCode ArrowArrayViewSetArray(struct ArrowArrayView* array_view,
       }
       break;
     case NANOARROW_TYPE_LIST:
+    case NANOARROW_TYPE_MAP: {
+      const char* type_name =
+          array_view->storage_type == NANOARROW_TYPE_LIST ? "list" : "map";
       if (array->n_children != 1) {
-        ArrowErrorSet(error, "Expected 1 child of list array but found %d child arrays",
-                      (int)array->n_children);
+        ArrowErrorSet(error, "Expected 1 child of %s array but found %d child arrays",
+                      type_name, (int)array->n_children);
         return EINVAL;
       }
 
@@ -2305,13 +2310,14 @@ ArrowErrorCode ArrowArrayViewSetArray(struct ArrowArrayView* array_view,
         if (array->children[0]->length < last_offset) {
           ArrowErrorSet(
               error,
-              "Expected child of list array with length >= %ld but found array with "
+              "Expected child of %s array with length >= %ld but found array with "
               "length %ld",
-              (long)last_offset, (long)array->children[0]->length);
+              type_name, (long)last_offset, (long)array->children[0]->length);
           return EINVAL;
         }
       }
       break;
+    }
     case NANOARROW_TYPE_LARGE_LIST:
       if (array->n_children != 1) {
         ArrowErrorSet(error,
diff --git a/c/vendor/nanoarrow/nanoarrow.h b/c/vendor/nanoarrow/nanoarrow.h
index b53ac44..83e66be 100644
--- a/c/vendor/nanoarrow/nanoarrow.h
+++ b/c/vendor/nanoarrow/nanoarrow.h
@@ -20,7 +20,7 @@
 
 // #define NANOARROW_NAMESPACE YourNamespaceHere
 
-#define NANOARROW_BUILD_ID "ghaa66afcc5a9faf48fe7062eb2a025d808ccfac5dd"
+#define NANOARROW_BUILD_ID "gha3d9bcca1e1effe1590437e8f8d9a77fef2ac4d59"
 
 #endif
 // Licensed to the Apache Software Foundation (ASF) under one
@@ -2171,8 +2171,7 @@ static inline ArrowErrorCode ArrowArrayAppendDouble(struct ArrowArray* array,
       NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_buffer, &value, sizeof(double)));
       break;
     case NANOARROW_TYPE_FLOAT:
-      _NANOARROW_CHECK_RANGE(value, FLT_MIN, FLT_MAX);
-      NANOARROW_RETURN_NOT_OK(ArrowBufferAppendFloat(data_buffer, value));
+      NANOARROW_RETURN_NOT_OK(ArrowBufferAppendFloat(data_buffer, (float)value));
       break;
     default:
       return EINVAL;
@@ -2268,6 +2267,7 @@ static inline ArrowErrorCode ArrowArrayFinishElement(struct ArrowArray* array) {
 
   switch (private_data->storage_type) {
     case NANOARROW_TYPE_LIST:
+    case NANOARROW_TYPE_MAP:
       child_length = array->children[0]->length;
       if (child_length > INT32_MAX) {
         return EINVAL;
diff --git a/c/vendor/nanoarrow/nanoarrow.hpp b/c/vendor/nanoarrow/nanoarrow.hpp
new file mode 100644
index 0000000..992253a
--- /dev/null
+++ b/c/vendor/nanoarrow/nanoarrow.hpp
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <vector>
+
+#include "nanoarrow.h"
+
+#ifndef NANOARROW_HPP_INCLUDED
+#define NANOARROW_HPP_INCLUDED
+
+/// \defgroup nanoarrow_hpp Nanoarrow C++ Helpers
+///
+/// The utilities provided in this file are intended to support C++ users
+/// of the nanoarrow C library such that C++-style resource allocation
+/// and error handling can be used with nanoarrow data structures.
+/// These utilities are not intended to mirror the nanoarrow C API.
+
+namespace nanoarrow {
+
+namespace internal {
+
+/// \defgroup nanoarrow_hpp-unique_base Base classes for Unique wrappers
+///
+/// @{
+
+static inline void init_pointer(struct ArrowSchema* data) { data->release = nullptr; }
+
+static inline void move_pointer(struct ArrowSchema* src, struct ArrowSchema* dst) {
+  memcpy(dst, src, sizeof(struct ArrowSchema));
+  src->release = nullptr;
+}
+
+static inline void release_pointer(struct ArrowSchema* data) {
+  if (data->release != nullptr) {
+    data->release(data);
+  }
+}
+
+static inline void init_pointer(struct ArrowArray* data) { data->release = nullptr; }
+
+static inline void move_pointer(struct ArrowArray* src, struct ArrowArray* dst) {
+  memcpy(dst, src, sizeof(struct ArrowArray));
+  src->release = nullptr;
+}
+
+static inline void release_pointer(struct ArrowArray* data) {
+  if (data->release != nullptr) {
+    data->release(data);
+  }
+}
+
+static inline void init_pointer(struct ArrowArrayStream* data) {
+  data->release = nullptr;
+}
+
+static inline void move_pointer(struct ArrowArrayStream* src,
+                                struct ArrowArrayStream* dst) {
+  memcpy(dst, src, sizeof(struct ArrowArrayStream));
+  src->release = nullptr;
+}
+
+static inline void release_pointer(ArrowArrayStream* data) {
+  if (data->release != nullptr) {
+    data->release(data);
+  }
+}
+
+static inline void init_pointer(struct ArrowBuffer* data) { ArrowBufferInit(data); }
+
+static inline void move_pointer(struct ArrowBuffer* src, struct ArrowBuffer* dst) {
+  ArrowBufferMove(src, dst);
+}
+
+static inline void release_pointer(struct ArrowBuffer* data) { ArrowBufferReset(data); }
+
+static inline void init_pointer(struct ArrowBitmap* data) { ArrowBitmapInit(data); }
+
+static inline void move_pointer(struct ArrowBitmap* src, struct ArrowBitmap* dst) {
+  ArrowBufferMove(&src->buffer, &dst->buffer);
+  dst->size_bits = src->size_bits;
+  src->size_bits = 0;
+}
+
+static inline void release_pointer(struct ArrowBitmap* data) { ArrowBitmapReset(data); }
+
+static inline void init_pointer(struct ArrowArrayView* data) {
+  ArrowArrayViewInit(data, NANOARROW_TYPE_UNINITIALIZED);
+}
+
+static inline void move_pointer(struct ArrowArrayView* src, struct ArrowArrayView* dst) {
+  memcpy(dst, src, sizeof(struct ArrowArrayView));
+  init_pointer(src);
+}
+
+static inline void release_pointer(struct ArrowArrayView* data) {
+  ArrowArrayViewReset(data);
+}
+
+/// \brief A unique_ptr-like base class for stack-allocatable objects
+/// \tparam T The object type
+template <typename T>
+class Unique {
+ public:
+  /// \brief Construct an invalid instance of T holding no resources
+  Unique() { init_pointer(&data_); }
+
+  /// \brief Move and take ownership of data
+  Unique(T* data) { move_pointer(data, &data_); }
+
+  /// \brief Move and take ownership of data wrapped by rhs
+  Unique(Unique&& rhs) : Unique(rhs.get()) {}
+
+  // These objects are not copyable
+  Unique(Unique& rhs) = delete;
+
+  /// \brief Get a pointer to the data owned by this object
+  T* get() noexcept { return &data_; }
+
+  /// \brief Use the pointer operator to access fields of this object
+  T* operator->() { return &data_; }
+
+  /// \brief Call data's release callback if valid
+  void reset() { release_pointer(&data_); }
+
+  /// \brief Call data's release callback if valid and move ownership of the data
+  /// pointed to by data
+  void reset(T* data) {
+    reset();
+    move_pointer(data, &data_);
+  }
+
+  /// \brief Move ownership of this object to the data pointed to by out
+  void move(T* out) { move_pointer(&data_, out); }
+
+  ~Unique() { reset(); }
+
+ protected:
+  T data_;
+};
+
+/// @}
+
+}  // namespace internal
+
+/// \defgroup nanoarrow_hpp-unique Unique object wrappers
+///
+/// The Arrow C Data interface, the Arrow C Stream interface, and the
+/// nanoarrow C library use stack-allocatable objects, some of which
+/// require initialization or cleanup.
+///
+/// @{
+
+/// \brief Class wrapping a unique struct ArrowSchema
+using UniqueSchema = internal::Unique<struct ArrowSchema>;
+
+/// \brief Class wrapping a unique struct ArrowArray
+using UniqueArray = internal::Unique<struct ArrowArray>;
+
+/// \brief Class wrapping a unique struct ArrowArrayStream
+using UniqueArrayStream = internal::Unique<struct ArrowArrayStream>;
+
+/// \brief Class wrapping a unique struct ArrowBuffer
+using UniqueBuffer = internal::Unique<struct ArrowBuffer>;
+
+/// \brief Class wrapping a unique struct ArrowBitmap
+using UniqueBitmap = internal::Unique<struct ArrowBitmap>;
+
+/// \brief Class wrapping a unique struct ArrowArrayView
+using UniqueArrayView = internal::Unique<struct ArrowArrayView>;
+
+/// @}
+
+/// \defgroup nanoarrow_hpp-array-stream ArrayStream helpers
+///
+/// These classes provide simple struct ArrowArrayStream implementations that
+/// can be extended to help simplify the process of creating a valid
+/// ArrowArrayStream implementation or used as-is for testing.
+///
+/// @{
+
+/// \brief An empty array stream
+///
+/// This class can be constructed from an enum ArrowType or
+/// struct ArrowSchema and implements a default get_next() method that
+/// always marks the output ArrowArray as released. This class can
+/// be extended with an implementation of get_next() for a custom
+/// source.
+class EmptyArrayStream {
+ public:
+  /// \brief Create an empty UniqueArrayStream from a struct ArrowSchema
+  ///
+  /// This object takes ownership of the schema and marks the source schema
+  /// as released.
+  static UniqueArrayStream MakeUnique(struct ArrowSchema* schema) {
+    UniqueArrayStream stream;
+    (new EmptyArrayStream(schema))->MakeStream(stream.get());
+    return stream;
+  }
+
+  virtual ~EmptyArrayStream() {}
+
+ protected:
+  UniqueSchema schema_;
+  struct ArrowError error_;
+
+  EmptyArrayStream(struct ArrowSchema* schema) : schema_(schema) {
+    error_.message[0] = '\0';
+  }
+
+  void MakeStream(struct ArrowArrayStream* stream) {
+    stream->get_schema = &get_schema_wrapper;
+    stream->get_next = &get_next_wrapper;
+    stream->get_last_error = &get_last_error_wrapper;
+    stream->release = &release_wrapper;
+    stream->private_data = this;
+  }
+
+  virtual int get_schema(struct ArrowSchema* schema) {
+    return ArrowSchemaDeepCopy(schema_.get(), schema);
+  }
+
+  virtual int get_next(struct ArrowArray* array) {
+    array->release = nullptr;
+    return NANOARROW_OK;
+  }
+
+  virtual const char* get_last_error() { return error_.message; }
+
+ private:
+  static int get_schema_wrapper(struct ArrowArrayStream* stream,
+                                struct ArrowSchema* schema) {
+    return reinterpret_cast<EmptyArrayStream*>(stream->private_data)->get_schema(schema);
+  }
+
+  static int get_next_wrapper(struct ArrowArrayStream* stream, struct ArrowArray* array) {
+    return reinterpret_cast<EmptyArrayStream*>(stream->private_data)->get_next(array);
+  }
+
+  static const char* get_last_error_wrapper(struct ArrowArrayStream* stream) {
+    return reinterpret_cast<EmptyArrayStream*>(stream->private_data)->get_last_error();
+  }
+
+  static void release_wrapper(struct ArrowArrayStream* stream) {
+    delete reinterpret_cast<EmptyArrayStream*>(stream->private_data);
+  }
+};
+
+/// \brief Implementation of an ArrowArrayStream backed by a vector of ArrowArray objects
+class VectorArrayStream : public EmptyArrayStream {
+ public:
+  /// \brief Create a UniqueArrowArrayStream from an existing array
+  ///
+  /// Takes ownership of the schema and the array.
+  static UniqueArrayStream MakeUnique(struct ArrowSchema* schema,
+                                      struct ArrowArray* array) {
+    std::vector<UniqueArray> arrays;
+    arrays.emplace_back(array);
+    return MakeUnique(schema, std::move(arrays));
+  }
+
+  /// \brief Create a UniqueArrowArrayStream from existing arrays
+  ///
+  /// This object takes ownership of the schema and arrays.
+  static UniqueArrayStream MakeUnique(struct ArrowSchema* schema,
+                                      std::vector<UniqueArray> arrays) {
+    UniqueArrayStream stream;
+    (new VectorArrayStream(schema, std::move(arrays)))->MakeStream(stream.get());
+    return stream;
+  }
+
+ protected:
+  VectorArrayStream(struct ArrowSchema* schema, std::vector<UniqueArray> arrays)
+      : EmptyArrayStream(schema), offset_(0), arrays_(std::move(arrays)) {}
+
+  int get_next(struct ArrowArray* array) {
+    if (offset_ < arrays_.size()) {
+      arrays_[offset_++].move(array);
+    } else {
+      array->release = nullptr;
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<UniqueArray> arrays_;
+  int64_t offset_;
+};
+
+/// @}
+
+}  // namespace nanoarrow
+
+#endif
diff --git a/c/vendor/vendor_nanoarrow.sh b/c/vendor/vendor_nanoarrow.sh
index bc50743..b5d65c7 100755
--- a/c/vendor/vendor_nanoarrow.sh
+++ b/c/vendor/vendor_nanoarrow.sh
@@ -34,6 +34,7 @@ main() {
 
     cp "$SCRATCH/dist/nanoarrow.c" nanoarrow/
     cp "$SCRATCH/dist/nanoarrow.h" nanoarrow/
+    cp "$SCRATCH/src/nanoarrow/nanoarrow.hpp" nanoarrow/
 }
 
 main "$@"
diff --git a/ci/conda_env_cpp.txt b/ci/conda_env_cpp.txt
index cfdf3ec..d45faeb 100644
--- a/ci/conda_env_cpp.txt
+++ b/ci/conda_env_cpp.txt
@@ -15,7 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-arrow-cpp>=8.0.0
 cmake
 compilers
 gmock>=1.10.0
diff --git a/glib/test/test-connection.rb b/glib/test/test-connection.rb
index bc8c488..02c0086 100644
--- a/glib/test/test-connection.rb
+++ b/glib/test/test-connection.rb
@@ -19,7 +19,7 @@ class ConnectionTest < Test::Unit::TestCase
   def setup
     @database = ADBC::Database.new
     @database.set_option("driver", "adbc_driver_sqlite")
-    @database.set_option("filename", ":memory:")
+    @database.set_option("uri", ":memory:")
     @database.init
   end
 
diff --git a/glib/test/test-statement.rb b/glib/test/test-statement.rb
index 9c0af1d..2a3f11c 100644
--- a/glib/test/test-statement.rb
+++ b/glib/test/test-statement.rb
@@ -19,7 +19,7 @@ class StatementTest < Test::Unit::TestCase
   def setup
     @database = ADBC::Database.new
     @database.set_option("driver", "adbc_driver_sqlite")
-    @database.set_option("filename", ":memory:")
+    @database.set_option("uri", ":memory:")
     @database.init
     @connection = ADBC::Connection.new
     @connection.init(@database)
diff --git a/go/adbc/drivermgr/wrapper.go b/go/adbc/drivermgr/wrapper.go
index 264ee44..3b60696 100644
--- a/go/adbc/drivermgr/wrapper.go
+++ b/go/adbc/drivermgr/wrapper.go
@@ -249,7 +249,7 @@ func (s *stmt) ExecuteQuery(context.Context) (array.RecordReader, int64, error)
 	)
 	code := adbc.Status(C.AdbcStatementExecuteQuery(s.st, &out, &affected, &err))
 	if code != adbc.StatusOK {
-		return nil, 0, nil
+		return nil, 0, toAdbcError(code, &err)
 	}
 
 	return getRdr(&out), int64(affected), nil
diff --git a/go/adbc/drivermgr/wrapper_sqlite_test.go b/go/adbc/drivermgr/wrapper_sqlite_test.go
index edad49f..ba3b277 100644
--- a/go/adbc/drivermgr/wrapper_sqlite_test.go
+++ b/go/adbc/drivermgr/wrapper_sqlite_test.go
@@ -76,10 +76,7 @@ func (dm *DriverMgrSuite) TestMetadataGetInfo() {
 
 	rdr, err := dm.conn.GetInfo(dm.ctx, nil)
 	dm.NoError(err)
-	dm.True(rdr.Next())
 	dm.True(infoSchema.Equal(rdr.Schema()))
-	dm.NotNil(rdr.Record())
-	dm.False(rdr.Next())
 	rdr.Release()
 
 	info := []adbc.InfoCode{
@@ -91,12 +88,11 @@ func (dm *DriverMgrSuite) TestMetadataGetInfo() {
 
 	rdr, err = dm.conn.GetInfo(dm.ctx, info)
 	dm.NoError(err)
-	dm.True(rdr.Next())
 	sc := rdr.Schema()
 	dm.True(infoSchema.Equal(sc))
-	dm.EqualValues(4, rdr.Record().NumRows())
-	dm.False(rdr.Next())
 	rdr.Release()
+
+	// TODO(apache/arrow-nanoarrow#76): values are not checked because go fails to import the union values
 }
 
 func (dm *DriverMgrSuite) TestSqlExecute() {
@@ -127,12 +123,16 @@ func (dm *DriverMgrSuite) TestSqlExecuteInvalid() {
 	dm.Require().NoError(err)
 	defer st.Close()
 
-	err = st.SetSqlQuery(query)
+	dm.Require().NoError(st.SetSqlQuery(query))
+
+	_, _, err = st.ExecuteQuery(dm.ctx)
+	dm.Require().Error(err)
+
 	var adbcErr *adbc.Error
 	dm.ErrorAs(err, &adbcErr)
-	dm.ErrorContains(adbcErr, "[SQLite3] sqlite3_prepare_v2:")
+	dm.ErrorContains(adbcErr, "[SQLite] Failed to prepare query:")
 	dm.ErrorContains(adbcErr, "syntax error")
-	dm.Equal(adbc.StatusIO, adbcErr.Code)
+	dm.Equal(adbc.StatusInvalidArgument, adbcErr.Code)
 }
 
 func (dm *DriverMgrSuite) TestSqlPrepare() {
diff --git a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
index 14b0245..51be26c 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
+++ b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
@@ -64,9 +64,9 @@ _KNOWN_INFO_VALUES = {
     0: "vendor_name",
     1: "vendor_version",
     2: "vendor_arrow_version",
-    100: "vendor_name",
-    101: "vendor_version",
-    102: "vendor_arrow_version",
+    100: "driver_name",
+    101: "driver_version",
+    102: "driver_arrow_version",
 }
 
 # ----------------------------------------------------------
diff --git a/python/adbc_driver_manager/tests/test_dbapi.py b/python/adbc_driver_manager/tests/test_dbapi.py
index d3d34af..7c23588 100644
--- a/python/adbc_driver_manager/tests/test_dbapi.py
+++ b/python/adbc_driver_manager/tests/test_dbapi.py
@@ -64,11 +64,16 @@ def test_attrs(sqlite):
 
 @pytest.mark.sqlite
 def test_info(sqlite):
-    assert sqlite.adbc_get_info() == {
-        "vendor_arrow_version": "Arrow/C++ 9.0.0",
-        "vendor_name": "ADBC C SQLite3",
-        "vendor_version": "0.0.1",
+    info = sqlite.adbc_get_info()
+    assert set(info.keys()) == {
+        "driver_arrow_version",
+        "driver_name",
+        "driver_version",
+        "vendor_name",
+        "vendor_version",
     }
+    assert info["driver_name"] == "ADBC SQLite Driver"
+    assert info["vendor_name"] == "SQLite"
 
 
 @pytest.mark.sqlite
@@ -144,7 +149,7 @@ def test_ingest(data, sqlite):
     with sqlite.cursor() as cur:
         cur.adbc_ingest("bulk_ingest", data())
 
-        with pytest.raises(dbapi.ProgrammingError):
+        with pytest.raises(dbapi.Error):
             cur.adbc_ingest("bulk_ingest", data())
 
         cur.adbc_ingest("bulk_ingest", data(), mode="append")
diff --git a/python/adbc_driver_manager/tests/test_lowlevel.py b/python/adbc_driver_manager/tests/test_lowlevel.py
index 03f71fd..c8f9de7 100644
--- a/python/adbc_driver_manager/tests/test_lowlevel.py
+++ b/python/adbc_driver_manager/tests/test_lowlevel.py
@@ -225,9 +225,9 @@ def test_autocommit(sqlite):
 
     # Data should not be readable
     with adbc_driver_manager.AdbcStatement(conn) as stmt:
-        with pytest.raises(adbc_driver_manager.OperationalError):
+        with pytest.raises(adbc_driver_manager.Error):
             stmt.set_sql_query("SELECT * FROM foo")
-            stmt.execute()
+            stmt.execute_query()
 
     with adbc_driver_manager.AdbcStatement(conn) as stmt:
         stmt.set_options(**{adbc_driver_manager.INGEST_OPTION_TARGET_TABLE: "foo"})
diff --git a/ruby/README.md b/ruby/README.md
index b49c516..cbdf5f4 100644
--- a/ruby/README.md
+++ b/ruby/README.md
@@ -33,7 +33,7 @@ $ gem install red-adbc
 require "adbc"
 
 ADBC::Database.open(driver: "adbc_driver_sqlite",
-                    filename: ":memory:") do |database|
+                    uri: ":memory:") do |database|
   database.connect do |connection|
     puts(connection.query("SELECT 1"))
   end
diff --git a/ruby/test/test-connection.rb b/ruby/test/test-connection.rb
index 2b62c6e..e76a897 100644
--- a/ruby/test/test-connection.rb
+++ b/ruby/test/test-connection.rb
@@ -19,7 +19,7 @@ class ConnectionTest < Test::Unit::TestCase
   def setup
     options = {
       driver: "adbc_driver_sqlite",
-      filename: ":memory:",
+      uri: ":memory:",
     }
     ADBC::Database.open(**options) do |database|
       connect_options = {}