You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/11/28 22:06:16 UTC
[arrow-adbc] branch main updated: feat(c/driver/sqlite): port SQLite driver to nanoarrow (#196)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 3b11e33 feat(c/driver/sqlite): port SQLite driver to nanoarrow (#196)
3b11e33 is described below
commit 3b11e33096de8c50f72762139c4258b9884de325
Author: David Li <li...@gmail.com>
AuthorDate: Mon Nov 28 17:06:10 2022 -0500
feat(c/driver/sqlite): port SQLite driver to nanoarrow (#196)
Also related: #144.
Fixes #120.
Co-authored-by: Dewey Dunnington <de...@fishandwhistle.net>
---
c/driver/sqlite/CMakeLists.txt | 21 +-
c/driver/sqlite/sqlite.c | 1788 +++++++++++++++++++
c/driver/sqlite/sqlite.cc | 1827 --------------------
c/driver/sqlite/sqlite_test.cc | 655 ++++---
c/driver/sqlite/statement_reader.c | 896 ++++++++++
c/driver/sqlite/statement_reader.h | 69 +
c/driver/sqlite/types.h | 58 +
c/driver/sqlite/utils.c | 146 ++
c/driver/sqlite/utils.h | 88 +
c/driver_manager/adbc_driver_manager.cc | 11 +-
c/driver_manager/adbc_driver_manager_test.cc | 10 +-
c/validation/adbc_validation.cc | 295 +---
c/validation/adbc_validation_util.cc | 89 +
c/validation/adbc_validation_util.h | 101 ++
c/vendor/nanoarrow/nanoarrow.c | 16 +-
c/vendor/nanoarrow/nanoarrow.h | 6 +-
c/vendor/nanoarrow/nanoarrow.hpp | 308 ++++
c/vendor/vendor_nanoarrow.sh | 1 +
ci/conda_env_cpp.txt | 1 -
glib/test/test-connection.rb | 2 +-
glib/test/test-statement.rb | 2 +-
go/adbc/drivermgr/wrapper.go | 2 +-
go/adbc/drivermgr/wrapper_sqlite_test.go | 18 +-
.../adbc_driver_manager/dbapi.py | 6 +-
python/adbc_driver_manager/tests/test_dbapi.py | 15 +-
python/adbc_driver_manager/tests/test_lowlevel.py | 4 +-
ruby/README.md | 2 +-
ruby/test/test-connection.rb | 2 +-
28 files changed, 4128 insertions(+), 2311 deletions(-)
diff --git a/c/driver/sqlite/CMakeLists.txt b/c/driver/sqlite/CMakeLists.txt
index ce7b0d8..820d14e 100644
--- a/c/driver/sqlite/CMakeLists.txt
+++ b/c/driver/sqlite/CMakeLists.txt
@@ -15,39 +15,39 @@
# specific language governing permissions and limitations
# under the License.
-cmake_minimum_required(VERSION 3.14)
+cmake_minimum_required(VERSION 3.18)
get_filename_component(REPOSITORY_ROOT "../../../" ABSOLUTE)
list(APPEND CMAKE_MODULE_PATH "${REPOSITORY_ROOT}/c/cmake_modules/")
include(AdbcDefines)
include(BuildUtils)
-set(CMAKE_CXX_STANDARD 11)
-set(CMAKE_CXX_STANDARD_REQUIRED ON)
-
project(adbc_driver_sqlite
VERSION "${ADBC_BASE_VERSION}"
LANGUAGES CXX)
include(CTest)
+find_package(PkgConfig)
-find_package(Arrow REQUIRED)
find_package(SQLite3 REQUIRED)
add_arrow_lib(adbc_driver_sqlite
SOURCES
- sqlite.cc
+ sqlite.c
+ statement_reader.c
+ utils.c
OUTPUTS
ADBC_LIBRARIES
SHARED_LINK_FLAGS
${ADBC_LINK_FLAGS}
SHARED_LINK_LIBS
SQLite::SQLite3
- arrow_shared
+ nanoarrow
STATIC_LINK_LIBS
SQLite::SQLite3
- arrow_static)
+ nanoarrow
+ ${LIBPQ_STATIC_LIBRARIES})
include_directories(SYSTEM ${REPOSITORY_ROOT})
include_directories(SYSTEM ${REPOSITORY_ROOT}/c/)
-include_directories(SYSTEM ${SQLite3_INCLUDE_DIRS})
+include_directories(SYSTEM ${REPOSITORY_ROOT}/c/vendor/nanoarrow/)
foreach(LIB_TARGET ${ADBC_LIBRARIES})
target_compile_definitions(${LIB_TARGET} PRIVATE ADBC_EXPORTING)
endforeach()
@@ -70,8 +70,7 @@ if(ADBC_BUILD_TESTS)
nanoarrow
${TEST_LINK_LIBS})
target_compile_features(adbc-driver-sqlite-test PRIVATE cxx_std_17)
- target_include_directories(adbc-driver-sqlite-test SYSTEM
- PRIVATE ${REPOSITORY_ROOT}/c/vendor/nanoarrow/)
+ adbc_configure_target(adbc-driver-sqlite-test)
endif()
validate_config()
diff --git a/c/driver/sqlite/sqlite.c b/c/driver/sqlite/sqlite.c
new file mode 100644
index 0000000..233e5c1
--- /dev/null
+++ b/c/driver/sqlite/sqlite.c
@@ -0,0 +1,1788 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "adbc.h"
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdarg.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <sqlite3.h>
+
+#include "nanoarrow.h"
+#include "statement_reader.h"
+#include "types.h"
+#include "utils.h"
+
+static const char kDefaultUri[] = "file:adbc_driver_sqlite?mode=memory&cache=shared";
+// The batch size for query results (and for initial type inference)
+static const char kStatementOptionBatchRows[] = "adbc.sqlite.query.batch_rows";
+static const uint32_t kSupportedInfoCodes[] = {
+ ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION, ADBC_INFO_DRIVER_NAME,
+ ADBC_INFO_DRIVER_VERSION, ADBC_INFO_DRIVER_ARROW_VERSION,
+};
+
+// Private names (to avoid conflicts when using the driver manager)
+
+#define CHECK_DB_INIT(NAME, ERROR) \
+ if (!NAME->private_data) { \
+ SetError(ERROR, "%s: database not initialized", __func__); \
+ return ADBC_STATUS_INVALID_STATE; \
+ }
+#define CHECK_CONN_INIT(NAME, ERROR) \
+ if (!NAME->private_data) { \
+ SetError(ERROR, "%s: connection not initialized", __func__); \
+ return ADBC_STATUS_INVALID_STATE; \
+ }
+#define CHECK_STMT_INIT(NAME, ERROR) \
+ if (!NAME->private_data) { \
+ SetError(ERROR, "%s: statement not initialized", __func__); \
+ return ADBC_STATUS_INVALID_STATE; \
+ }
+
+AdbcStatusCode SqliteDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
+ if (database->private_data) {
+ SetError(error, "AdbcDatabaseNew: database already allocated");
+ return ADBC_STATUS_INVALID_STATE;
+ }
+
+ database->private_data = malloc(sizeof(struct SqliteDatabase));
+ memset(database->private_data, 0, sizeof(struct SqliteDatabase));
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteDatabaseSetOption(struct AdbcDatabase* database, const char* key,
+ const char* value, struct AdbcError* error) {
+ CHECK_DB_INIT(database, error);
+ struct SqliteDatabase* db = (struct SqliteDatabase*)database->private_data;
+
+ if (strcmp(key, "uri") == 0) {
+ if (db->uri) free(db->uri);
+ size_t len = strlen(value) + 1;
+ db->uri = malloc(len);
+ strncpy(db->uri, value, len);
+ return ADBC_STATUS_OK;
+ }
+ SetError(error, "Unknown database option %s=%s", key, value);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+int OpenDatabase(const char* maybe_uri, sqlite3** db, struct AdbcError* error) {
+ const char* uri = maybe_uri ? maybe_uri : kDefaultUri;
+ int rc = sqlite3_open_v2(uri, db,
+ SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
+ /*zVfs=*/NULL);
+ if (rc != SQLITE_OK) {
+ if (*db) {
+ SetError(error, "Failed to open %s: %s", uri, sqlite3_errmsg(*db));
+ } else {
+ SetError(error, "Failed to open %s: failed to allocate memory", uri);
+ }
+ (void)sqlite3_close(*db);
+ *db = NULL;
+ return ADBC_STATUS_IO;
+ }
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode ExecuteQuery(struct SqliteConnection* conn, const char* query,
+ struct AdbcError* error) {
+ sqlite3_stmt* stmt = NULL;
+ int rc = sqlite3_prepare_v2(conn->conn, query, strlen(query), &stmt, /*pzTail=*/NULL);
+ while (rc != SQLITE_DONE && rc != SQLITE_ERROR) {
+ rc = sqlite3_step(stmt);
+ }
+ rc = sqlite3_finalize(stmt);
+ if (rc != SQLITE_OK && rc != SQLITE_DONE) {
+ SetError(error, "Failed to execute query \"%s\": %s", query,
+ sqlite3_errmsg(conn->conn));
+ return ADBC_STATUS_INTERNAL;
+ }
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteDatabaseInit(struct AdbcDatabase* database,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(database, error);
+ struct SqliteDatabase* db = (struct SqliteDatabase*)database->private_data;
+
+ if (db->db) {
+ SetError(error, "AdbcDatabaseInit: database already initialized");
+ return ADBC_STATUS_INVALID_STATE;
+ }
+
+ return OpenDatabase(db->uri, &db->db, error);
+}
+
+AdbcStatusCode SqliteDatabaseRelease(struct AdbcDatabase* database,
+ struct AdbcError* error) {
+ CHECK_DB_INIT(database, error);
+ struct SqliteDatabase* db = (struct SqliteDatabase*)database->private_data;
+
+ size_t connection_count = db->connection_count;
+ if (db->uri) free(db->uri);
+ if (db->db) {
+ if (sqlite3_close(db->db) == SQLITE_BUSY) {
+ SetError(error, "AdbcDatabaseRelease: connection is busy");
+ return ADBC_STATUS_IO;
+ }
+ }
+ free(database->private_data);
+ database->private_data = NULL;
+
+ if (connection_count > 0) {
+ SetError(error, "AdbcDatabaseRelease: %ld open connections when released",
+ connection_count);
+ return ADBC_STATUS_INVALID_STATE;
+ }
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionNew(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ if (connection->private_data) {
+ SetError(error, "AdbcConnectionNew: connection already allocated");
+ return ADBC_STATUS_INVALID_STATE;
+ }
+
+ connection->private_data = malloc(sizeof(struct SqliteConnection));
+ memset(connection->private_data, 0, sizeof(struct SqliteConnection));
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection,
+ const char* key, const char* value,
+ struct AdbcError* error) {
+ CHECK_CONN_INIT(connection, error);
+ struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+ if (strcmp(key, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
+ if (strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
+ if (conn->active_transaction) {
+ AdbcStatusCode status = ExecuteQuery(conn, "COMMIT", error);
+ if (status != ADBC_STATUS_OK) return status;
+ conn->active_transaction = 0;
+ } else {
+ // no-op
+ }
+ } else if (strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
+ if (conn->active_transaction) {
+ // no-op
+ } else {
+ // begin
+ AdbcStatusCode status = ExecuteQuery(conn, "BEGIN", error);
+ if (status != ADBC_STATUS_OK) return status;
+ conn->active_transaction = 1;
+ }
+ } else {
+ SetError(error, "Invalid connection option value %s=%s", key, value);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+ return ADBC_STATUS_OK;
+ }
+ SetError(error, "Unknown connection option %s=%s", key, value);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteConnectionInit(struct AdbcConnection* connection,
+ struct AdbcDatabase* database,
+ struct AdbcError* error) {
+ CHECK_CONN_INIT(connection, error);
+ CHECK_DB_INIT(database, error);
+ struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+ struct SqliteDatabase* db = (struct SqliteDatabase*)database->private_data;
+
+ if (conn->conn) {
+ SetError(error, "AdbcConnectionInit: connection already initialized");
+ return ADBC_STATUS_INVALID_STATE;
+ }
+ return OpenDatabase(db->uri, &conn->conn, error);
+}
+
+AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ CHECK_CONN_INIT(connection, error);
+ struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+
+ if (conn->conn) {
+ int rc = sqlite3_close(conn->conn);
+ if (rc == SQLITE_BUSY) {
+ SetError(error, "AdbcConnectionRelease: connection is busy");
+ return ADBC_STATUS_IO;
+ }
+ }
+ free(connection->private_data);
+ connection->private_data = NULL;
+
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionGetInfoAppendStringImpl(struct ArrowArray* array,
+ uint32_t info_code,
+ const char* info_value,
+ struct AdbcError* error) {
+ CHECK_NA(INTERNAL, ArrowArrayAppendUInt(array->children[0], info_code), error);
+ // Append to type variant
+ struct ArrowStringView value = ArrowCharView(info_value);
+ CHECK_NA(INTERNAL, ArrowArrayAppendString(array->children[1]->children[0], value),
+ error);
+ // Append type code (by hand)
+ struct ArrowBuffer* codes = ArrowArrayBuffer(array->children[1], 0);
+ CHECK_NA(INTERNAL, ArrowBufferAppendInt8(codes, 0), error);
+ // Append type offset (by hand)
+ struct ArrowBuffer* offsets = ArrowArrayBuffer(array->children[1], 1);
+ CHECK_NA(INTERNAL,
+ ArrowBufferAppendInt32(offsets, array->children[1]->children[0]->length - 1),
+ error);
+ array->children[1]->length++;
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionGetInfoImpl(const uint32_t* info_codes,
+ size_t info_codes_length,
+ struct ArrowSchema* schema,
+ struct ArrowArray* array,
+ struct AdbcError* error) {
+ CHECK_NA(INTERNAL, ArrowSchemaInit(schema, NANOARROW_TYPE_STRUCT), error);
+ CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema, /*num_columns=*/2), error);
+
+ CHECK_NA(INTERNAL, ArrowSchemaInit(schema->children[0], NANOARROW_TYPE_UINT32), error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[0], "info_name"), error);
+ schema->children[0]->flags &= ~ARROW_FLAG_NULLABLE;
+
+ struct ArrowSchema* info_value = schema->children[1];
+ // TODO(apache/arrow-nanoarrow#73): formal union support
+ // initialize with dummy then override
+ CHECK_NA(INTERNAL, ArrowSchemaInit(info_value, NANOARROW_TYPE_UINT32), error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetFormat(info_value, "+ud:0,1,2,3,4,5"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value, "info_value"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(info_value, /*num_columns=*/6), error);
+
+ CHECK_NA(INTERNAL, ArrowSchemaInit(info_value->children[0], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[0], "string_value"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(info_value->children[1], NANOARROW_TYPE_BOOL),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[1], "bool_value"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(info_value->children[2], NANOARROW_TYPE_INT64),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[2], "int64_value"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(info_value->children[3], NANOARROW_TYPE_INT32),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[3], "int32_bitmask"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(info_value->children[4], NANOARROW_TYPE_LIST),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[4], "string_list"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(info_value->children[5], NANOARROW_TYPE_MAP), error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(info_value->children[5], "int32_to_int32_list_map"), error);
+
+ CHECK_NA(INTERNAL,
+ ArrowSchemaAllocateChildren(info_value->children[4], /*num_columns=*/1),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaInit(info_value->children[4]->children[0], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[4]->children[0], "item"),
+ error);
+
+ // XXX: nanoarrow could possibly use helpers for nested types like this
+ CHECK_NA(INTERNAL,
+ ArrowSchemaAllocateChildren(info_value->children[5], /*num_columns=*/1),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaInit(info_value->children[5]->children[0], NANOARROW_TYPE_STRUCT),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(info_value->children[5]->children[0], "entries"),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaAllocateChildren(info_value->children[5]->children[0],
+ /*num_columns=*/2),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaInit(info_value->children[5]->children[0]->children[0],
+ NANOARROW_TYPE_INT32),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(info_value->children[5]->children[0]->children[0], "key"),
+ error);
+ info_value->children[5]->children[0]->children[0]->flags &= ~ARROW_FLAG_NULLABLE;
+ CHECK_NA(INTERNAL,
+ ArrowSchemaInit(info_value->children[5]->children[0]->children[1],
+ NANOARROW_TYPE_LIST),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(info_value->children[5]->children[0]->children[1], "item"),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaAllocateChildren(info_value->children[5]->children[0]->children[1],
+ /*num_columns=*/1),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaInit(info_value->children[5]->children[0]->children[1]->children[0],
+ NANOARROW_TYPE_INT32),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(
+ info_value->children[5]->children[0]->children[1]->children[0], "item"),
+ error);
+
+ struct ArrowError na_error = {0};
+ CHECK_NA_DETAIL(INTERNAL, ArrowArrayInitFromSchema(array, schema, &na_error), &na_error,
+ error);
+ CHECK_NA(INTERNAL, ArrowArrayStartAppending(array), error);
+
+ for (size_t i = 0; i < info_codes_length; i++) {
+ switch (info_codes[i]) {
+ case ADBC_INFO_VENDOR_NAME:
+ RAISE_ADBC(SqliteConnectionGetInfoAppendStringImpl(array, info_codes[i], "SQLite",
+ error));
+ break;
+ case ADBC_INFO_VENDOR_VERSION:
+ RAISE_ADBC(SqliteConnectionGetInfoAppendStringImpl(array, info_codes[i],
+ sqlite3_libversion(), error));
+ break;
+ case ADBC_INFO_DRIVER_NAME:
+ RAISE_ADBC(SqliteConnectionGetInfoAppendStringImpl(array, info_codes[i],
+ "ADBC SQLite Driver", error));
+ break;
+ case ADBC_INFO_DRIVER_VERSION:
+ // TODO(lidavidm): fill in driver version
+ RAISE_ADBC(SqliteConnectionGetInfoAppendStringImpl(array, info_codes[i],
+ "(unknown)", error));
+ break;
+ case ADBC_INFO_DRIVER_ARROW_VERSION:
+ RAISE_ADBC(SqliteConnectionGetInfoAppendStringImpl(array, info_codes[i],
+ NANOARROW_BUILD_ID, error));
+ break;
+ default:
+ // Ignore
+ continue;
+ }
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
+ }
+
+ CHECK_NA_DETAIL(INTERNAL, ArrowArrayFinishBuilding(array, &na_error), &na_error, error);
+
+ return ADBC_STATUS_OK;
+} // NOLINT(whitespace/indent)
+
+AdbcStatusCode SqliteConnectionGetInfo(struct AdbcConnection* connection,
+ uint32_t* info_codes, size_t info_codes_length,
+ struct ArrowArrayStream* out,
+ struct AdbcError* error) {
+ CHECK_CONN_INIT(connection, error);
+
+ // XXX: mistake in adbc.h (should have been const pointer)
+ const uint32_t* codes = info_codes;
+ if (!info_codes) {
+ codes = kSupportedInfoCodes;
+ info_codes_length = sizeof(kSupportedInfoCodes) / sizeof(kSupportedInfoCodes[0]);
+ }
+
+ struct ArrowSchema schema = {0};
+ struct ArrowArray array = {0};
+
+ AdbcStatusCode status =
+ SqliteConnectionGetInfoImpl(codes, info_codes_length, &schema, &array, error);
+ if (status != ADBC_STATUS_OK) {
+ if (schema.release) schema.release(&schema);
+ if (array.release) array.release(&array);
+ return status;
+ }
+
+ return BatchToArrayStream(&array, &schema, out, error);
+}
+
+AdbcStatusCode SqliteConnectionGetObjectsSchema(struct ArrowSchema* schema,
+ struct AdbcError* error) {
+ CHECK_NA(INTERNAL, ArrowSchemaInit(schema, NANOARROW_TYPE_STRUCT), error);
+ CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema, /*num_columns=*/2), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(schema->children[0], NANOARROW_TYPE_STRING), error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[0], "catalog_name"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(schema->children[1], NANOARROW_TYPE_LIST), error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[1], "catalog_db_schemas"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema->children[1], 1), error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaInit(schema->children[1]->children[0], NANOARROW_TYPE_STRUCT),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[1]->children[0], "item"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema->children[1]->children[0], 2),
+ error);
+
+ struct ArrowSchema* db_schema_schema = schema->children[1]->children[0];
+ CHECK_NA(INTERNAL,
+ ArrowSchemaInit(db_schema_schema->children[0], NANOARROW_TYPE_STRING), error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(db_schema_schema->children[0], "db_schema_name"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(db_schema_schema->children[1], NANOARROW_TYPE_LIST),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(db_schema_schema->children[1], "db_schema_tables"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(db_schema_schema->children[1], 1),
+ error);
+ CHECK_NA(
+ INTERNAL,
+ ArrowSchemaInit(db_schema_schema->children[1]->children[0], NANOARROW_TYPE_STRUCT),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(db_schema_schema->children[1]->children[0], "item"), error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaAllocateChildren(db_schema_schema->children[1]->children[0], 4),
+ error);
+
+ struct ArrowSchema* table_schema = db_schema_schema->children[1]->children[0];
+ CHECK_NA(INTERNAL, ArrowSchemaInit(table_schema->children[0], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(table_schema->children[0], "table_name"), error);
+ table_schema->children[0]->flags &= ~ARROW_FLAG_NULLABLE;
+ CHECK_NA(INTERNAL, ArrowSchemaInit(table_schema->children[1], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(table_schema->children[1], "table_type"), error);
+ table_schema->children[1]->flags &= ~ARROW_FLAG_NULLABLE;
+ CHECK_NA(INTERNAL, ArrowSchemaInit(table_schema->children[2], NANOARROW_TYPE_LIST),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(table_schema->children[2], "table_columns"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(table_schema->children[2], 1), error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaInit(table_schema->children[2]->children[0], NANOARROW_TYPE_STRUCT),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(table_schema->children[2]->children[0], "item"),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaAllocateChildren(table_schema->children[2]->children[0], 19),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(table_schema->children[3], NANOARROW_TYPE_LIST),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(table_schema->children[3], "table_constraints"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(table_schema->children[3], 1), error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaInit(table_schema->children[3]->children[0], NANOARROW_TYPE_STRUCT),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(table_schema->children[3]->children[0], "item"),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaAllocateChildren(table_schema->children[3]->children[0], 4), error);
+
+ struct ArrowSchema* column_schema = table_schema->children[2]->children[0];
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[0], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[0], "column_name"),
+ error);
+ column_schema->children[0]->flags &= ~ARROW_FLAG_NULLABLE;
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[1], NANOARROW_TYPE_INT32),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[1], "ordinal_position"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[2], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[2], "remarks"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[3], NANOARROW_TYPE_INT16),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[3], "xdbc_data_type"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[4], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[4], "xdbc_type_name"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[5], NANOARROW_TYPE_INT32),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[5], "xdbc_column_size"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[6], NANOARROW_TYPE_INT16),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(column_schema->children[6], "xdbc_decimal_digits"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[7], NANOARROW_TYPE_INT16),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(column_schema->children[7], "xdbc_num_prec_radix"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[8], NANOARROW_TYPE_INT16),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[8], "xdbc_nullable"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[9], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[9], "xdbc_column_def"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[10], NANOARROW_TYPE_INT16),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(column_schema->children[10], "xdbc_sql_data_type"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[11], NANOARROW_TYPE_INT16),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[11], "xdbc_datetime_sub"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[12], NANOARROW_TYPE_INT32),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(column_schema->children[12], "xdbc_char_octet_length"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[13], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[13], "xdbc_is_nullable"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[14], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(column_schema->children[14], "xdbc_scope_catalog"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[15], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[15], "xdbc_scope_schema"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[16], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(column_schema->children[16], "xdbc_scope_table"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[17], NANOARROW_TYPE_BOOL),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(column_schema->children[17], "xdbc_is_autoincrement"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(column_schema->children[18], NANOARROW_TYPE_BOOL),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(column_schema->children[18], "xdbc_is_generatedcolumn"),
+ error);
+
+ struct ArrowSchema* constraint_schema = table_schema->children[3]->children[0];
+ CHECK_NA(INTERNAL,
+ ArrowSchemaInit(constraint_schema->children[0], NANOARROW_TYPE_STRING), error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(constraint_schema->children[0], "constraint_name"), error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaInit(constraint_schema->children[1], NANOARROW_TYPE_STRING), error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(constraint_schema->children[1], "constraint_type"), error);
+ constraint_schema->children[1]->flags &= ~ARROW_FLAG_NULLABLE;
+ CHECK_NA(INTERNAL, ArrowSchemaInit(constraint_schema->children[2], NANOARROW_TYPE_LIST),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(constraint_schema->children[2], "constraint_column_names"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(constraint_schema->children[2], 1),
+ error);
+ CHECK_NA(
+ INTERNAL,
+ ArrowSchemaInit(constraint_schema->children[2]->children[0], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(constraint_schema->children[2]->children[0], "item"),
+ error);
+ constraint_schema->children[2]->flags &= ~ARROW_FLAG_NULLABLE;
+ CHECK_NA(INTERNAL, ArrowSchemaInit(constraint_schema->children[3], NANOARROW_TYPE_LIST),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(constraint_schema->children[3], "constraint_column_usage"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(constraint_schema->children[3], 1),
+ error);
+ CHECK_NA(
+ INTERNAL,
+ ArrowSchemaInit(constraint_schema->children[3]->children[0], NANOARROW_TYPE_STRUCT),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaSetName(constraint_schema->children[3]->children[0], "item"),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowSchemaAllocateChildren(constraint_schema->children[3]->children[0], 4),
+ error);
+
+ struct ArrowSchema* usage_schema = constraint_schema->children[3]->children[0];
+ CHECK_NA(INTERNAL, ArrowSchemaInit(usage_schema->children[0], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(usage_schema->children[0], "fk_catalog"), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(usage_schema->children[1], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(usage_schema->children[1], "fk_db_schema"),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(usage_schema->children[2], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(usage_schema->children[2], "fk_table"), error);
+ usage_schema->children[2]->flags &= ~ARROW_FLAG_NULLABLE;
+ CHECK_NA(INTERNAL, ArrowSchemaInit(usage_schema->children[3], NANOARROW_TYPE_STRING),
+ error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(usage_schema->children[3], "fk_column_name"),
+ error);
+ usage_schema->children[3]->flags &= ~ARROW_FLAG_NULLABLE;
+
+ return ADBC_STATUS_OK;
+}
+
+static const char kTableQuery[] =
+ "SELECT name, type "
+ "FROM sqlite_master "
+ "WHERE name LIKE ? AND type <> 'index'"
+ "ORDER BY name ASC";
+static const char kColumnQuery[] =
+ "SELECT cid, name, type, \"notnull\", dflt_value "
+ "FROM pragma_table_info(?) "
+ "WHERE name LIKE ? "
+ "ORDER BY cid ASC";
+static const char kPrimaryKeyQuery[] =
+ "SELECT name "
+ "FROM pragma_table_info(?) "
+ "WHERE pk > 0 "
+ "ORDER BY pk ASC";
+static const char kForeignKeyQuery[] =
+ "SELECT id, seq, \"table\", \"from\", \"to\" "
+ "FROM pragma_foreign_key_list(?) "
+ "ORDER BY id, seq ASC";
+
+AdbcStatusCode SqliteConnectionGetColumnsImpl(
+ struct SqliteConnection* conn, const char* table_name, const char* column_name,
+ struct ArrowArray* table_columns_col, sqlite3_stmt* stmt, struct AdbcError* error) {
+ struct ArrowArray* table_columns_items = table_columns_col->children[0];
+ struct ArrowArray* column_name_col = table_columns_items->children[0];
+ struct ArrowArray* ordinal_position_col = table_columns_items->children[1];
+ struct ArrowArray* remarks_col = table_columns_items->children[2];
+ struct ArrowArray* xdbc_data_type_col = table_columns_items->children[3];
+ struct ArrowArray* xdbc_type_name_col = table_columns_items->children[4];
+ struct ArrowArray* xdbc_column_size_col = table_columns_items->children[5];
+ struct ArrowArray* xdbc_decimal_digits_col = table_columns_items->children[6];
+ struct ArrowArray* xdbc_num_prec_radix_col = table_columns_items->children[7];
+ struct ArrowArray* xdbc_nullable_col = table_columns_items->children[8];
+ struct ArrowArray* xdbc_column_def_col = table_columns_items->children[9];
+ struct ArrowArray* xdbc_sql_data_type_col = table_columns_items->children[10];
+ struct ArrowArray* xdbc_datetime_sub_col = table_columns_items->children[11];
+ struct ArrowArray* xdbc_char_octet_length_col = table_columns_items->children[12];
+ struct ArrowArray* xdbc_is_nullable_col = table_columns_items->children[13];
+ struct ArrowArray* xdbc_scope_catalog_col = table_columns_items->children[14];
+ struct ArrowArray* xdbc_scope_schema_col = table_columns_items->children[15];
+ struct ArrowArray* xdbc_scope_table_col = table_columns_items->children[16];
+ struct ArrowArray* xdbc_is_autoincrement_col = table_columns_items->children[17];
+ struct ArrowArray* xdbc_is_generatedcolumn_col = table_columns_items->children[18];
+
+ int rc = sqlite3_reset(stmt);
+ RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+ rc = sqlite3_bind_text64(stmt, 1, table_name, strlen(table_name), SQLITE_STATIC,
+ SQLITE_UTF8);
+ RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+ if (column_name) {
+ rc = sqlite3_bind_text64(stmt, 2, column_name, strlen(column_name), SQLITE_STATIC,
+ SQLITE_UTF8);
+ } else {
+ rc = sqlite3_bind_text64(stmt, 2, "%", 1, SQLITE_STATIC, SQLITE_UTF8);
+ }
+ RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+ while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
+ const char* col_name = (const char*)sqlite3_column_text(stmt, 1);
+ struct ArrowStringView str = {.data = col_name,
+ .n_bytes = sqlite3_column_bytes(stmt, 1)};
+ CHECK_NA(INTERNAL, ArrowArrayAppendString(column_name_col, str), error);
+
+ const int32_t col_cid = sqlite3_column_int(stmt, 0);
+ CHECK_NA(INTERNAL, ArrowArrayAppendInt(ordinal_position_col, col_cid + 1), error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(remarks_col, 1), error);
+
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_data_type_col, 1), error);
+
+ const char* col_type = (const char*)sqlite3_column_text(stmt, 2);
+ if (col_type) {
+ str.data = col_type;
+ str.n_bytes = sqlite3_column_bytes(stmt, 2);
+ CHECK_NA(INTERNAL, ArrowArrayAppendString(xdbc_type_name_col, str), error);
+ } else {
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_type_name_col, 1), error);
+ }
+
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_column_size_col, 1), error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_decimal_digits_col, 1), error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_num_prec_radix_col, 1), error);
+
+ const int32_t col_notnull = sqlite3_column_int(stmt, 3);
+ if (col_notnull == 0) {
+ // JDBC columnNullable == 1
+ CHECK_NA(INTERNAL, ArrowArrayAppendInt(xdbc_nullable_col, 1), error);
+ } else {
+ // JDBC columnNoNulls == 0
+ CHECK_NA(INTERNAL, ArrowArrayAppendInt(xdbc_nullable_col, 0), error);
+ }
+
+ const char* col_def = (const char*)sqlite3_column_text(stmt, 4);
+ if (col_def) {
+ str.data = col_def;
+ str.n_bytes = sqlite3_column_bytes(stmt, 4);
+ CHECK_NA(INTERNAL, ArrowArrayAppendString(xdbc_column_def_col, str), error);
+ } else {
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_column_def_col, 1), error);
+ }
+
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_sql_data_type_col, 1), error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_datetime_sub_col, 1), error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_char_octet_length_col, 1), error);
+
+ if (col_notnull == 0) {
+ str.data = "YES";
+ str.n_bytes = 3;
+ } else {
+ str.data = "NO";
+ str.n_bytes = 2;
+ }
+ CHECK_NA(INTERNAL, ArrowArrayAppendString(xdbc_is_nullable_col, str), error);
+
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_scope_catalog_col, 1), error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_scope_schema_col, 1), error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_scope_table_col, 1), error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_is_autoincrement_col, 1), error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_is_generatedcolumn_col, 1), error);
+
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_columns_items), error);
+ }
+
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionGetConstraintsImpl(
+ struct SqliteConnection* conn, const char* table_name, const char* column_name,
+ struct ArrowArray* table_constraints_col, sqlite3_stmt* pk_stmt,
+ sqlite3_stmt* fk_stmt, struct AdbcError* error) {
+ struct ArrowArray* table_constraints_items = table_constraints_col->children[0];
+ struct ArrowArray* constraint_name_col = table_constraints_items->children[0];
+ struct ArrowArray* constraint_type_col = table_constraints_items->children[1];
+ struct ArrowArray* constraint_column_names_col = table_constraints_items->children[2];
+ struct ArrowArray* constraint_column_names_items =
+ constraint_column_names_col->children[0];
+ struct ArrowArray* constraint_column_usage_col = table_constraints_items->children[3];
+ struct ArrowArray* constraint_column_usage_items =
+ constraint_column_usage_col->children[0];
+ struct ArrowArray* fk_catalog_col = constraint_column_usage_items->children[0];
+ struct ArrowArray* fk_db_schema_col = constraint_column_usage_items->children[1];
+ struct ArrowArray* fk_table_col = constraint_column_usage_items->children[2];
+ struct ArrowArray* fk_column_name_col = constraint_column_usage_items->children[3];
+
+ // We can get primary keys and foreign keys, but not unique
+ // constraints (unless we parse the SQL table definition)
+
+ int rc = sqlite3_reset(pk_stmt);
+ RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+ rc = sqlite3_bind_text64(pk_stmt, 1, table_name, strlen(table_name), SQLITE_STATIC,
+ SQLITE_UTF8);
+ RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+ char has_primary_key = 0;
+ while ((rc = sqlite3_step(pk_stmt)) == SQLITE_ROW) {
+ if (!has_primary_key) {
+ has_primary_key = 1;
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(constraint_name_col, 1), error);
+ CHECK_NA(INTERNAL,
+ ArrowArrayAppendString(constraint_name_col, ArrowCharView("PRIMARY KEY")),
+ error);
+ }
+ CHECK_NA(
+ INTERNAL,
+ ArrowArrayAppendString(
+ constraint_column_names_items,
+ (struct ArrowStringView){.data = (const char*)sqlite3_column_text(pk_stmt, 0),
+ .n_bytes = sqlite3_column_bytes(pk_stmt, 0)}),
+ error);
+ }
+ if (has_primary_key) {
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_names_col), error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(constraint_column_usage_col, 1), error);
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_items), error);
+ }
+
+ rc = sqlite3_reset(fk_stmt);
+ RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+ rc = sqlite3_bind_text64(fk_stmt, 1, table_name, strlen(table_name), SQLITE_STATIC,
+ SQLITE_UTF8);
+ RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
+
+ int prev_fk_id = -1;
+ while ((rc = sqlite3_step(fk_stmt)) == SQLITE_ROW) {
+ const int fk_id = sqlite3_column_int(fk_stmt, 0);
+ const int fk_seq = sqlite3_column_int(fk_stmt, 1);
+ const char* to_table = (const char*)sqlite3_column_text(fk_stmt, 2);
+ const char* from_col = (const char*)sqlite3_column_text(fk_stmt, 3);
+ const char* to_col = (const char*)sqlite3_column_text(fk_stmt, 4);
+
+ if (fk_id != prev_fk_id) {
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(constraint_name_col, 1), error);
+ CHECK_NA(INTERNAL,
+ ArrowArrayAppendString(constraint_name_col, ArrowCharView("FOREIGN KEY")),
+ error);
+
+ if (prev_fk_id != -1) {
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_names_col), error);
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_usage_col), error);
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_items), error);
+ }
+ prev_fk_id = fk_id;
+
+ CHECK_NA(INTERNAL,
+ ArrowArrayAppendString(
+ constraint_column_names_items,
+ (struct ArrowStringView){.data = from_col,
+ .n_bytes = sqlite3_column_bytes(pk_stmt, 3)}),
+ error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendString(fk_catalog_col, ArrowCharView("main")),
+ error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(fk_db_schema_col, 1), error);
+ CHECK_NA(INTERNAL,
+ ArrowArrayAppendString(
+ fk_table_col,
+ (struct ArrowStringView){.data = to_table,
+ .n_bytes = sqlite3_column_bytes(pk_stmt, 2)}),
+ error);
+ CHECK_NA(INTERNAL,
+ ArrowArrayAppendString(
+ fk_column_name_col,
+ (struct ArrowStringView){.data = to_col,
+ .n_bytes = sqlite3_column_bytes(pk_stmt, 4)}),
+ error);
+ }
+ }
+ if (prev_fk_id != -1) {
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_names_col), error);
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_usage_col), error);
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_items), error);
+ }
+
+ return ADBC_STATUS_OK;
+} // NOLINT(whitespace/indent)
+
+AdbcStatusCode SqliteConnectionGetTablesInner(
+ struct SqliteConnection* conn, sqlite3_stmt* tables_stmt, sqlite3_stmt* columns_stmt,
+ sqlite3_stmt* pk_stmt, sqlite3_stmt* fk_stmt, const char** table_type,
+ const char* column_name, struct ArrowArray* db_schema_tables_col,
+ struct AdbcError* error) {
+ struct ArrowArray* db_schema_tables_items = db_schema_tables_col->children[0];
+ struct ArrowArray* table_name_col = db_schema_tables_items->children[0];
+ struct ArrowArray* table_type_col = db_schema_tables_items->children[1];
+ struct ArrowArray* table_columns_col = db_schema_tables_items->children[2];
+ struct ArrowArray* table_constraints_col = db_schema_tables_items->children[3];
+
+ int rc = SQLITE_OK;
+ while ((rc = sqlite3_step(tables_stmt)) == SQLITE_ROW) {
+ const char* cur_table_type = (const char*)sqlite3_column_text(tables_stmt, 1);
+
+ if (table_type) {
+ const char** current = table_type;
+ char found = 0;
+ while (*current) {
+ if (strcmp(*current, cur_table_type) == 0) {
+ found = 1;
+ break;
+ }
+ current++;
+ }
+ if (!found) continue;
+ }
+
+ struct ArrowStringView str = {.data = cur_table_type,
+ .n_bytes = sqlite3_column_bytes(tables_stmt, 1)};
+ CHECK_NA(INTERNAL, ArrowArrayAppendString(table_type_col, str), error);
+
+ const char* cur_table = (const char*)sqlite3_column_text(tables_stmt, 0);
+ str.data = cur_table;
+ str.n_bytes = sqlite3_column_bytes(tables_stmt, 0);
+ CHECK_NA(INTERNAL, ArrowArrayAppendString(table_name_col, str), error);
+
+ if (columns_stmt == NULL) {
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(table_columns_col, 1), error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(table_constraints_col, 1), error);
+ } else {
+ // XXX: n + 1 query pattern. You can join on a pragma so we
+ // could avoid this in principle but it complicates the
+ // unpacking code here quite a bit, so ignore for now.
+ RAISE_ADBC(SqliteConnectionGetColumnsImpl(conn, cur_table, column_name,
+ table_columns_col, columns_stmt, error));
+ // Not strictly necessary, but we passed SQLITE_STATIC when
+ // binding so don't let the reference leak
+ (void)sqlite3_clear_bindings(columns_stmt);
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_columns_col), error);
+
+ RAISE_ADBC(SqliteConnectionGetConstraintsImpl(
+ conn, cur_table, column_name, table_constraints_col, pk_stmt, fk_stmt, error));
+ (void)sqlite3_clear_bindings(pk_stmt);
+ (void)sqlite3_clear_bindings(fk_stmt);
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_col), error);
+ }
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(db_schema_tables_items), error);
+ }
+
+ if (rc != SQLITE_DONE) {
+ SetError(error, "Failed to query for tables: %s", sqlite3_errmsg(conn->conn));
+ return ADBC_STATUS_INTERNAL;
+ }
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(db_schema_tables_col), error);
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionGetTablesImpl(struct SqliteConnection* conn, int depth,
+ const char* table_name,
+ const char** table_type,
+ const char* column_name,
+ struct ArrowArray* db_schema_tables_col,
+ struct AdbcError* error) {
+ sqlite3_stmt* tables_stmt = NULL;
+ sqlite3_stmt* columns_stmt = NULL;
+ sqlite3_stmt* pk_stmt = NULL;
+ sqlite3_stmt* fk_stmt = NULL;
+ int rc = SQLITE_OK;
+
+ if (rc == SQLITE_OK) {
+ rc = sqlite3_prepare_v2(conn->conn, kTableQuery, sizeof(kTableQuery), &tables_stmt,
+ /*pzTail=*/NULL);
+ }
+ if (rc == SQLITE_OK && depth == ADBC_OBJECT_DEPTH_COLUMNS) {
+ rc = sqlite3_prepare_v2(conn->conn, kColumnQuery, sizeof(kColumnQuery), &columns_stmt,
+ /*pzTail=*/NULL);
+ }
+ if (rc == SQLITE_OK && depth == ADBC_OBJECT_DEPTH_COLUMNS) {
+ rc = sqlite3_prepare_v2(conn->conn, kPrimaryKeyQuery, sizeof(kPrimaryKeyQuery),
+ &pk_stmt, /*pzTail=*/NULL);
+ }
+ if (rc == SQLITE_OK && depth == ADBC_OBJECT_DEPTH_COLUMNS) {
+ rc = sqlite3_prepare_v2(conn->conn, kForeignKeyQuery, sizeof(kForeignKeyQuery),
+ &fk_stmt, /*pzTail=*/NULL);
+ }
+ if (rc == SQLITE_OK) {
+ if (table_name) {
+ rc = sqlite3_bind_text64(tables_stmt, 1, table_name, strlen(table_name),
+ SQLITE_STATIC, SQLITE_UTF8);
+ } else {
+ rc = sqlite3_bind_text64(tables_stmt, 1, "%", 1, SQLITE_STATIC, SQLITE_UTF8);
+ }
+ }
+
+ AdbcStatusCode status = ADBC_STATUS_OK;
+ if (rc == SQLITE_OK) {
+ status = SqliteConnectionGetTablesInner(conn, tables_stmt, columns_stmt, pk_stmt,
+ fk_stmt, table_type, column_name,
+ db_schema_tables_col, error);
+ } else {
+ SetError(error, "Failed to query for tables: %s", sqlite3_errmsg(conn->conn));
+ status = ADBC_STATUS_INTERNAL;
+ }
+
+ sqlite3_finalize(tables_stmt);
+ sqlite3_finalize(columns_stmt);
+ sqlite3_finalize(pk_stmt);
+ sqlite3_finalize(fk_stmt);
+ return status;
+}
+
+AdbcStatusCode SqliteConnectionGetObjectsImpl(
+ struct SqliteConnection* conn, int depth, const char* catalog, const char* db_schema,
+ const char* table_name, const char** table_type, const char* column_name,
+ struct ArrowSchema* schema, struct ArrowArray* array, struct AdbcError* error) {
+ RAISE_ADBC(SqliteConnectionGetObjectsSchema(schema, error));
+
+ struct ArrowError na_error = {0};
+ CHECK_NA_DETAIL(INTERNAL, ArrowArrayInitFromSchema(array, schema, &na_error), &na_error,
+ error);
+ CHECK_NA(INTERNAL, ArrowArrayStartAppending(array), error);
+
+ struct ArrowArray* catalog_name_col = array->children[0];
+ struct ArrowArray* catalog_db_schemas_col = array->children[1];
+
+ struct ArrowArray* catalog_db_schemas_items = catalog_db_schemas_col->children[0];
+ struct ArrowArray* db_schema_name_col = catalog_db_schemas_items->children[0];
+ struct ArrowArray* db_schema_tables_col = catalog_db_schemas_items->children[1];
+
+ // TODO: support proper filters
+ if (!catalog || strcmp(catalog, "main") == 0) {
+ // Default the primary catalog to "main"
+ // https://www.sqlite.org/cli.html
+ // > The ".databases" command shows a list of all databases open
+ // > in the current connection. There will always be at least
+ // > 2. The first one is "main", the original database opened.
+
+ CHECK_NA(INTERNAL, ArrowArrayAppendString(catalog_name_col, ArrowCharView("main")),
+ error);
+
+ if (depth == ADBC_OBJECT_DEPTH_CATALOGS) {
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(catalog_db_schemas_col, 1), error);
+ } else if (!db_schema || db_schema == NULL) {
+ // For our purposes, we'll consider SQLite to always have a
+ // single, unnamed schema within each catalog.
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(db_schema_name_col, 1), error);
+ if (depth == ADBC_OBJECT_DEPTH_DB_SCHEMAS) {
+ CHECK_NA(INTERNAL, ArrowArrayAppendNull(db_schema_tables_col, 1), error);
+ } else {
+ RAISE_ADBC(SqliteConnectionGetTablesImpl(conn, depth, table_name, table_type,
+ column_name, db_schema_tables_col,
+ error));
+ }
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(catalog_db_schemas_items), error);
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(catalog_db_schemas_col), error);
+ } else {
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(catalog_db_schemas_col), error);
+ }
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
+
+ // TODO: implement "temp", other attached databases as catalogs
+ }
+
+ CHECK_NA_DETAIL(INTERNAL, ArrowArrayFinishBuilding(array, &na_error), &na_error, error);
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionGetObjects(struct AdbcConnection* connection, int depth,
+ const char* catalog, const char* db_schema,
+ const char* table_name, const char** table_type,
+ const char* column_name,
+ struct ArrowArrayStream* out,
+ struct AdbcError* error) {
+ CHECK_CONN_INIT(connection, error);
+ struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+
+ struct ArrowSchema schema = {0};
+ struct ArrowArray array = {0};
+
+ AdbcStatusCode status =
+ SqliteConnectionGetObjectsImpl(conn, depth, catalog, db_schema, table_name,
+ table_type, column_name, &schema, &array, error);
+ if (status != ADBC_STATUS_OK) {
+ if (schema.release) schema.release(&schema);
+ if (array.release) array.release(&array);
+ return status;
+ }
+
+ return BatchToArrayStream(&array, &schema, out, error);
+}
+
+AdbcStatusCode SqliteConnectionGetTableSchema(struct AdbcConnection* connection,
+ const char* catalog, const char* db_schema,
+ const char* table_name,
+ struct ArrowSchema* schema,
+ struct AdbcError* error) {
+ CHECK_CONN_INIT(connection, error);
+ struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+ if (catalog != NULL && strlen(catalog) > 0) {
+ // TODO: map 'catalog' to SQLite attached database
+ memset(schema, 0, sizeof(*schema));
+ return ADBC_STATUS_OK;
+ } else if (db_schema != NULL && strlen(db_schema) > 0) {
+ // SQLite does not support schemas
+ memset(schema, 0, sizeof(*schema));
+ return ADBC_STATUS_OK;
+ } else if (table_name == NULL) {
+ SetError(error, "AdbcConnectionGetTableSchema: must provide table_name");
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+
+ struct StringBuilder query = {0};
+ StringBuilderInit(&query, /*initial_size=*/64);
+ StringBuilderAppend(&query, "SELECT * FROM ");
+ StringBuilderAppend(&query, table_name);
+
+ sqlite3_stmt* stmt = NULL;
+ int rc =
+ sqlite3_prepare_v2(conn->conn, query.buffer, query.size, &stmt, /*pzTail=*/NULL);
+ StringBuilderReset(&query);
+ if (rc != SQLITE_OK) {
+ SetError(error, "Failed to prepare query: %s", sqlite3_errmsg(conn->conn));
+ return ADBC_STATUS_INTERNAL;
+ }
+
+ struct ArrowArrayStream stream = {0};
+ AdbcStatusCode status = AdbcSqliteExportReader(conn->conn, stmt, /*binder=*/NULL,
+ /*batch_size=*/64, &stream, error);
+ if (status == ADBC_STATUS_OK) {
+ int code = stream.get_schema(&stream, schema);
+ if (code != 0) {
+ SetError(error, "Failed to get schema: (%d) %s", code, strerror(code));
+ status = ADBC_STATUS_IO;
+ }
+ }
+ if (stream.release) {
+ stream.release(&stream);
+ }
+ (void)sqlite3_finalize(stmt);
+ return status;
+}
+
+AdbcStatusCode SqliteConnectionGetTableTypesImpl(struct ArrowSchema* schema,
+ struct ArrowArray* array,
+ struct AdbcError* error) {
+ CHECK_NA(INTERNAL, ArrowSchemaInit(schema, NANOARROW_TYPE_STRUCT), error);
+ CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema, /*num_columns=*/1), error);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(schema->children[0], NANOARROW_TYPE_STRING), error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[0], "table_type"), error);
+ schema->children[0]->flags &= ~ARROW_FLAG_NULLABLE;
+
+ CHECK_NA(INTERNAL, ArrowArrayInitFromSchema(array, schema, NULL), error);
+ CHECK_NA(INTERNAL, ArrowArrayStartAppending(array), error);
+
+ CHECK_NA(INTERNAL, ArrowArrayAppendString(array->children[0], ArrowCharView("table")),
+ error);
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
+ CHECK_NA(INTERNAL, ArrowArrayAppendString(array->children[0], ArrowCharView("view")),
+ error);
+ CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
+
+ CHECK_NA(INTERNAL, ArrowArrayFinishBuilding(array, NULL), error);
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteConnectionGetTableTypes(struct AdbcConnection* connection,
+ struct ArrowArrayStream* out,
+ struct AdbcError* error) {
+ CHECK_CONN_INIT(connection, error);
+ struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+
+ struct ArrowSchema schema = {0};
+ struct ArrowArray array = {0};
+
+ AdbcStatusCode status = SqliteConnectionGetTableTypesImpl(&schema, &array, error);
+ if (status != ADBC_STATUS_OK) {
+ if (schema.release) schema.release(&schema);
+ if (array.release) array.release(&array);
+ return status;
+ }
+ return BatchToArrayStream(&array, &schema, out, error);
+}
+
+AdbcStatusCode SqliteConnectionReadPartition(struct AdbcConnection* connection,
+ const uint8_t* serialized_partition,
+ size_t serialized_length,
+ struct ArrowArrayStream* out,
+ struct AdbcError* error) {
+ CHECK_CONN_INIT(connection, error);
+ struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteConnectionCommit(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ CHECK_CONN_INIT(connection, error);
+ struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+ if (!conn->active_transaction) {
+ SetError(error, "No active transaction, cannot commit");
+ return ADBC_STATUS_INVALID_STATE;
+ }
+
+ AdbcStatusCode status = ExecuteQuery(conn, "COMMIT", error);
+ if (status != ADBC_STATUS_OK) return status;
+ return ExecuteQuery(conn, "BEGIN", error);
+}
+
+AdbcStatusCode SqliteConnectionRollback(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ CHECK_CONN_INIT(connection, error);
+ struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+ if (!conn->active_transaction) {
+ SetError(error, "No active transaction, cannot rollback");
+ return ADBC_STATUS_INVALID_STATE;
+ }
+
+ AdbcStatusCode status = ExecuteQuery(conn, "ROLLBACK", error);
+ if (status != ADBC_STATUS_OK) return status;
+ return ExecuteQuery(conn, "BEGIN", error);
+}
+
+AdbcStatusCode SqliteStatementNew(struct AdbcConnection* connection,
+ struct AdbcStatement* statement,
+ struct AdbcError* error) {
+ CHECK_CONN_INIT(connection, error);
+ struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
+ if (statement->private_data) {
+ SetError(error, "AdbcStatementNew: statement already allocated");
+ return ADBC_STATUS_INVALID_STATE;
+ } else if (!conn->conn) {
+ SetError(error, "AdbcStatementNew: connection is not initialized");
+ return ADBC_STATUS_INVALID_STATE;
+ }
+
+ statement->private_data = malloc(sizeof(struct SqliteStatement));
+ memset(statement->private_data, 0, sizeof(struct SqliteStatement));
+ struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+ stmt->conn = conn->conn;
+
+ // Default options
+ stmt->batch_size = 1024;
+
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteStatementRelease(struct AdbcStatement* statement,
+ struct AdbcError* error) {
+ CHECK_STMT_INIT(statement, error);
+ struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+
+ int rc = SQLITE_OK;
+ if (stmt->stmt) {
+ rc = sqlite3_finalize(stmt->stmt);
+ }
+ if (stmt->query) free(stmt->query);
+ AdbcSqliteBinderRelease(&stmt->binder);
+ if (stmt->target_table) free(stmt->target_table);
+ if (rc != SQLITE_OK) {
+ SetError(error, "AdbcStatementRelease: statement failed to finalize: (%d) %s", rc,
+ sqlite3_errmsg(stmt->conn));
+ }
+ free(statement->private_data);
+ statement->private_data = NULL;
+
+ return rc == SQLITE_OK ? ADBC_STATUS_OK : ADBC_STATUS_IO;
+}
+
+AdbcStatusCode SqliteStatementPrepare(struct AdbcStatement* statement,
+ struct AdbcError* error) {
+ CHECK_STMT_INIT(statement, error);
+ struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+
+ if (!stmt->query) {
+ SetError(error, "Must SetSqlQuery before ExecuteQuery or Prepare");
+ return ADBC_STATUS_INVALID_STATE;
+ }
+ if (stmt->prepared == 0) {
+ if (stmt->stmt) {
+ int rc = sqlite3_finalize(stmt->stmt);
+ stmt->stmt = NULL;
+ if (rc != SQLITE_OK) {
+ SetError(error, "Failed to finalize previous statement: (%d) %s", rc,
+ sqlite3_errmsg(stmt->conn));
+ return ADBC_STATUS_IO;
+ }
+ }
+
+ int rc =
+ sqlite3_prepare_v2(stmt->conn, stmt->query, (int)stmt->query_len, &stmt->stmt,
+ /*pzTail=*/NULL);
+ if (rc != SQLITE_OK) {
+ SetError(error, "Failed to prepare query: %s\nQuery:%s", sqlite3_errmsg(stmt->conn),
+ stmt->query);
+ (void)sqlite3_finalize(stmt->stmt);
+ stmt->stmt = NULL;
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+ stmt->prepared = 1;
+ }
+
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteStatementInitIngest(struct SqliteStatement* stmt,
+ sqlite3_stmt** insert_statement,
+ struct AdbcError* error) {
+ AdbcStatusCode code = ADBC_STATUS_OK;
+
+ // Create statements for CREATE TABLE / INSERT
+ struct StringBuilder create_query = {0};
+ struct StringBuilder insert_query = {0};
+ StringBuilderInit(&create_query, 256);
+ StringBuilderInit(&insert_query, 256);
+
+ StringBuilderAppend(&create_query, "CREATE TABLE ");
+ StringBuilderAppend(&create_query, stmt->target_table);
+ StringBuilderAppend(&create_query, " (");
+
+ StringBuilderAppend(&insert_query, "INSERT INTO ");
+ StringBuilderAppend(&insert_query, stmt->target_table);
+ StringBuilderAppend(&insert_query, " VALUES (");
+
+ for (int i = 0; i < stmt->binder.schema.n_children; i++) {
+ if (i > 0) StringBuilderAppend(&create_query, ", ");
+ // XXX: should escape the column name too
+ StringBuilderAppend(&create_query, stmt->binder.schema.children[i]->name);
+
+ if (i > 0) StringBuilderAppend(&insert_query, ", ");
+ StringBuilderAppend(&insert_query, "?");
+ }
+ StringBuilderAppend(&create_query, ")");
+ StringBuilderAppend(&insert_query, ")");
+
+ sqlite3_stmt* create = NULL;
+ if (!stmt->append) {
+ // Create table
+ int rc = sqlite3_prepare_v2(stmt->conn, create_query.buffer, (int)create_query.size,
+ &create, /*pzTail=*/NULL);
+ if (rc == SQLITE_OK) {
+ rc = sqlite3_step(create);
+ }
+
+ if (rc != SQLITE_OK && rc != SQLITE_DONE) {
+ SetError(error, "Failed to create table: %s (executed '%s')",
+ sqlite3_errmsg(stmt->conn), create_query.buffer);
+ code = ADBC_STATUS_INTERNAL;
+ }
+ }
+
+ if (code == ADBC_STATUS_OK) {
+ int rc = sqlite3_prepare_v2(stmt->conn, insert_query.buffer, (int)insert_query.size,
+ insert_statement, /*pzTail=*/NULL);
+ if (rc != SQLITE_OK) {
+ SetError(error, "Failed to prepare statement: %s (executed '%s')",
+ sqlite3_errmsg(stmt->conn), insert_query.buffer);
+ code = ADBC_STATUS_INTERNAL;
+ }
+ }
+
+ sqlite3_finalize(create);
+ StringBuilderReset(&create_query);
+ StringBuilderReset(&insert_query);
+ return code;
+}
+
+AdbcStatusCode SqliteStatementExecuteIngest(struct SqliteStatement* stmt,
+ int64_t* rows_affected,
+ struct AdbcError* error) {
+ if (!stmt->binder.schema.release) {
+ SetError(error, "Must Bind() before bulk ingestion");
+ return ADBC_STATUS_INVALID_STATE;
+ }
+
+ sqlite3_stmt* insert = NULL;
+ AdbcStatusCode status = SqliteStatementInitIngest(stmt, &insert, error);
+
+ int64_t row_count = 0;
+ if (status == ADBC_STATUS_OK) {
+ while (1) {
+ char finished = 0;
+ status =
+ AdbcSqliteBinderBindNext(&stmt->binder, stmt->conn, insert, &finished, error);
+ if (status != ADBC_STATUS_OK || finished) break;
+
+ int rc = 0;
+ do {
+ rc = sqlite3_step(insert);
+ } while (rc == SQLITE_ROW);
+ if (rc != SQLITE_DONE) {
+ SetError(error, "Failed to execute statement: %s", sqlite3_errmsg(stmt->conn));
+ status = ADBC_STATUS_INTERNAL;
+ break;
+ }
+ row_count++;
+ }
+ }
+
+ if (rows_affected) *rows_affected = row_count;
+ if (insert) sqlite3_finalize(insert);
+ AdbcSqliteBinderRelease(&stmt->binder);
+ return status;
+}
+
+AdbcStatusCode SqliteStatementExecuteQuery(struct AdbcStatement* statement,
+ struct ArrowArrayStream* out,
+ int64_t* rows_affected,
+ struct AdbcError* error) {
+ CHECK_STMT_INIT(statement, error);
+ struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+
+ if (stmt->target_table) {
+ return SqliteStatementExecuteIngest(stmt, rows_affected, error);
+ }
+
+ AdbcStatusCode status = SqliteStatementPrepare(statement, error);
+ if (status != ADBC_STATUS_OK) return status;
+
+ if (stmt->binder.schema.release) {
+ int64_t expected = sqlite3_bind_parameter_count(stmt->stmt);
+ int64_t actual = stmt->binder.schema.n_children;
+ if (actual != expected) {
+ SetError(error, "Parameter count mismatch: expected %" PRId64 " but found %" PRId64,
+ expected, actual);
+ return ADBC_STATUS_INVALID_STATE;
+ }
+ }
+
+ if (!out) {
+ // Update
+ sqlite3_mutex_enter(sqlite3_db_mutex(stmt->conn));
+
+ AdbcStatusCode status = ADBC_STATUS_OK;
+ int64_t rows = 0;
+
+ while (1) {
+ if (stmt->binder.schema.release) {
+ char finished = 0;
+ status = AdbcSqliteBinderBindNext(&stmt->binder, stmt->conn, stmt->stmt,
+ &finished, error);
+ if (status != ADBC_STATUS_OK || finished) {
+ break;
+ }
+ }
+
+ while (sqlite3_step(stmt->stmt) == SQLITE_ROW) {
+ rows++;
+ }
+ if (!stmt->binder.schema.release) break;
+ }
+
+ if (sqlite3_reset(stmt->stmt) != SQLITE_OK) {
+ status = ADBC_STATUS_IO;
+ const char* msg = sqlite3_errmsg(stmt->conn);
+ SetError(error, "Failed to execute query: %s",
+ (msg == NULL) ? "(unknown error)" : msg);
+ }
+
+ sqlite3_mutex_leave(sqlite3_db_mutex(stmt->conn));
+
+ AdbcSqliteBinderRelease(&stmt->binder);
+ if (rows_affected) *rows_affected = rows;
+ return status;
+ }
+
+ // Query
+ if (rows_affected) *rows_affected = -1;
+ struct AdbcSqliteBinder* binder = stmt->binder.schema.release ? &stmt->binder : NULL;
+ return AdbcSqliteExportReader(stmt->conn, stmt->stmt, binder, stmt->batch_size, out,
+ error);
+}
+
+AdbcStatusCode SqliteStatementSetSqlQuery(struct AdbcStatement* statement,
+ const char* query, struct AdbcError* error) {
+ CHECK_STMT_INIT(statement, error);
+ struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+
+ if (stmt->query) {
+ free(stmt->query);
+ stmt->query = NULL;
+ }
+ if (stmt->target_table) {
+ free(stmt->target_table);
+ stmt->target_table = NULL;
+ }
+ size_t len = strlen(query) + 1;
+ stmt->query = malloc(len);
+ stmt->query_len = len;
+ stmt->prepared = 0;
+ strncpy(stmt->query, query, len);
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteStatementSetSubstraitPlan(struct AdbcStatement* statement,
+ const uint8_t* plan, size_t length,
+ struct AdbcError* error) {
+ CHECK_STMT_INIT(statement, error);
+ SetError(error, "Substrait is not supported");
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteStatementBind(struct AdbcStatement* statement,
+ struct ArrowArray* values, struct ArrowSchema* schema,
+ struct AdbcError* error) {
+ CHECK_STMT_INIT(statement, error);
+ struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+ return AdbcSqliteBinderSetArray(&stmt->binder, values, schema, error);
+}
+
+AdbcStatusCode SqliteStatementBindStream(struct AdbcStatement* statement,
+ struct ArrowArrayStream* stream,
+ struct AdbcError* error) {
+ CHECK_STMT_INIT(statement, error);
+ struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+ return AdbcSqliteBinderSetArrayStream(&stmt->binder, stream, error);
+}
+
+AdbcStatusCode SqliteStatementGetParameterSchema(struct AdbcStatement* statement,
+ struct ArrowSchema* schema,
+ struct AdbcError* error) {
+ AdbcStatusCode status = SqliteStatementPrepare(statement, error);
+ if (status != ADBC_STATUS_OK) return status;
+
+ struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+ int num_params = sqlite3_bind_parameter_count(stmt->stmt);
+ if (num_params < 0) {
+ // Should not happen
+ SetError(error, "SQLite returned negative parameter count");
+ return ADBC_STATUS_INTERNAL;
+ }
+
+ CHECK_NA(INTERNAL, ArrowSchemaInit(schema, NANOARROW_TYPE_STRUCT), error);
+ CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema, num_params), error);
+ char buffer[11];
+ for (int i = 0; i < num_params; i++) {
+ const char* name = sqlite3_bind_parameter_name(stmt->stmt, i + 1);
+ if (name == NULL) {
+ snprintf(buffer, sizeof(buffer), "%d", i);
+ name = buffer;
+ }
+ CHECK_NA(INTERNAL, ArrowSchemaInit(schema->children[i], NANOARROW_TYPE_NA), error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[i], name), error);
+ }
+
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode SqliteStatementSetOption(struct AdbcStatement* statement, const char* key,
+ const char* value, struct AdbcError* error) {
+ CHECK_STMT_INIT(statement, error);
+ struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
+
+ if (strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
+ if (stmt->query) {
+ free(stmt->query);
+ stmt->query = NULL;
+ }
+ if (stmt->target_table) {
+ free(stmt->target_table);
+ stmt->target_table = NULL;
+ }
+
+ size_t len = strlen(value) + 1;
+ stmt->target_table = (char*)malloc(len);
+ strncpy(stmt->target_table, value, len);
+ return ADBC_STATUS_OK;
+ } else if (strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
+ if (strcmp(value, ADBC_INGEST_OPTION_MODE_APPEND) == 0) {
+ stmt->append = 1;
+ } else if (strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) {
+ stmt->append = 0;
+ } else {
+ SetError(error, "Invalid statement option value %s=%s", key, value);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+ return ADBC_STATUS_OK;
+ } else if (strcmp(key, kStatementOptionBatchRows) == 0) {
+ char* end = NULL;
+ long batch_size = strtol(value, &end, /*base=*/10); // NOLINT(runtime/int)
+ if (errno != 0) {
+ SetError(error, "Invalid statement option value %s=%s (out of range)", key, value);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ } else if (batch_size <= 0) {
+ SetError(error,
+ "Invalid statement option value %s=%s (value is non-positive or invalid)",
+ key, value);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ } else if (batch_size > (long)INT_MAX) { // NOLINT(runtime/int)
+ SetError(error,
+ "Invalid statement option value %s=%s (value is out of range of int)", key,
+ value);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+ stmt->batch_size = (int)batch_size;
+ return ADBC_STATUS_OK;
+ }
+ SetError(error, "Unknown statement option %s=%s", key, value);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+AdbcStatusCode SqliteStatementExecutePartitions(struct AdbcStatement* statement,
+ struct ArrowSchema* schema,
+ struct AdbcPartitions* partitions,
+ int64_t* rows_affected,
+ struct AdbcError* error) {
+ CHECK_STMT_INIT(statement, error);
+ SetError(error, "Partitioned result sets are not supported");
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+} // NOLINT(whitespace/indent)
+
+AdbcStatusCode SqliteDriverInit(int version, void* raw_driver, struct AdbcError* error) {
+ if (version != ADBC_VERSION_1_0_0) {
+ SetError(error, "Only version %d supported, got %d", ADBC_VERSION_1_0_0, version);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+ }
+
+ struct AdbcDriver* driver = (struct AdbcDriver*)raw_driver;
+ memset(driver, 0, sizeof(*driver));
+ driver->DatabaseInit = SqliteDatabaseInit;
+ driver->DatabaseNew = SqliteDatabaseNew;
+ driver->DatabaseRelease = SqliteDatabaseRelease;
+ driver->DatabaseSetOption = SqliteDatabaseSetOption;
+
+ driver->ConnectionCommit = SqliteConnectionCommit;
+ driver->ConnectionGetInfo = SqliteConnectionGetInfo;
+ driver->ConnectionGetObjects = SqliteConnectionGetObjects;
+ driver->ConnectionGetTableSchema = SqliteConnectionGetTableSchema;
+ driver->ConnectionGetTableTypes = SqliteConnectionGetTableTypes;
+ driver->ConnectionInit = SqliteConnectionInit;
+ driver->ConnectionNew = SqliteConnectionNew;
+ driver->ConnectionReadPartition = SqliteConnectionReadPartition;
+ driver->ConnectionRelease = SqliteConnectionRelease;
+ driver->ConnectionRollback = SqliteConnectionRollback;
+ driver->ConnectionSetOption = SqliteConnectionSetOption;
+
+ driver->StatementBind = SqliteStatementBind;
+ driver->StatementBindStream = SqliteStatementBindStream;
+ driver->StatementExecuteQuery = SqliteStatementExecuteQuery;
+ driver->StatementGetParameterSchema = SqliteStatementGetParameterSchema;
+ driver->StatementNew = SqliteStatementNew;
+ driver->StatementPrepare = SqliteStatementPrepare;
+ driver->StatementRelease = SqliteStatementRelease;
+ driver->StatementSetOption = SqliteStatementSetOption;
+ driver->StatementSetSqlQuery = SqliteStatementSetSqlQuery;
+ return ADBC_STATUS_OK;
+}
+
+// Public names
+
+AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
+ return SqliteDatabaseNew(database, error);
+}
+
+AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char* key,
+ const char* value, struct AdbcError* error) {
+ return SqliteDatabaseSetOption(database, key, value, error);
+}
+
+AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) {
+ return SqliteDatabaseInit(database, error);
+}
+
+AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
+ struct AdbcError* error) {
+ return SqliteDatabaseRelease(database, error);
+}
+
+AdbcStatusCode AdbcConnectionNew(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ return SqliteConnectionNew(connection, error);
+}
+
+AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
+ const char* value, struct AdbcError* error) {
+ return SqliteConnectionSetOption(connection, key, value, error);
+}
+
+AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
+ struct AdbcDatabase* database,
+ struct AdbcError* error) {
+ return SqliteConnectionInit(connection, database, error);
+}
+
+AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ return SqliteConnectionRelease(connection, error);
+}
+
+AdbcStatusCode AdbcConnectionGetInfo(struct AdbcConnection* connection,
+ uint32_t* info_codes, size_t info_codes_length,
+ struct ArrowArrayStream* out,
+ struct AdbcError* error) {
+ return SqliteConnectionGetInfo(connection, info_codes, info_codes_length, out, error);
+}
+
+AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int depth,
+ const char* catalog, const char* db_schema,
+ const char* table_name, const char** table_type,
+ const char* column_name,
+ struct ArrowArrayStream* out,
+ struct AdbcError* error) {
+ return SqliteConnectionGetObjects(connection, depth, catalog, db_schema, table_name,
+ table_type, column_name, out, error);
+}
+
+AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
+ const char* catalog, const char* db_schema,
+ const char* table_name,
+ struct ArrowSchema* schema,
+ struct AdbcError* error) {
+ return SqliteConnectionGetTableSchema(connection, catalog, db_schema, table_name,
+ schema, error);
+}
+
+AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
+ struct ArrowArrayStream* out,
+ struct AdbcError* error) {
+ return SqliteConnectionGetTableTypes(connection, out, error);
+}
+
+AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,
+ const uint8_t* serialized_partition,
+ size_t serialized_length,
+ struct ArrowArrayStream* out,
+ struct AdbcError* error) {
+ return SqliteConnectionReadPartition(connection, serialized_partition,
+ serialized_length, out, error);
+}
+
+AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ return SqliteConnectionCommit(connection, error);
+}
+
+AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ return SqliteConnectionRollback(connection, error);
+}
+
+AdbcStatusCode AdbcStatementNew(struct AdbcConnection* connection,
+ struct AdbcStatement* statement,
+ struct AdbcError* error) {
+ return SqliteStatementNew(connection, statement, error);
+}
+
+AdbcStatusCode AdbcStatementRelease(struct AdbcStatement* statement,
+ struct AdbcError* error) {
+ return SqliteStatementRelease(statement, error);
+}
+
+AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
+ struct ArrowArrayStream* out,
+ int64_t* rows_affected,
+ struct AdbcError* error) {
+ return SqliteStatementExecuteQuery(statement, out, rows_affected, error);
+}
+
+AdbcStatusCode AdbcStatementPrepare(struct AdbcStatement* statement,
+ struct AdbcError* error) {
+ return SqliteStatementPrepare(statement, error);
+}
+
+AdbcStatusCode AdbcStatementSetSqlQuery(struct AdbcStatement* statement,
+ const char* query, struct AdbcError* error) {
+ return SqliteStatementSetSqlQuery(statement, query, error);
+}
+
+AdbcStatusCode AdbcStatementSetSubstraitPlan(struct AdbcStatement* statement,
+ const uint8_t* plan, size_t length,
+ struct AdbcError* error) {
+ return SqliteStatementSetSubstraitPlan(statement, plan, length, error);
+}
+
+AdbcStatusCode AdbcStatementBind(struct AdbcStatement* statement,
+ struct ArrowArray* values, struct ArrowSchema* schema,
+ struct AdbcError* error) {
+ return SqliteStatementBind(statement, values, schema, error);
+}
+
+AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
+ struct ArrowArrayStream* stream,
+ struct AdbcError* error) {
+ return SqliteStatementBindStream(statement, stream, error);
+}
+
+AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement,
+ struct ArrowSchema* schema,
+ struct AdbcError* error) {
+ return SqliteStatementGetParameterSchema(statement, schema, error);
+}
+
+AdbcStatusCode AdbcStatementSetOption(struct AdbcStatement* statement, const char* key,
+ const char* value, struct AdbcError* error) {
+ return SqliteStatementSetOption(statement, key, value, error);
+}
+
+AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement,
+ struct ArrowSchema* schema,
+ struct AdbcPartitions* partitions,
+ int64_t* rows_affected,
+ struct AdbcError* error) {
+ return SqliteStatementExecutePartitions(statement, schema, partitions, rows_affected,
+ error);
+} // NOLINT(whitespace/indent)
+// due to https://github.com/cpplint/cpplint/pull/189
+
+AdbcStatusCode AdbcDriverInit(int version, void* driver, struct AdbcError* error) {
+ return SqliteDriverInit(version, driver, error);
+}
diff --git a/c/driver/sqlite/sqlite.cc b/c/driver/sqlite/sqlite.cc
deleted file mode 100644
index fac667c..0000000
--- a/c/driver/sqlite/sqlite.cc
+++ /dev/null
@@ -1,1827 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <sqlite3.h>
-
-#include <algorithm>
-#include <cstring>
-#include <memory>
-#include <mutex>
-#include <string>
-#include <unordered_map>
-#include <unordered_set>
-
-#include <arrow/builder.h>
-#include <arrow/c/bridge.h>
-#include <arrow/record_batch.h>
-#include <arrow/status.h>
-#include <arrow/table.h>
-#include <arrow/util/config.h>
-#include <arrow/util/logging.h>
-#include <arrow/util/string_builder.h>
-
-#include "adbc.h"
-#include "arrow/type_fwd.h"
-#include "driver/util.h"
-
-namespace {
-
-using arrow::Status;
-
-void ReleaseError(struct AdbcError* error) {
- delete[] error->message;
- error->message = nullptr;
-}
-
-template <typename... Args>
-void SetError(struct AdbcError* error, Args&&... args) {
- if (!error) return;
- std::string message =
- arrow::util::StringBuilder("[SQLite3] ", std::forward<Args>(args)...);
- if (error->message) {
- message.reserve(message.size() + 1 + std::strlen(error->message));
- message.append(1, '\n');
- message.append(error->message);
- delete[] error->message;
- }
- error->message = new char[message.size() + 1];
- message.copy(error->message, message.size());
- error->message[message.size()] = '\0';
- error->release = ReleaseError;
-}
-
-void SetError(sqlite3* db, const std::string& source, struct AdbcError* error) {
- return SetError(error, source, ": ", sqlite3_errmsg(db));
-}
-
-AdbcStatusCode CheckRc(sqlite3* db, int rc, const char* context,
- struct AdbcError* error) {
- if (rc != SQLITE_OK) {
- SetError(db, context, error);
- return ADBC_STATUS_IO;
- }
- return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode CheckRc(sqlite3* db, sqlite3_stmt* stmt, int rc, const char* context,
- struct AdbcError* error) {
- if (rc != SQLITE_OK) {
- SetError(db, context, error);
- rc = sqlite3_finalize(stmt);
- if (rc != SQLITE_OK) {
- SetError(db, "sqlite3_finalize", error);
- }
- return ADBC_STATUS_IO;
- }
- return ADBC_STATUS_OK;
-}
-
-template <typename CallbackFn>
-AdbcStatusCode DoQuery(sqlite3* db, sqlite3_stmt* stmt, struct AdbcError* error,
- CallbackFn&& callback) {
- auto status = std::move(callback)();
- std::ignore = CheckRc(db, stmt, sqlite3_finalize(stmt), "sqlite3_finalize", error);
- return status;
-}
-
-template <typename CallbackFn>
-AdbcStatusCode DoQuery(sqlite3* db, const char* query, struct AdbcError* error,
- CallbackFn&& callback) {
- sqlite3_stmt* stmt;
- int rc = sqlite3_prepare_v2(db, query, std::strlen(query), &stmt, /*pzTail=*/nullptr);
- if (rc != SQLITE_OK) return CheckRc(db, stmt, rc, "sqlite3_prepare_v2", error);
- auto status = std::move(callback)(stmt);
- std::ignore = CheckRc(db, stmt, sqlite3_finalize(stmt), "sqlite3_finalize", error);
- return status;
-}
-
-arrow::Status ToArrowStatus(AdbcStatusCode code, struct AdbcError* error) {
- if (code == ADBC_STATUS_OK) return Status::OK();
- // TODO:
- return Status::UnknownError(code);
-}
-
-AdbcStatusCode FromArrowStatus(const Status& status, struct AdbcError* error) {
- if (status.ok()) return ADBC_STATUS_OK;
- SetError(error, status);
- // TODO: map Arrow codes to ADBC codes
- return ADBC_STATUS_INTERNAL;
-}
-
-AdbcStatusCode ExportDataToStream(std::shared_ptr<arrow::Schema> schema,
- arrow::RecordBatchVector batches,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- std::shared_ptr<arrow::RecordBatchReader> reader;
- auto status = arrow::RecordBatchReader::Make(std::move(batches), std::move(schema))
- .Value(&reader);
- ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
- status = arrow::ExportRecordBatchReader(std::move(reader), stream);
- ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
- return ADBC_STATUS_OK;
-}
-
-std::shared_ptr<arrow::Schema> StatementToSchema(sqlite3_stmt* stmt) {
- // TODO: this is fundamentally the wrong way to go about
- // this. instead, we need to act like the CSV/JSON readers: sample
- // several rows and dynamically update the column type as we go.
- const int num_columns = sqlite3_column_count(stmt);
- arrow::FieldVector fields(num_columns);
- for (int i = 0; i < num_columns; i++) {
- const char* column_name = sqlite3_column_name(stmt, i);
- const int column_type = sqlite3_column_type(stmt, i);
- std::shared_ptr<arrow::DataType> arrow_type = nullptr;
- switch (column_type) {
- case SQLITE_INTEGER:
- arrow_type = arrow::int64();
- break;
- case SQLITE_FLOAT:
- arrow_type = arrow::float64();
- break;
- case SQLITE_BLOB:
- arrow_type = arrow::binary();
- break;
- case SQLITE_TEXT:
- arrow_type = arrow::utf8();
- break;
- case SQLITE_NULL:
- default:
- arrow_type = arrow::null();
- break;
- }
- fields[i] = arrow::field(column_name, std::move(arrow_type));
- }
- return arrow::schema(std::move(fields));
-}
-
-class SqliteDatabaseImpl {
- public:
- SqliteDatabaseImpl() : db_(nullptr), connection_count_(0) {}
-
- AdbcStatusCode Connect(sqlite3** db, struct AdbcError* error) {
- std::lock_guard<std::mutex> guard(mutex_);
- if (!db_) {
- SetError(error, "Database not yet initialized, call AdbcDatabaseInit");
- return ADBC_STATUS_INVALID_STATE;
- }
- // Create a new connection
- if (database_uri_ == ":memory:") {
- // unless the special ":memory:" filename is used
- *db = db_;
- } else {
- int rc =
- sqlite3_open_v2(database_uri_.c_str(), db,
- SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
- /*zVfs=*/nullptr);
- ADBC_RETURN_NOT_OK(CheckRc(*db, nullptr, rc, "sqlite3_open_v2", error));
- }
- ++connection_count_;
- return ADBC_STATUS_OK;
- }
-
- AdbcStatusCode Init(struct AdbcError* error) {
- if (db_) {
- SetError(error, "Database already initialized");
- return ADBC_STATUS_INVALID_STATE;
- }
- database_uri_ = "file:adbc_sqlite_driver?mode=memory&cache=shared";
- auto it = options_.find("filename");
- if (it != options_.end()) {
- database_uri_ = it->second;
- }
-
- int rc = sqlite3_open_v2(database_uri_.c_str(), &db_,
- SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
- /*zVfs=*/nullptr);
- ADBC_RETURN_NOT_OK(CheckRc(db_, nullptr, rc, "sqlite3_open_v2", error));
- options_.clear();
- return ADBC_STATUS_OK;
- }
-
- AdbcStatusCode SetOption(const char* key, const char* value, struct AdbcError* error) {
- if (db_) {
- SetError(error, "Database already initialized");
- return ADBC_STATUS_INVALID_STATE;
- }
- options_[key] = value;
- return ADBC_STATUS_OK;
- }
-
- AdbcStatusCode Disconnect(sqlite3* db, struct AdbcError* error) {
- std::lock_guard<std::mutex> guard(mutex_);
- if (--connection_count_ < 0) {
- SetError(error, "Connection count underflow");
- return ADBC_STATUS_INVALID_STATE;
- }
- // Close the database unless :memory:
- if (database_uri_ != ":memory:") {
- if (sqlite3_close(db) != SQLITE_OK) {
- if (db) SetError(db, "sqlite3_close", error);
- return ADBC_STATUS_IO;
- }
- }
- return ADBC_STATUS_OK;
- }
-
- AdbcStatusCode Release(struct AdbcError* error) {
- std::lock_guard<std::mutex> guard(mutex_);
-
- if (connection_count_ > 0) {
- SetError(error, "Cannot release database with ", connection_count_,
- " open connections");
- return ADBC_STATUS_INVALID_STATE;
- }
-
- auto status = sqlite3_close(db_);
- if (status != SQLITE_OK) {
- if (db_) SetError(db_, "sqlite3_close", error);
- return ADBC_STATUS_IO;
- }
- return ADBC_STATUS_OK;
- }
-
- private:
- sqlite3* db_;
- int connection_count_;
- std::string database_uri_;
- std::unordered_map<std::string, std::string> options_;
- std::mutex mutex_;
-};
-
-class SqliteConnectionImpl {
- public:
- SqliteConnectionImpl() : database_(nullptr), db_(nullptr), autocommit_(true) {}
-
- sqlite3* db() const { return db_; }
-
- AdbcStatusCode GetInfo(uint32_t* info_codes, size_t info_codes_length,
- struct ArrowArrayStream* stream, struct AdbcError* error) {
- static std::shared_ptr<arrow::Schema> kInfoSchema = arrow::schema({
- arrow::field("info_name", arrow::uint32(), /*nullable=*/false),
- arrow::field(
- "info_value",
- arrow::dense_union({
- arrow::field("string_value", arrow::utf8()),
- arrow::field("bool_value", arrow::boolean()),
- arrow::field("int64_value", arrow::int64()),
- arrow::field("int32_bitmask", arrow::int32()),
- arrow::field("string_list", arrow::list(arrow::utf8())),
- arrow::field("int32_to_int32_list_map",
- arrow::map(arrow::int32(), arrow::list(arrow::int32()))),
- })),
- });
- static int kStringValueCode = 0;
-
- static std::vector<uint32_t> kSupported = {
- ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION, ADBC_INFO_DRIVER_NAME,
- ADBC_INFO_DRIVER_VERSION, ADBC_INFO_DRIVER_ARROW_VERSION,
- };
-
- if (!info_codes) {
- info_codes = kSupported.data();
- info_codes_length = kSupported.size();
- }
-
- arrow::UInt32Builder info_name;
- std::unique_ptr<arrow::ArrayBuilder> info_value_builder;
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(MakeBuilder(arrow::default_memory_pool(),
- kInfoSchema->field(1)->type(), &info_value_builder),
- error));
- arrow::DenseUnionBuilder* info_value =
- static_cast<arrow::DenseUnionBuilder*>(info_value_builder.get());
- arrow::StringBuilder* info_string =
- static_cast<arrow::StringBuilder*>(info_value->child_builder(0).get());
-
- for (size_t i = 0; i < info_codes_length; i++) {
- switch (info_codes[i]) {
- case ADBC_INFO_VENDOR_NAME:
- ADBC_RETURN_NOT_OK(FromArrowStatus(info_name.Append(info_codes[i]), error));
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(info_value->Append(kStringValueCode), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(info_string->Append("SQLite3"), error));
- break;
- case ADBC_INFO_VENDOR_VERSION:
- ADBC_RETURN_NOT_OK(FromArrowStatus(info_name.Append(info_codes[i]), error));
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(info_value->Append(kStringValueCode), error));
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(info_string->Append(sqlite3_libversion()), error));
- break;
- case ADBC_INFO_DRIVER_NAME:
- ADBC_RETURN_NOT_OK(FromArrowStatus(info_name.Append(info_codes[i]), error));
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(info_value->Append(kStringValueCode), error));
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(info_string->Append("ADBC C SQLite3"), error));
- break;
- case ADBC_INFO_DRIVER_VERSION:
- // TODO: set up CMake to embed version info
- ADBC_RETURN_NOT_OK(FromArrowStatus(info_name.Append(info_codes[i]), error));
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(info_value->Append(kStringValueCode), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(info_string->Append("0.0.1"), error));
- break;
- case ADBC_INFO_DRIVER_ARROW_VERSION:
- ADBC_RETURN_NOT_OK(FromArrowStatus(info_name.Append(info_codes[i]), error));
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(info_value->Append(kStringValueCode), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- info_string->Append("Arrow/C++ " ARROW_VERSION_STRING), error));
- break;
- default:
- // Unrecognized
- break;
- }
- }
-
- arrow::ArrayVector arrays(2);
- ADBC_RETURN_NOT_OK(FromArrowStatus(info_name.Finish(&arrays[0]), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(info_value->Finish(&arrays[1]), error));
- const int64_t rows = arrays[0]->length();
- return ExportDataToStream(
- kInfoSchema,
- {
- arrow::RecordBatch::Make(kInfoSchema, rows, std::move(arrays)),
- },
- stream, error);
- }
-
- AdbcStatusCode GetObjects(int depth, const char* catalog, const char* db_schema,
- const char* table_name, const char** table_type,
- const char* column_name, struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- static std::shared_ptr<arrow::DataType> kColumnSchema = arrow::struct_({
- arrow::field("column_name", arrow::utf8(), /*nullable=*/false),
- arrow::field("ordinal_position", arrow::int32()),
- arrow::field("remarks", arrow::utf8()),
- arrow::field("xdbc_data_type", arrow::int16()),
- arrow::field("xdbc_type_name", arrow::utf8()),
- arrow::field("xdbc_column_size", arrow::int32()),
- arrow::field("xdbc_decimal_digits", arrow::int16()),
- arrow::field("xdbc_num_prec_radix", arrow::int16()),
- arrow::field("xdbc_nullable", arrow::int16()),
- arrow::field("xdbc_column_def", arrow::utf8()),
- arrow::field("xdbc_sql_data_type", arrow::int16()),
- arrow::field("xdbc_datetime_sub", arrow::int16()),
- arrow::field("xdbc_char_octet_length", arrow::int32()),
- arrow::field("xdbc_is_nullable", arrow::utf8()),
- arrow::field("xdbc_scope_catalog", arrow::utf8()),
- arrow::field("xdbc_scope_schema", arrow::utf8()),
- arrow::field("xdbc_scope_table", arrow::utf8()),
- arrow::field("xdbc_is_autoincrement", arrow::boolean()),
- arrow::field("xdbc_is_generatedcolumn", arrow::boolean()),
- });
- static std::shared_ptr<arrow::DataType> kUsageSchema = arrow::struct_({
- arrow::field("fk_catalog", arrow::utf8()),
- arrow::field("fk_db_schema", arrow::utf8()),
- arrow::field("fk_table", arrow::utf8()),
- arrow::field("fk_column_name", arrow::utf8()),
- });
- static std::shared_ptr<arrow::DataType> kConstraintSchema = arrow::struct_({
- arrow::field("constraint_name", arrow::utf8()),
- arrow::field("constraint_type", arrow::utf8(), /*nullable=*/false),
- arrow::field("constraint_column_names", arrow::list(arrow::utf8()),
- /*nullable=*/false),
- arrow::field("constraint_column_usage", arrow::list(kUsageSchema)),
- });
- static std::shared_ptr<arrow::DataType> kTableSchema = arrow::struct_({
- arrow::field("table_name", arrow::utf8(), /*nullable=*/false),
- arrow::field("table_type", arrow::utf8(), /*nullable=*/false),
- arrow::field("table_columns", arrow::list(kColumnSchema)),
- arrow::field("table_constraints", arrow::list(kConstraintSchema)),
- });
- static std::shared_ptr<arrow::DataType> kDbSchemaSchema = arrow::struct_({
- arrow::field("db_schema_name", arrow::utf8()),
- arrow::field("db_schema_tables", arrow::list(kTableSchema)),
- });
- static std::shared_ptr<arrow::Schema> kCatalogSchema = arrow::schema({
- arrow::field("catalog_name", arrow::utf8()),
- arrow::field("catalog_db_schemas", arrow::list(kDbSchemaSchema)),
- });
-
- static const char kTableQuery[] =
- R"(SELECT name, type
- FROM sqlite_master
- WHERE name LIKE ? AND type <> "index"
- ORDER BY name ASC)";
- static const char kColumnQuery[] =
- R"(SELECT cid, name
- FROM pragma_table_info(?)
- WHERE name LIKE ?
- ORDER BY cid ASC)";
- static const char kPrimaryKeyQuery[] =
- R"(SELECT name
- FROM pragma_table_info(?)
- WHERE pk > 0
- ORDER BY pk ASC)";
- static const char kForeignKeyQuery[] =
- R"(SELECT id, seq, "table", "from", "to"
- FROM pragma_foreign_key_list(?)
- ORDER BY id, seq ASC)";
-
- arrow::StringBuilder catalog_name;
- std::unique_ptr<arrow::ArrayBuilder> catalog_schemas_builder;
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- MakeBuilder(arrow::default_memory_pool(), kCatalogSchema->field(1)->type(),
- &catalog_schemas_builder),
- error));
- auto* catalog_schemas =
- static_cast<arrow::ListBuilder*>(catalog_schemas_builder.get());
- auto* catalog_schemas_items =
- static_cast<arrow::StructBuilder*>(catalog_schemas->value_builder());
- auto* db_schema_name =
- static_cast<arrow::StringBuilder*>(catalog_schemas_items->child_builder(0).get());
- auto* db_schema_tables =
- static_cast<arrow::ListBuilder*>(catalog_schemas_items->child_builder(1).get());
- auto* db_schema_tables_items =
- static_cast<arrow::StructBuilder*>(db_schema_tables->value_builder());
- auto* table_names = static_cast<arrow::StringBuilder*>(
- db_schema_tables_items->child_builder(0).get());
- auto* table_types = static_cast<arrow::StringBuilder*>(
- db_schema_tables_items->child_builder(1).get());
- auto* table_columns =
- static_cast<arrow::ListBuilder*>(db_schema_tables_items->child_builder(2).get());
- auto* table_columns_items =
- static_cast<arrow::StructBuilder*>(table_columns->value_builder());
- auto* column_names =
- static_cast<arrow::StringBuilder*>(table_columns_items->child_builder(0).get());
- auto* ordinal_positions =
- static_cast<arrow::Int32Builder*>(table_columns_items->child_builder(1).get());
- auto* table_constraints =
- static_cast<arrow::ListBuilder*>(db_schema_tables_items->child_builder(3).get());
- auto* table_constraints_items =
- static_cast<arrow::StructBuilder*>(table_constraints->value_builder());
- auto* constraint_names = static_cast<arrow::StringBuilder*>(
- table_constraints_items->child_builder(0).get());
- auto* constraint_types = static_cast<arrow::StringBuilder*>(
- table_constraints_items->child_builder(1).get());
- auto* constraint_column_names =
- static_cast<arrow::ListBuilder*>(table_constraints_items->child_builder(2).get());
- auto* constraint_column_names_items =
- static_cast<arrow::StringBuilder*>(constraint_column_names->value_builder());
- auto* constraint_column_usage =
- static_cast<arrow::ListBuilder*>(table_constraints_items->child_builder(3).get());
- auto* constraint_column_usage_items =
- static_cast<arrow::StructBuilder*>(constraint_column_usage->value_builder());
- auto* constraint_column_usage_fk_catalog = static_cast<arrow::StringBuilder*>(
- constraint_column_usage_items->child_builder(0).get());
- auto* constraint_column_usage_fk_db_schema = static_cast<arrow::StringBuilder*>(
- constraint_column_usage_items->child_builder(1).get());
- auto* constraint_column_usage_fk_table = static_cast<arrow::StringBuilder*>(
- constraint_column_usage_items->child_builder(2).get());
- auto* constraint_column_usage_fk_column_name = static_cast<arrow::StringBuilder*>(
- constraint_column_usage_items->child_builder(3).get());
-
- // TODO: filter properly, also implement other attached databases
- if (!catalog || std::strlen(catalog) == 0) {
- // https://www.sqlite.org/cli.html
- // > The ".databases" command shows a list of all databases open
- // > in the current connection. There will always be at least
- // > 2. The first one is "main", the original database opened.
- ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_name.Append("main"), error));
-
- if (depth == ADBC_OBJECT_DEPTH_CATALOGS) {
- ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->AppendNull(), error));
- } else if (!db_schema || std::strlen(db_schema) == 0) {
- ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->Append(), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(db_schema_name->AppendNull(), error));
- if (depth == ADBC_OBJECT_DEPTH_DB_SCHEMAS) {
- ADBC_RETURN_NOT_OK(FromArrowStatus(db_schema_tables->AppendNull(), error));
- } else {
- // Look up tables
-
- std::unordered_set<std::string> table_type_filter;
- if (table_type) {
- while (*table_type) {
- table_type_filter.insert(*table_type);
- table_type++;
- }
- }
-
- ADBC_RETURN_NOT_OK(
- DoQuery(db_, kTableQuery, error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
- if (table_name) {
- ADBC_RETURN_NOT_OK(CheckRc(
- db_,
- sqlite3_bind_text64(stmt, 1, table_name, std::strlen(table_name),
- SQLITE_STATIC, SQLITE_UTF8),
- "sqlite3_bind_text64", error));
- } else {
- ADBC_RETURN_NOT_OK(CheckRc(
- db_,
- sqlite3_bind_text64(stmt, 1, "%", 1, SQLITE_STATIC, SQLITE_UTF8),
- "sqlite3_bind_text64", error));
- }
-
- int rc = SQLITE_OK;
- ADBC_RETURN_NOT_OK(FromArrowStatus(db_schema_tables->Append(), error));
- while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
- const char* cur_table =
- reinterpret_cast<const char*>(sqlite3_column_text(stmt, 0));
- const char* cur_table_type =
- reinterpret_cast<const char*>(sqlite3_column_text(stmt, 1));
-
- if (!table_type_filter.empty() &&
- table_type_filter.find(cur_table_type) == table_type_filter.end()) {
- continue;
- }
-
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(table_names->Append(cur_table), error));
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(table_types->Append(cur_table_type), error));
- if (depth == ADBC_OBJECT_DEPTH_TABLES) {
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(table_columns->AppendNull(), error));
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(table_constraints->AppendNull(), error));
- } else {
- ADBC_RETURN_NOT_OK(FromArrowStatus(table_columns->Append(), error));
- ADBC_RETURN_NOT_OK(DoQuery(
- db_, kColumnQuery, error,
- [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
- ADBC_RETURN_NOT_OK(
- CheckRc(db_,
- sqlite3_bind_text64(stmt, 1, cur_table,
- std::strlen(cur_table),
- SQLITE_STATIC, SQLITE_UTF8),
- "sqlite3_bind_text64", error));
-
- if (column_name) {
- ADBC_RETURN_NOT_OK(
- CheckRc(db_,
- sqlite3_bind_text64(stmt, 2, column_name,
- std::strlen(column_name),
- SQLITE_STATIC, SQLITE_UTF8),
- "sqlite3_bind_text64", error));
- } else {
- ADBC_RETURN_NOT_OK(
- CheckRc(db_,
- sqlite3_bind_text64(stmt, 2, "%", 1,
- SQLITE_STATIC, SQLITE_UTF8),
- "sqlite3_bind_text64", error));
- }
-
- int rc = SQLITE_OK;
- int64_t row_count = 0;
- while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
- row_count++;
- const int32_t cur_ordinal_position =
- 1 + sqlite3_column_int(stmt, 0);
- const char* cur_column_name = reinterpret_cast<const char*>(
- sqlite3_column_text(stmt, 1));
-
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- column_names->Append(cur_column_name), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- ordinal_positions->Append(cur_ordinal_position), error));
-
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(table_columns_items->Append(), error));
- }
- for (int i = 2; i < table_columns_items->num_children(); i++) {
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- table_columns_items->child_builder(i)->AppendNulls(
- row_count),
- error));
- }
- if (rc != SQLITE_DONE) {
- return CheckRc(db_, rc, "sqlite3_step", error);
- }
- return ADBC_STATUS_OK;
- }));
-
- // We can get primary key and foreign keys, but not unique (without
- // parsing the table definition, at least)
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(table_constraints->Append(), error));
- ADBC_RETURN_NOT_OK(DoQuery(
- db_, kPrimaryKeyQuery, error,
- [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
- ADBC_RETURN_NOT_OK(
- CheckRc(db_,
- sqlite3_bind_text64(stmt, 1, cur_table,
- std::strlen(cur_table),
- SQLITE_STATIC, SQLITE_UTF8),
- "sqlite3_bind_text64", error));
-
- int rc = SQLITE_OK;
- bool has_primary_key = false;
- while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
- if (!has_primary_key) {
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(constraint_names->AppendNull(), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- constraint_types->Append("PRIMARY KEY"), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- constraint_column_names->Append(), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- constraint_column_usage->Append(), error));
- }
- has_primary_key = true;
- const char* cur_column_name = reinterpret_cast<const char*>(
- sqlite3_column_text(stmt, 0));
-
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- constraint_column_names_items->Append(cur_column_name),
- error));
- }
- if (has_primary_key) {
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- table_constraints_items->Append(), error));
- }
- if (rc != SQLITE_DONE) {
- return CheckRc(db_, rc, "sqlite3_step", error);
- }
- return ADBC_STATUS_OK;
- }));
- ADBC_RETURN_NOT_OK(DoQuery(
- db_, kForeignKeyQuery, error,
- [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
- ADBC_RETURN_NOT_OK(
- CheckRc(db_,
- sqlite3_bind_text64(stmt, 1, cur_table,
- std::strlen(cur_table),
- SQLITE_STATIC, SQLITE_UTF8),
- "sqlite3_bind_text64", error));
-
- int rc = SQLITE_OK;
- int prev_key_id = -1;
- while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
- const int key_id = sqlite3_column_int(stmt, 0);
- const int key_seq = sqlite3_column_int(stmt, 1);
- const char* to_table = reinterpret_cast<const char*>(
- sqlite3_column_text(stmt, 2));
- const char* from_col = reinterpret_cast<const char*>(
- sqlite3_column_text(stmt, 3));
- const char* to_col = reinterpret_cast<const char*>(
- sqlite3_column_text(stmt, 4));
- if (key_id != prev_key_id) {
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(constraint_names->AppendNull(), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- constraint_types->Append("FOREIGN KEY"), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- constraint_column_names->Append(), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- constraint_column_usage->Append(), error));
- if (prev_key_id != -1) {
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- table_constraints_items->Append(), error));
- }
- }
- prev_key_id = key_id;
-
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- constraint_column_names_items->Append(from_col), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- constraint_column_usage_fk_catalog->AppendNull(), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- constraint_column_usage_fk_db_schema->AppendNull(),
- error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- constraint_column_usage_fk_table->Append(to_table),
- error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- constraint_column_usage_fk_column_name->Append(to_col),
- error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- constraint_column_usage_items->Append(), error));
- }
- if (prev_key_id != -1) {
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- table_constraints_items->Append(), error));
- }
- if (rc != SQLITE_DONE) {
- return CheckRc(db_, rc, "sqlite3_step", error);
- }
- return ADBC_STATUS_OK;
- }));
- }
-
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(db_schema_tables_items->Append(), error));
- }
- if (rc != SQLITE_DONE) {
- return CheckRc(db_, rc, "sqlite3_step", error);
- }
- return ADBC_STATUS_OK;
- }));
- }
- ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas_items->Append(), error));
- } else {
- ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->Append(), error));
- }
- }
-
- arrow::ArrayVector arrays(2);
- ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_name.Finish(&arrays[0]), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->Finish(&arrays[1]), error));
- const int64_t rows = arrays[0]->length();
- return ExportDataToStream(
- kCatalogSchema,
- {
- arrow::RecordBatch::Make(kCatalogSchema, rows, std::move(arrays)),
- },
- stream, error);
- }
-
- AdbcStatusCode GetTableTypes(struct ArrowArrayStream* stream, struct AdbcError* error) {
- auto schema =
- arrow::schema({arrow::field("table_type", arrow::utf8(), /*nullable=*/false)});
-
- arrow::StringBuilder builder;
- std::shared_ptr<arrow::Array> array;
- ADBC_RETURN_NOT_OK(FromArrowStatus(builder.Append("table"), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(builder.Append("view"), error));
- ADBC_RETURN_NOT_OK(FromArrowStatus(builder.Finish(&array), error));
-
- return ExportDataToStream(
- schema,
- {
- arrow::RecordBatch::Make(schema, /*num_rows=*/2, {std::move(array)}),
- },
- stream, error);
- }
-
- AdbcStatusCode GetTableSchema(const char* catalog, const char* db_schema,
- const char* table_name, struct ArrowSchema* schema,
- struct AdbcError* error) {
- if ((catalog && std::strlen(catalog) > 0) ||
- (db_schema && std::strlen(db_schema) > 0)) {
- std::memset(schema, 0, sizeof(*schema));
- SetError(error, "Catalog/schema are not supported");
- return ADBC_STATUS_NOT_IMPLEMENTED;
- }
-
- std::string query = "SELECT * FROM ";
- char* escaped = sqlite3_mprintf("%w", table_name);
- if (!escaped) {
- // Failed to allocate
- SetError(error, "Could not escape table name (failed to allocate memory)");
- return ADBC_STATUS_INTERNAL;
- }
- query += escaped;
- sqlite3_free(escaped);
-
- std::shared_ptr<arrow::Schema> arrow_schema;
- ADBC_RETURN_NOT_OK(
- DoQuery(db_, query.c_str(), error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
- if (sqlite3_step(stmt) == SQLITE_ERROR) {
- SetError(db_, "sqlite3_step", error);
- return ADBC_STATUS_IO;
- }
- arrow_schema = StatementToSchema(stmt);
- return ADBC_STATUS_OK;
- }));
- return FromArrowStatus(arrow::ExportSchema(*arrow_schema, schema), error);
- }
-
- AdbcStatusCode Init(struct AdbcDatabase* database, struct AdbcError* error) {
- if (!database->private_data) {
- SetError(error, "database is not initialized");
- return ADBC_STATUS_INVALID_STATE;
- }
- database_ =
- *reinterpret_cast<std::shared_ptr<SqliteDatabaseImpl>*>(database->private_data);
- return database_->Connect(&db_, error);
- }
-
- AdbcStatusCode Release(struct AdbcError* error) {
- if (!database_) return ADBC_STATUS_OK;
- return database_->Disconnect(db_, error);
- }
-
- AdbcStatusCode SetAutocommit(bool autocommit, struct AdbcError* error) {
- if (autocommit == autocommit_) return ADBC_STATUS_OK;
- autocommit_ = autocommit;
-
- const char* query = autocommit_ ? "COMMIT" : "BEGIN TRANSACTION";
- return DoQuery(db_, query, error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
- return StepStatement(stmt, error);
- });
- }
-
- AdbcStatusCode Commit(struct AdbcError* error) {
- if (autocommit_) {
- SetError(error, "Cannot commit when in autocommit mode");
- return ADBC_STATUS_INVALID_STATE;
- }
- ADBC_RETURN_NOT_OK(
- DoQuery(db_, "COMMIT", error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
- return StepStatement(stmt, error);
- }));
- return DoQuery(db_, "BEGIN TRANSACTION", error,
- [&](sqlite3_stmt* stmt) { return StepStatement(stmt, error); });
- }
-
- AdbcStatusCode Rollback(struct AdbcError* error) {
- if (autocommit_) {
- SetError(error, "Cannot rollback when in autocommit mode");
- return ADBC_STATUS_INVALID_STATE;
- }
- ADBC_RETURN_NOT_OK(
- DoQuery(db_, "ROLLBACK", error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
- return StepStatement(stmt, error);
- }));
- return DoQuery(db_, "BEGIN TRANSACTION", error,
- [&](sqlite3_stmt* stmt) { return StepStatement(stmt, error); });
- }
-
- private:
- AdbcStatusCode StepStatement(sqlite3_stmt* stmt, struct AdbcError* error) {
- int rc = SQLITE_OK;
- while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
- }
- if (rc != SQLITE_DONE) {
- return CheckRc(db_, rc, "sqlite3_step", error);
- }
- return ADBC_STATUS_OK;
- }
-
- std::shared_ptr<SqliteDatabaseImpl> database_;
- sqlite3* db_;
- bool autocommit_;
-};
-
-AdbcStatusCode BindParameters(sqlite3_stmt* stmt, const arrow::RecordBatch& data,
- int64_t row, int* rc, struct AdbcError* error) {
- int col_index = 1;
- for (const auto& column : data.columns()) {
- if (column->IsNull(row)) {
- *rc = sqlite3_bind_null(stmt, col_index);
- } else {
- switch (column->type()->id()) {
- case arrow::Type::DOUBLE: {
- *rc = sqlite3_bind_double(
- stmt, col_index,
- static_cast<const arrow::DoubleArray&>(*column).Value(row));
- break;
- }
- case arrow::Type::INT64: {
- *rc = sqlite3_bind_int64(
- stmt, col_index, static_cast<const arrow::Int64Array&>(*column).Value(row));
- break;
- }
- case arrow::Type::STRING: {
- const auto& strings = static_cast<const arrow::StringArray&>(*column);
- *rc =
- sqlite3_bind_text64(stmt, col_index, strings.Value(row).data(),
- strings.value_length(row), SQLITE_STATIC, SQLITE_UTF8);
- break;
- }
- default:
- SetError(error, "Binding parameter of type ", *column->type());
- return ADBC_STATUS_NOT_IMPLEMENTED;
- }
- }
- if (*rc != SQLITE_OK) {
- SetError(error, "Failed to bind parameters");
- return ADBC_STATUS_IO;
- }
- col_index++;
- }
- return ADBC_STATUS_OK;
-}
-
-class SqliteStatementReader : public arrow::RecordBatchReader {
- public:
- explicit SqliteStatementReader(
- std::shared_ptr<SqliteConnectionImpl> connection, sqlite3_stmt* stmt,
- std::shared_ptr<arrow::RecordBatchReader> bind_parameters)
- : connection_(std::move(connection)),
- stmt_(stmt),
- bind_parameters_(std::move(bind_parameters)),
- schema_(nullptr),
- next_parameters_(nullptr),
- bind_index_(0),
- done_(false),
- rc_(SQLITE_OK) {}
-
- AdbcStatusCode Init(struct AdbcError* error) {
- // TODO: this crashes if the statement is closed while the reader
- // is still open.
- // Step the statement and get the schema (SQLite doesn't
- // necessarily know the schema until it begins to execute it)
-
- sqlite3* db = connection_->db();
- Status status;
- if (bind_parameters_) {
- status = bind_parameters_->ReadNext(&next_parameters_);
- ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
- ADBC_RETURN_NOT_OK(BindNext(&rc_, error));
- ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc_, "sqlite3_bind", error));
- }
- const int num_params = sqlite3_bind_parameter_count(stmt_);
- if (num_params > 0) {
- if (!bind_parameters_) {
- SetError(error, "Statement has parameters, but no parameters given");
- return ADBC_STATUS_INVALID_STATE;
- } else if (num_params != bind_parameters_->schema()->num_fields()) {
- SetError(error, "Statement has ", num_params, " parameters, but ",
- bind_parameters_->schema()->num_fields(), " parameters given");
- return ADBC_STATUS_INVALID_STATE;
- }
- } else if (bind_parameters_ && bind_parameters_->schema()->num_fields() > 0) {
- SetError(error, "Statement has no parameters, but ",
- bind_parameters_->schema()->num_fields(), " parameters given");
- return ADBC_STATUS_INVALID_STATE;
- }
-
- // XXX: with parameters, inferring the schema from the first
- // argument is inaccurate (what if one is null?). Is there a way
- // to hint to SQLite the real type?
-
- rc_ = sqlite3_step(stmt_);
- if (rc_ == SQLITE_ERROR) {
- return CheckRc(db, stmt_, rc_, "sqlite3_step", error);
- }
- schema_ = StatementToSchema(stmt_);
- return ADBC_STATUS_OK;
- }
-
- void OverrideSchema(std::shared_ptr<arrow::Schema> schema) {
- // TODO(ARROW-14705): use UnifySchemas for some sanity checking
- schema_ = std::move(schema);
- }
-
- std::shared_ptr<arrow::Schema> schema() const override {
- DCHECK(schema_);
- return schema_;
- }
-
- Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override {
- constexpr int64_t kBatchSize = 1024;
- if (done_) {
- *batch = nullptr;
- return Status::OK();
- }
-
- std::vector<std::unique_ptr<arrow::ArrayBuilder>> builders(schema_->num_fields());
- for (int i = 0; static_cast<size_t>(i) < builders.size(); i++) {
- // TODO: allow overriding memory pool
- ARROW_RETURN_NOT_OK(arrow::MakeBuilder(arrow::default_memory_pool(),
- schema_->field(i)->type(), &builders[i]));
- }
-
- sqlite3* db = connection_->db();
-
- int64_t num_rows = 0;
- for (int64_t row = 0; row < kBatchSize; row++) {
- if (rc_ != SQLITE_DONE) {
- for (int col = 0; col < schema_->num_fields(); col++) {
- if (sqlite3_column_type(stmt_, col) == SQLITE_NULL) {
- ARROW_RETURN_NOT_OK(builders[col]->AppendNull());
- } else {
- const auto& field = schema_->field(col);
- switch (field->type()->id()) {
- case arrow::Type::DOUBLE: {
- const double value = sqlite3_column_double(stmt_, col);
- ARROW_RETURN_NOT_OK(
- dynamic_cast<arrow::DoubleBuilder*>(builders[col].get())
- ->Append(value));
- break;
- }
- case arrow::Type::INT64: {
- const sqlite3_int64 value = sqlite3_column_int64(stmt_, col);
- ARROW_RETURN_NOT_OK(
- dynamic_cast<arrow::Int64Builder*>(builders[col].get())
- ->Append(value));
- break;
- }
- case arrow::Type::NA: {
- ARROW_RETURN_NOT_OK(
- dynamic_cast<arrow::NullBuilder*>(builders[col].get())->AppendNull());
- break;
- }
- case arrow::Type::STRING: {
- const char* value =
- reinterpret_cast<const char*>(sqlite3_column_text(stmt_, col));
- const arrow::util::string_view view(value, std::strlen(value));
- ARROW_RETURN_NOT_OK(
- dynamic_cast<arrow::StringBuilder*>(builders[col].get())
- ->Append(value));
- break;
- }
- default:
- return Status::NotImplemented("[SQLite3] Cannot read field '",
- field->name(), "' of type ",
- field->type()->ToString());
- }
- }
- }
- num_rows++;
- }
-
- if (rc_ == SQLITE_ROW) {
- rc_ = sqlite3_step(stmt_);
- }
- if (rc_ == SQLITE_ROW) {
- continue;
- } else if (rc_ == SQLITE_DONE) {
- if (bind_parameters_ &&
- (!next_parameters_ || bind_index_ >= next_parameters_->num_rows())) {
- ARROW_RETURN_NOT_OK(bind_parameters_->ReadNext(&next_parameters_));
- bind_index_ = 0;
- }
-
- if (next_parameters_ && bind_index_ < next_parameters_->num_rows()) {
- rc_ = sqlite3_reset(stmt_);
- if (rc_ != SQLITE_OK) {
- return Status::IOError("[SQLite3] sqlite3_reset: ", sqlite3_errmsg(db));
- }
- struct AdbcError error;
- ARROW_RETURN_NOT_OK(ToArrowStatus(BindNext(&rc_, &error), &error));
- rc_ = sqlite3_step(stmt_);
- if (rc_ != SQLITE_ERROR) continue;
- }
- done_ = true;
- next_parameters_.reset();
- break;
- }
- return Status::IOError("[SQLite3] sqlite3_step: ", sqlite3_errmsg(db));
- }
-
- if (done_ && num_rows == 0) {
- *batch = nullptr;
- } else {
- arrow::ArrayVector arrays(builders.size());
- for (size_t i = 0; i < builders.size(); i++) {
- ARROW_RETURN_NOT_OK(builders[i]->Finish(&arrays[i]));
- }
- *batch = arrow::RecordBatch::Make(schema_, num_rows, std::move(arrays));
- }
- return Status::OK();
- }
-
- private:
- AdbcStatusCode BindNext(int* rc, struct AdbcError* error) {
- if (!next_parameters_ || bind_index_ >= next_parameters_->num_rows()) {
- return ADBC_STATUS_OK;
- }
- return BindParameters(stmt_, *next_parameters_, bind_index_++, rc, error);
- }
-
- std::shared_ptr<SqliteConnectionImpl> connection_;
- sqlite3_stmt* stmt_;
- std::shared_ptr<arrow::RecordBatchReader> bind_parameters_;
-
- std::shared_ptr<arrow::Schema> schema_;
- std::shared_ptr<arrow::RecordBatch> next_parameters_;
- int64_t bind_index_;
- bool done_;
- int rc_;
-};
-
-class SqliteStatementImpl {
- public:
- explicit SqliteStatementImpl(std::shared_ptr<SqliteConnectionImpl> connection)
- : connection_(std::move(connection)), append_(false), stmt_(nullptr) {}
-
- AdbcStatusCode Close(struct AdbcError* error) {
- if (stmt_) {
- const int rc = sqlite3_finalize(stmt_);
- stmt_ = nullptr;
- ADBC_RETURN_NOT_OK(
- CheckRc(connection_->db(), nullptr, rc, "sqlite3_finalize", error));
- }
- return ADBC_STATUS_OK;
- }
-
- AdbcStatusCode Bind(const std::shared_ptr<SqliteStatementImpl>& self,
- struct ArrowArray* values, struct ArrowSchema* schema,
- struct AdbcError* error) {
- std::shared_ptr<arrow::RecordBatch> batch;
- auto status = arrow::ImportRecordBatch(values, schema).Value(&batch);
- if (!status.ok()) {
- SetError(error, status);
- return ADBC_STATUS_INTERNAL;
- }
-
- std::shared_ptr<arrow::Table> table;
- status = arrow::Table::FromRecordBatches({std::move(batch)}).Value(&table);
- if (!status.ok()) {
- SetError(error, status);
- return ADBC_STATUS_INTERNAL;
- }
-
- bind_parameters_.reset(new arrow::TableBatchReader(std::move(table)));
- return ADBC_STATUS_OK;
- }
-
- AdbcStatusCode Bind(const std::shared_ptr<SqliteStatementImpl>& self,
- struct ArrowArrayStream* stream, struct AdbcError* error) {
- auto status = arrow::ImportRecordBatchReader(stream).Value(&bind_parameters_);
- if (!status.ok()) {
- SetError(error, status);
- return ADBC_STATUS_INTERNAL;
- }
- return ADBC_STATUS_OK;
- }
-
- AdbcStatusCode ExecuteQuery(const std::shared_ptr<SqliteStatementImpl>& self,
- struct ArrowArrayStream* out, int64_t* rows_affected,
- struct AdbcError* error) {
- if (!stmt_ && bulk_table_.empty()) {
- SetError(error, "Cannot execute a statement without a query");
- return ADBC_STATUS_INVALID_STATE;
- } else if (!bulk_table_.empty()) {
- if (out) {
- SetError(error, "Bulk ingestion does not generate a result set");
- return ADBC_STATUS_INVALID_ARGUMENT;
- }
- return ExecuteBulk(rows_affected, error);
- }
-
- sqlite3* db = connection_->db();
- int rc = SQLITE_OK;
- if (stmt_) {
- sqlite3_clear_bindings(stmt_);
- ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_clear_bindings", error));
- }
-
- rc = sqlite3_reset(stmt_);
- ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_reset", error));
- auto reader = std::make_shared<SqliteStatementReader>(connection_, stmt_,
- std::move(bind_parameters_));
- ADBC_RETURN_NOT_OK(reader->Init(error));
-
- if (out) {
- Status status = arrow::ExportRecordBatchReader(std::move(reader), out);
- ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
- } else {
- // XXX: the logic for the reader is heavily intertwined into everything
- // For now, just initialize and step it for the side effect
-
- while (true) {
- std::shared_ptr<arrow::RecordBatch> batch;
- ADBC_RETURN_NOT_OK(FromArrowStatus(reader->ReadNext(&batch), error));
- if (!batch) break;
- }
- ADBC_RETURN_NOT_OK(FromArrowStatus(reader->Close(), error));
- }
-
- if (rows_affected) *rows_affected = -1;
- return ADBC_STATUS_OK;
- }
-
- AdbcStatusCode Prepare(const std::shared_ptr<SqliteStatementImpl>& self,
- struct AdbcError* error) {
- if (stmt_) {
- // No-op
- return ADBC_STATUS_OK;
- } else if (!bulk_table_.empty()) {
- SetError(error, "Cannot prepare with bulk insert");
- return ADBC_STATUS_INVALID_STATE;
- }
- SetError(error, "Cannot prepare a statement without a query");
- return ADBC_STATUS_INVALID_STATE;
- }
-
- AdbcStatusCode GetParameterSchema(const std::shared_ptr<SqliteStatementImpl>& self,
- struct ArrowSchema* schema, struct AdbcError* error) {
- if (!stmt_) {
- SetError(error, "Cannot get parameter schema before preparing");
- return ADBC_STATUS_INVALID_STATE;
- }
-
- const int num_params = sqlite3_bind_parameter_count(stmt_);
- arrow::FieldVector fields(num_params);
- for (int i = 0; i < num_params; i++) {
- const char* name = sqlite3_bind_parameter_name(stmt_, i);
- fields[i] = arrow::field(name ? name : "", arrow::null());
- }
- ADBC_RETURN_NOT_OK(FromArrowStatus(
- arrow::ExportSchema(arrow::Schema(std::move(fields)), schema), error));
- return ADBC_STATUS_OK;
- }
-
- AdbcStatusCode SetOption(const std::shared_ptr<SqliteStatementImpl>& self,
- const char* key, const char* value, struct AdbcError* error) {
- if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
- // Bulk ingest
-
- // Clear previous statement, if any
- ADBC_RETURN_NOT_OK(Close(error));
-
- if (std::strlen(value) == 0) return ADBC_STATUS_INVALID_ARGUMENT;
- bulk_table_ = value;
- return ADBC_STATUS_OK;
- } else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
- if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_APPEND) == 0) {
- append_ = true;
- } else if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) {
- append_ = false;
- } else {
- SetError(error, "Unknown value '", value, "' for option: ", key);
- return ADBC_STATUS_NOT_IMPLEMENTED;
- }
- return ADBC_STATUS_OK;
- }
- SetError(error, "Unknown option: ", key);
- return ADBC_STATUS_NOT_IMPLEMENTED;
- }
-
- AdbcStatusCode SetSqlQuery(const std::shared_ptr<SqliteStatementImpl>& self,
- const char* query, struct AdbcError* error) {
- bulk_table_.clear();
- // Clear previous statement, if any
- ADBC_RETURN_NOT_OK(Close(error));
-
- sqlite3* db = connection_->db();
- int rc = sqlite3_prepare_v2(db, query, static_cast<int>(std::strlen(query)), &stmt_,
- /*pzTail=*/nullptr);
- return CheckRc(connection_->db(), stmt_, rc, "sqlite3_prepare_v2", error);
- }
-
- private:
- AdbcStatusCode ExecuteBulk(int64_t* rows_affected, struct AdbcError* error) {
- if (!bind_parameters_) {
- SetError(error, "Must AdbcStatementBind for bulk insertion");
- return ADBC_STATUS_INVALID_STATE;
- }
-
- sqlite3* db = connection_->db();
-
- // Create the table
- if (!append_) {
- // XXX: not injection-safe
- std::string query = "CREATE TABLE ";
- query += bulk_table_;
- query += " (";
- const auto& fields = bind_parameters_->schema()->fields();
- for (int i = 0; i < fields.size(); i++) {
- if (i > 0) query += ',';
- query += fields[i]->name();
- }
- query += ')';
-
- if (DoQuery(db, query.c_str(), error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
- const int rc = sqlite3_step(stmt);
- if (rc == SQLITE_DONE) return ADBC_STATUS_OK;
- return CheckRc(db, stmt, rc, "sqlite3_step", error);
- }) != ADBC_STATUS_OK) {
- return ADBC_STATUS_ALREADY_EXISTS;
- }
- }
-
- // Insert the rows
- {
- std::string query = "INSERT INTO ";
- query += bulk_table_;
- query += " VALUES (";
- const auto& fields = bind_parameters_->schema()->fields();
- for (int i = 0; i < fields.size(); i++) {
- if (i > 0) query += ',';
- query += '?';
- }
- query += ')';
-
- sqlite3_stmt* stmt;
- int rc = sqlite3_prepare_v2(db, query.c_str(), static_cast<int>(query.size()),
- &stmt, /*pzTail=*/nullptr);
- if (rc != SQLITE_OK) {
- // XXX: not a great way to try to figure out the right error
- AdbcStatusCode code = ADBC_STATUS_ALREADY_EXISTS;
- if (std::strstr(sqlite3_errmsg(db), "no such table:")) {
- code = ADBC_STATUS_NOT_FOUND;
- }
- // Clean up
- std::ignore = CheckRc(db, stmt, rc, "sqlite3_prepare_v2", error);
- return code;
- }
-
- ADBC_RETURN_NOT_OK(DoQuery(db, stmt, error, [&]() -> AdbcStatusCode {
- int rc = SQLITE_OK;
- int64_t rows = 0;
- while (true) {
- std::shared_ptr<arrow::RecordBatch> batch;
- ADBC_RETURN_NOT_OK(
- FromArrowStatus(bind_parameters_->Next().Value(&batch), error));
- if (!batch) break;
-
- for (int64_t row = 0; row < batch->num_rows(); row++) {
- ADBC_RETURN_NOT_OK(BindParameters(stmt, *batch, row, &rc, error));
- ADBC_RETURN_NOT_OK(CheckRc(db, stmt, rc, "sqlite3_bind", error));
-
- rc = sqlite3_step(stmt);
- if (rc != SQLITE_DONE) {
- return CheckRc(db, stmt, rc, "sqlite3_step", error);
- }
-
- rc = sqlite3_reset(stmt);
- ADBC_RETURN_NOT_OK(CheckRc(db, stmt, rc, "sqlite3_reset", error));
-
- rc = sqlite3_clear_bindings(stmt);
- ADBC_RETURN_NOT_OK(CheckRc(db, stmt, rc, "sqlite3_clear_bindings", error));
- }
- rows += batch->num_rows();
- }
- if (rows_affected) *rows_affected = rows;
- return ADBC_STATUS_OK;
- }));
- }
-
- bind_parameters_.reset();
- return ADBC_STATUS_OK;
- }
-
- std::shared_ptr<SqliteConnectionImpl> connection_;
-
- // Query state
-
- // Bulk ingestion
- // Target of bulk ingestion (rather janky to store state like this, though…)
- std::string bulk_table_;
- bool append_;
-
- // Prepared statements
- sqlite3_stmt* stmt_;
- std::shared_ptr<arrow::RecordBatchReader> bind_parameters_;
-};
-
-// ADBC interface implementation - as private functions so that these
-// don't get replaced by the dynamic linker. If we implemented these
-// under the Adbc* names, then DriverInit, the linker may resolve
-// functions to the address of the functions provided by the driver
-// manager instead of our functions.
-//
-// We could also:
-// - Play games with RTLD_DEEPBIND - but this doesn't work with ASan
-// - Use __attribute__((visibility("protected"))) - but this is
-// apparently poorly supported by some linkers
-// - Play with -Bsymbolic(-functions) - but this has other
-// consequences and complicates the build setup
-//
-// So in the end some manual effort here was chosen.
-
-AdbcStatusCode SqliteDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
- auto impl = std::make_shared<SqliteDatabaseImpl>();
- database->private_data = new std::shared_ptr<SqliteDatabaseImpl>(impl);
- return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode SqliteDatabaseInit(struct AdbcDatabase* database,
- struct AdbcError* error) {
- if (!database->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
- reinterpret_cast<std::shared_ptr<SqliteDatabaseImpl>*>(database->private_data);
- return (*ptr)->Init(error);
-}
-
-AdbcStatusCode SqliteDatabaseSetOption(struct AdbcDatabase* database, const char* key,
- const char* value, struct AdbcError* error) {
- if (!database || !database->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
- reinterpret_cast<std::shared_ptr<SqliteDatabaseImpl>*>(database->private_data);
- return (*ptr)->SetOption(key, value, error);
-}
-
-AdbcStatusCode SqliteDatabaseRelease(struct AdbcDatabase* database,
- struct AdbcError* error) {
- if (!database->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
- reinterpret_cast<std::shared_ptr<SqliteDatabaseImpl>*>(database->private_data);
- AdbcStatusCode status = (*ptr)->Release(error);
- delete ptr;
- database->private_data = nullptr;
- return status;
-}
-
-AdbcStatusCode SqliteConnectionCommit(struct AdbcConnection* connection,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
- reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
- return (*ptr)->Commit(error);
-}
-
-AdbcStatusCode SqliteConnectionGetInfo(struct AdbcConnection* connection,
- uint32_t* info_codes, size_t info_codes_length,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
- reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
- return (*ptr)->GetInfo(info_codes, info_codes_length, stream, error);
-}
-
-AdbcStatusCode SqliteConnectionGetObjects(
- struct AdbcConnection* connection, int depth, const char* catalog,
- const char* db_schema, const char* table_name, const char** table_types,
- const char* column_name, struct ArrowArrayStream* stream, struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
- reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
- return (*ptr)->GetObjects(depth, catalog, db_schema, table_name, table_types,
- column_name, stream, error);
-}
-
-AdbcStatusCode SqliteConnectionGetTableSchema(struct AdbcConnection* connection,
- const char* catalog, const char* db_schema,
- const char* table_name,
- struct ArrowSchema* schema,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
- reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
- return (*ptr)->GetTableSchema(catalog, db_schema, table_name, schema, error);
-}
-
-AdbcStatusCode SqliteConnectionGetTableTypes(struct AdbcConnection* connection,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
- reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
- return (*ptr)->GetTableTypes(stream, error);
-}
-
-AdbcStatusCode SqliteConnectionInit(struct AdbcConnection* connection,
- struct AdbcDatabase* database,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
- reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
- return (*ptr)->Init(database, error);
-}
-
-AdbcStatusCode SqliteConnectionNew(struct AdbcConnection* connection,
- struct AdbcError* error) {
- auto impl = std::make_shared<SqliteConnectionImpl>();
- connection->private_data = new std::shared_ptr<SqliteConnectionImpl>(impl);
- return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode SqliteConnectionReadPartition(struct AdbcConnection* connection,
- const uint8_t* serialized_partition,
- size_t serialized_length,
- struct ArrowArrayStream* out,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
- reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
- AdbcStatusCode status = (*ptr)->Release(error);
- delete ptr;
- connection->private_data = nullptr;
- return status;
-}
-
-AdbcStatusCode SqliteConnectionRollback(struct AdbcConnection* connection,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
- reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
- return (*ptr)->Rollback(error);
-}
-
-AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection,
- const char* key, const char* value,
- struct AdbcError* error) {
- if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
- auto ptr =
- reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
-
- if (std::strcmp(key, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
- bool autocommit = false;
- if (std::strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
- autocommit = false;
- } else if (std::strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
- autocommit = true;
- } else {
- SetError(error, "Invalid option value for autocommit: ", value);
- return ADBC_STATUS_INVALID_ARGUMENT;
- }
- return (*ptr)->SetAutocommit(autocommit, error);
- } else {
- SetError(error, "Unknown option");
- }
- return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode SqliteStatementBind(struct AdbcStatement* statement,
- struct ArrowArray* values, struct ArrowSchema* schema,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
- reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
- return (*ptr)->Bind(*ptr, values, schema, error);
-}
-
-AdbcStatusCode SqliteStatementBindStream(struct AdbcStatement* statement,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
- reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
- return (*ptr)->Bind(*ptr, stream, error);
-}
-
-AdbcStatusCode SqliteStatementExecutePartitions(struct AdbcStatement* statement,
- struct ArrowSchema* schema,
- struct AdbcPartitions* partitions,
- int64_t* rows_affected,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode SqliteStatementExecuteQuery(struct AdbcStatement* statement,
- struct ArrowArrayStream* out,
- int64_t* rows_affected,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
- reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
- return (*ptr)->ExecuteQuery(*ptr, out, rows_affected, error);
-}
-
-AdbcStatusCode SqliteStatementGetPartitionDesc(struct AdbcStatement* statement,
- size_t index, uint8_t* partition_desc,
- struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode SqliteStatementGetPartitionDescSize(struct AdbcStatement* statement,
- size_t index, size_t* length,
- struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
-}
-
-AdbcStatusCode SqliteStatementGetParameterSchema(struct AdbcStatement* statement,
- struct ArrowSchema* schema,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
- reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
- return (*ptr)->GetParameterSchema(*ptr, schema, error);
-}
-
-AdbcStatusCode SqliteStatementNew(struct AdbcConnection* connection,
- struct AdbcStatement* statement,
- struct AdbcError* error) {
- auto conn_ptr =
- reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
- auto impl = std::make_shared<SqliteStatementImpl>(*conn_ptr);
- statement->private_data = new std::shared_ptr<SqliteStatementImpl>(impl);
- return ADBC_STATUS_OK;
-}
-
-AdbcStatusCode SqliteStatementPrepare(struct AdbcStatement* statement,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
- reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
- return (*ptr)->Prepare(*ptr, error);
-}
-
-AdbcStatusCode SqliteStatementRelease(struct AdbcStatement* statement,
- struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
- reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
- auto status = (*ptr)->Close(error);
- delete ptr;
- statement->private_data = nullptr;
- return status;
-}
-
-AdbcStatusCode SqliteStatementSetOption(struct AdbcStatement* statement, const char* key,
- const char* value, struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
- reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
- return (*ptr)->SetOption(*ptr, key, value, error);
-}
-
-AdbcStatusCode SqliteStatementSetSqlQuery(struct AdbcStatement* statement,
- const char* query, struct AdbcError* error) {
- if (!statement->private_data) return ADBC_STATUS_INVALID_STATE;
- auto* ptr =
- reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
- return (*ptr)->SetSqlQuery(*ptr, query, error);
-}
-
-} // namespace
-
-AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) {
- return SqliteDatabaseInit(database, error);
-}
-
-AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
- return SqliteDatabaseNew(database, error);
-}
-
-AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char* key,
- const char* value, struct AdbcError* error) {
- return SqliteDatabaseSetOption(database, key, value, error);
-}
-
-AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
- struct AdbcError* error) {
- return SqliteDatabaseRelease(database, error);
-}
-
-AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
- struct AdbcError* error) {
- return SqliteConnectionCommit(connection, error);
-}
-
-AdbcStatusCode AdbcConnectionGetInfo(struct AdbcConnection* connection,
- uint32_t* info_codes, size_t info_codes_length,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- return SqliteConnectionGetInfo(connection, info_codes, info_codes_length, stream,
- error);
-}
-
-AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int depth,
- const char* catalog, const char* db_schema,
- const char* table_name, const char** table_types,
- const char* column_name,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- return SqliteConnectionGetObjects(connection, depth, catalog, db_schema, table_name,
- table_types, column_name, stream, error);
-}
-
-AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
- const char* catalog, const char* db_schema,
- const char* table_name,
- struct ArrowSchema* schema,
- struct AdbcError* error) {
- return SqliteConnectionGetTableSchema(connection, catalog, db_schema, table_name,
- schema, error);
-}
-
-AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- return SqliteConnectionGetTableTypes(connection, stream, error);
-}
-
-AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
- struct AdbcDatabase* database,
- struct AdbcError* error) {
- return SqliteConnectionInit(connection, database, error);
-}
-
-AdbcStatusCode AdbcConnectionNew(struct AdbcConnection* connection,
- struct AdbcError* error) {
- return SqliteConnectionNew(connection, error);
-}
-
-AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,
- const uint8_t* serialized_partition,
- size_t serialized_length,
- struct ArrowArrayStream* out,
- struct AdbcError* error) {
- return SqliteConnectionReadPartition(connection, serialized_partition,
- serialized_length, out, error);
-}
-
-AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
- struct AdbcError* error) {
- return SqliteConnectionRelease(connection, error);
-}
-
-AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
- struct AdbcError* error) {
- return SqliteConnectionRollback(connection, error);
-}
-
-AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
- const char* value, struct AdbcError* error) {
- return SqliteConnectionSetOption(connection, key, value, error);
-}
-
-AdbcStatusCode AdbcStatementBind(struct AdbcStatement* statement,
- struct ArrowArray* values, struct ArrowSchema* schema,
- struct AdbcError* error) {
- return SqliteStatementBind(statement, values, schema, error);
-}
-
-AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
- struct ArrowArrayStream* stream,
- struct AdbcError* error) {
- return SqliteStatementBindStream(statement, stream, error);
-}
-
-AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement,
- ArrowSchema* schema,
- struct AdbcPartitions* partitions,
- int64_t* rows_affected,
- struct AdbcError* error) {
- return SqliteStatementExecutePartitions(statement, schema, partitions, rows_affected,
- error);
-}
-
-AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
- struct ArrowArrayStream* out,
- int64_t* rows_affected,
- struct AdbcError* error) {
- return SqliteStatementExecuteQuery(statement, out, rows_affected, error);
-}
-
-AdbcStatusCode AdbcStatementGetPartitionDesc(struct AdbcStatement* statement,
- size_t index, uint8_t* partition_desc,
- struct AdbcError* error) {
- return SqliteStatementGetPartitionDesc(statement, index, partition_desc, error);
-}
-
-AdbcStatusCode AdbcStatementGetPartitionDescSize(struct AdbcStatement* statement,
- size_t index, size_t* length,
- struct AdbcError* error) {
- return SqliteStatementGetPartitionDescSize(statement, index, length, error);
-}
-
-AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement,
- struct ArrowSchema* schema,
- struct AdbcError* error) {
- return SqliteStatementGetParameterSchema(statement, schema, error);
-}
-
-AdbcStatusCode AdbcStatementNew(struct AdbcConnection* connection,
- struct AdbcStatement* statement,
- struct AdbcError* error) {
- return SqliteStatementNew(connection, statement, error);
-}
-
-AdbcStatusCode AdbcStatementPrepare(struct AdbcStatement* statement,
- struct AdbcError* error) {
- return SqliteStatementPrepare(statement, error);
-}
-
-AdbcStatusCode AdbcStatementRelease(struct AdbcStatement* statement,
- struct AdbcError* error) {
- return SqliteStatementRelease(statement, error);
-}
-
-AdbcStatusCode AdbcStatementSetOption(struct AdbcStatement* statement, const char* key,
- const char* value, struct AdbcError* error) {
- return SqliteStatementSetOption(statement, key, value, error);
-}
-
-AdbcStatusCode AdbcStatementSetSqlQuery(struct AdbcStatement* statement,
- const char* query, struct AdbcError* error) {
- return SqliteStatementSetSqlQuery(statement, query, error);
-}
-
-extern "C" {
-ADBC_EXPORT
-AdbcStatusCode AdbcDriverInit(int version, void* raw_driver, struct AdbcError* error) {
- if (version != ADBC_VERSION_1_0_0) return ADBC_STATUS_NOT_IMPLEMENTED;
-
- auto* driver = reinterpret_cast<struct AdbcDriver*>(raw_driver);
- std::memset(driver, 0, sizeof(*driver));
- driver->DatabaseInit = SqliteDatabaseInit;
- driver->DatabaseNew = SqliteDatabaseNew;
- driver->DatabaseRelease = SqliteDatabaseRelease;
- driver->DatabaseSetOption = SqliteDatabaseSetOption;
-
- driver->ConnectionCommit = SqliteConnectionCommit;
- driver->ConnectionGetInfo = SqliteConnectionGetInfo;
- driver->ConnectionGetObjects = SqliteConnectionGetObjects;
- driver->ConnectionGetTableSchema = SqliteConnectionGetTableSchema;
- driver->ConnectionGetTableTypes = SqliteConnectionGetTableTypes;
- driver->ConnectionInit = SqliteConnectionInit;
- driver->ConnectionNew = SqliteConnectionNew;
- driver->ConnectionReadPartition = SqliteConnectionReadPartition;
- driver->ConnectionRelease = SqliteConnectionRelease;
- driver->ConnectionRollback = SqliteConnectionRollback;
- driver->ConnectionSetOption = SqliteConnectionSetOption;
-
- driver->StatementBind = SqliteStatementBind;
- driver->StatementBindStream = SqliteStatementBindStream;
- driver->StatementExecutePartitions = SqliteStatementExecutePartitions;
- driver->StatementExecuteQuery = SqliteStatementExecuteQuery;
- driver->StatementGetParameterSchema = SqliteStatementGetParameterSchema;
- driver->StatementNew = SqliteStatementNew;
- driver->StatementPrepare = SqliteStatementPrepare;
- driver->StatementRelease = SqliteStatementRelease;
- driver->StatementSetOption = SqliteStatementSetOption;
- driver->StatementSetSqlQuery = SqliteStatementSetSqlQuery;
- return ADBC_STATUS_OK;
-}
-}
diff --git a/c/driver/sqlite/sqlite_test.cc b/c/driver/sqlite/sqlite_test.cc
index ce97e6b..c211170 100644
--- a/c/driver/sqlite/sqlite_test.cc
+++ b/c/driver/sqlite/sqlite_test.cc
@@ -15,225 +15,24 @@
// specific language governing permissions and limitations
// under the License.
+#include <cstring>
+#include <limits>
+#include <optional>
#include <string>
-#include <vector>
+#include <string_view>
-#include <gmock/gmock.h>
+#include <adbc.h>
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest-matchers.h>
+#include <gtest/gtest-param-test.h>
#include <gtest/gtest.h>
+#include <nanoarrow.h>
-#include <arrow/c/bridge.h>
-#include <arrow/record_batch.h>
-#include <arrow/table.h>
-#include <arrow/testing/matchers.h>
-
-#include "adbc.h"
-#include "driver/test_util.h"
+#include "statement_reader.h"
#include "validation/adbc_validation.h"
+#include "validation/adbc_validation_util.h"
-// Tests of the SQLite example driver
-
-namespace adbc {
-
-using arrow::PointeesEqual;
-
-using RecordBatchMatcher =
- decltype(::testing::UnorderedPointwise(PointeesEqual(), arrow::RecordBatchVector{}));
-
-RecordBatchMatcher BatchesAre(const std::shared_ptr<arrow::Schema>& schema,
- const std::vector<std::string>& batch_json) {
- arrow::RecordBatchVector batches;
- for (const std::string& json : batch_json) {
- batches.push_back(adbc::RecordBatchFromJSON(schema, json));
- }
- return ::testing::UnorderedPointwise(PointeesEqual(), std::move(batches));
-}
-
-class Sqlite : public ::testing::Test {
- public:
- void SetUp() override {
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseNew(&database, &error));
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcDatabaseSetOption(&database, "filename", ":memory:", &error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseInit(&database, &error));
- ASSERT_NE(database.private_data, nullptr);
-
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&connection, &error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection, &database, &error));
- ASSERT_NE(connection.private_data, nullptr);
- }
-
- void TearDown() override {
- if (error.message) {
- error.release(&error);
- }
-
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRelease(&connection, &error));
- ASSERT_EQ(connection.private_data, nullptr);
-
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseRelease(&database, &error));
- ASSERT_EQ(database.private_data, nullptr);
- }
-
- protected:
- void IngestSampleTable(struct AdbcConnection* connection) {
- ArrowArray export_table;
- ArrowSchema export_schema;
- auto bulk_table =
- adbc::RecordBatchFromJSON(bulk_schema, R"([[1, "foo"], [2, "bar"]])");
- ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
- ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
-
- AdbcStatement statement;
- std::memset(&statement, 0, sizeof(statement));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(connection, &statement, &error));
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
- "bulk_insert", &error));
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
- int64_t rows_affected = 0;
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error));
- ASSERT_EQ(bulk_table->num_rows(), rows_affected);
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
- }
-
- AdbcDatabase database;
- AdbcConnection connection;
- AdbcError error = {};
-
- std::shared_ptr<arrow::Schema> bulk_schema = arrow::schema(
- {arrow::field("ints", arrow::int64()), arrow::field("strs", arrow::utf8())});
-
- std::shared_ptr<arrow::DataType> column_schema = arrow::struct_({
- arrow::field("column_name", arrow::utf8(), /*nullable=*/false),
- arrow::field("ordinal_position", arrow::int32()),
- arrow::field("remarks", arrow::utf8()),
- arrow::field("xdbc_data_type", arrow::int16()),
- arrow::field("xdbc_type_name", arrow::utf8()),
- arrow::field("xdbc_column_size", arrow::int32()),
- arrow::field("xdbc_decimal_digits", arrow::int16()),
- arrow::field("xdbc_num_prec_radix", arrow::int16()),
- arrow::field("xdbc_nullable", arrow::int16()),
- arrow::field("xdbc_column_def", arrow::utf8()),
- arrow::field("xdbc_sql_data_type", arrow::int16()),
- arrow::field("xdbc_datetime_sub", arrow::int16()),
- arrow::field("xdbc_char_octet_length", arrow::int32()),
- arrow::field("xdbc_is_nullable", arrow::utf8()),
- arrow::field("xdbc_scope_catalog", arrow::utf8()),
- arrow::field("xdbc_scope_schema", arrow::utf8()),
- arrow::field("xdbc_scope_table", arrow::utf8()),
- arrow::field("xdbc_is_autoincrement", arrow::boolean()),
- arrow::field("xdbc_is_generatedcolumn", arrow::boolean()),
- });
- std::shared_ptr<arrow::DataType> usage_schema = arrow::struct_({
- arrow::field("fk_catalog", arrow::utf8()),
- arrow::field("fk_db_schema", arrow::utf8()),
- arrow::field("fk_table", arrow::utf8()),
- arrow::field("fk_column_name", arrow::utf8()),
- });
- std::shared_ptr<arrow::DataType> constraint_schema = arrow::struct_({
- arrow::field("constraint_name", arrow::utf8()),
- arrow::field("constraint_type", arrow::utf8(), /*nullable=*/false),
- arrow::field("constraint_column_names", arrow::list(arrow::utf8()),
- /*nullable=*/false),
- arrow::field("constraint_column_usage", arrow::list(usage_schema)),
- });
- std::shared_ptr<arrow::DataType> table_schema = arrow::struct_({
- arrow::field("table_name", arrow::utf8(), /*nullable=*/false),
- arrow::field("table_type", arrow::utf8(), /*nullable=*/false),
- arrow::field("table_columns", arrow::list(column_schema)),
- arrow::field("table_constraints", arrow::list(constraint_schema)),
- });
- std::shared_ptr<arrow::DataType> db_schema_schema = arrow::struct_({
- arrow::field("db_schema_name", arrow::utf8()),
- arrow::field("db_schema_tables", arrow::list(table_schema)),
- });
- std::shared_ptr<arrow::Schema> catalog_schema = arrow::schema({
- arrow::field("catalog_name", arrow::utf8()),
- arrow::field("catalog_db_schemas", arrow::list(db_schema_schema)),
- });
-};
-
-TEST_F(Sqlite, MetadataGetObjectsColumns) {
- {
- AdbcStatement statement;
- std::memset(&statement, 0, sizeof(statement));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
-
- ADBC_ASSERT_OK_WITH_ERROR(
- error,
- AdbcStatementSetSqlQuery(
- &statement, "CREATE TABLE parent (a, b, c, PRIMARY KEY(c, b))", &error));
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error));
-
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcStatementSetSqlQuery(&statement, "CREATE TABLE other (a)", &error));
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error));
-
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcStatementSetSqlQuery(
- &statement,
- "CREATE TABLE child (a, b, c, PRIMARY KEY(a), FOREIGN KEY (c, b) "
- "REFERENCES parent (c, b), FOREIGN KEY (a) REFERENCES other(a))",
- &error));
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error));
-
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
- }
-
- struct ArrowArrayStream stream;
- std::shared_ptr<arrow::Schema> schema;
- arrow::RecordBatchVector batches;
-
- ADBC_ASSERT_OK_WITH_ERROR(
- error,
- AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr,
- nullptr, nullptr, nullptr, &stream, &error));
- ASSERT_NO_FATAL_FAILURE(ReadStream(&stream, &schema, &batches));
- EXPECT_THAT(batches,
- BatchesAre(catalog_schema,
- {R"([["main", [{"db_schema_name": null, "db_schema_tables": [
- {
- "table_name": "child",
- "table_type": "table",
- "table_columns": [
- ["a", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
- ["b", 2, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
- ["c", 3, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
- ],
- "table_constraints": [
- [null, "PRIMARY KEY", ["a"], []],
- [null, "FOREIGN KEY", ["a"], [[null, null, "other", "a"]]],
- [null, "FOREIGN KEY", ["c", "b"], [[null, null, "parent", "c"], [null, null, "parent", "b"]]]
- ]
- },
- {
- "table_name": "other",
- "table_type": "table",
- "table_columns": [
- ["a", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
- ],
- "table_constraints": []
- },
- {
- "table_name": "parent",
- "table_type": "table",
- "table_columns": [
- ["a", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
- ["b", 2, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
- ["c", 3, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
- ],
- "table_constraints": [
- [null, "PRIMARY KEY", ["c", "b"], []]
- ]
- }
-]}]]])"}));
- batches.clear();
-}
+// -- ADBC Test Suite ------------------------------------------------
class SqliteQuirks : public adbc_validation::DriverQuirks {
public:
@@ -241,7 +40,7 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
struct AdbcError* error) const override {
// Shared DB required for transaction tests
return AdbcDatabaseSetOption(
- database, "filename", "file:Sqlite_Transactions?mode=memory&cache=shared", error);
+ database, "uri", "file:Sqlite_Transactions?mode=memory&cache=shared", error);
}
std::string BindParameter(int index) const override { return "?"; }
@@ -284,4 +83,430 @@ class SqliteStatementTest : public ::testing::Test,
};
ADBCV_TEST_STATEMENT(SqliteStatementTest)
-} // namespace adbc
+// -- SQLite Specific Tests ------------------------------------------
+
+constexpr size_t kInferRows = 16;
+
+using adbc_validation::CompareArray;
+using adbc_validation::Handle;
+using adbc_validation::IsOkErrno;
+using adbc_validation::IsOkStatus;
+
+/// Specific tests of the type-inferring reader
+class SqliteReaderTest : public ::testing::Test {
+ public:
+ void SetUp() override {
+ std::memset(&error, 0, sizeof(error));
+ std::memset(&binder, 0, sizeof(binder));
+ ASSERT_EQ(SQLITE_OK, sqlite3_open_v2(
+ ":memory:", &db,
+ SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
+ /*zVfs=*/nullptr));
+ }
+ void TearDown() override {
+ if (error.release) error.release(&error);
+ AdbcSqliteBinderRelease(&binder);
+ sqlite3_finalize(stmt);
+ sqlite3_close(db);
+ }
+
+ void Exec(const std::string& query) {
+ ASSERT_EQ(SQLITE_OK, sqlite3_prepare_v2(db, query.c_str(), query.size(), &stmt,
+ /*pzTail=*/nullptr));
+ ASSERT_EQ(SQLITE_DONE, sqlite3_step(stmt));
+ sqlite3_finalize(stmt);
+ stmt = nullptr;
+ }
+
+ void Bind(struct ArrowArray* batch, struct ArrowSchema* schema) {
+ ASSERT_THAT(AdbcSqliteBinderSetArray(&binder, batch, schema, &error),
+ IsOkStatus(&error));
+ }
+
+ void Bind(struct ArrowArrayStream* stream) {
+ ASSERT_THAT(AdbcSqliteBinderSetArrayStream(&binder, stream, &error),
+ IsOkStatus(&error));
+ }
+
+ void ExecSelect(const std::string& values, size_t infer_rows,
+ adbc_validation::StreamReader* reader) {
+ ASSERT_NO_FATAL_FAILURE(Exec("CREATE TABLE foo (col)"));
+ ASSERT_NO_FATAL_FAILURE(Exec("INSERT INTO foo VALUES " + values));
+ const std::string query = "SELECT * FROM foo";
+ ASSERT_NO_FATAL_FAILURE(Exec(query, infer_rows, reader));
+ ASSERT_EQ(1, reader->schema->n_children);
+ }
+
+ void Exec(const std::string& query, size_t infer_rows,
+ adbc_validation::StreamReader* reader) {
+ ASSERT_EQ(SQLITE_OK, sqlite3_prepare_v2(db, query.c_str(), query.size(), &stmt,
+ /*pzTail=*/nullptr));
+ struct AdbcSqliteBinder* binder =
+ this->binder.schema.release ? &this->binder : nullptr;
+ ASSERT_THAT(AdbcSqliteExportReader(db, stmt, binder, infer_rows,
+ &reader->stream.value, &error),
+ IsOkStatus(&error));
+ ASSERT_NO_FATAL_FAILURE(reader->GetSchema());
+ }
+
+ protected:
+ sqlite3* db = nullptr;
+ sqlite3_stmt* stmt = nullptr;
+ struct AdbcError error;
+ struct AdbcSqliteBinder binder;
+};
+
+TEST_F(SqliteReaderTest, IntsNulls) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(ExecSelect("(NULL), (1), (NULL), (-1)", kInferRows, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>(reader.array_view->children[0],
+ {std::nullopt, 1, std::nullopt, -1}));
+}
+
+TEST_F(SqliteReaderTest, FloatsNulls) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(
+ ExecSelect("(NULL), (1.0), (NULL), (-1.0), (0.0)", kInferRows, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_DOUBLE, reader.fields[0].data_type);
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(CompareArray<double>(
+ reader.array_view->children[0], {std::nullopt, 1.0, std::nullopt, -1.0, 0.0}));
+}
+
+TEST_F(SqliteReaderTest, IntsFloatsNulls) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(
+ ExecSelect("(NULL), (1), (NULL), (-1.0), (0)", kInferRows, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_DOUBLE, reader.fields[0].data_type);
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(CompareArray<double>(
+ reader.array_view->children[0], {std::nullopt, 1.0, std::nullopt, -1.0, 0.0}));
+}
+
+TEST_F(SqliteReaderTest, IntsNullsStrsNullsInts) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(ExecSelect(
+ R"((NULL), (1), (NULL), (-1), ("foo"), (NULL), (""), (24))", kInferRows, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_STRING, reader.fields[0].data_type);
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(CompareArray<std::string>(
+ reader.array_view->children[0],
+ {std::nullopt, "1", std::nullopt, "-1", "foo", std::nullopt, "", "24"}));
+}
+
+TEST_F(SqliteReaderTest, IntExtremes) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(
+ ExecSelect(R"((NULL), (9223372036854775807), (NULL), (-9223372036854775808))",
+ kInferRows, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<int64_t>(reader.array_view->children[0],
+ {std::nullopt, std::numeric_limits<int64_t>::max(),
+ std::nullopt, std::numeric_limits<int64_t>::min()}));
+}
+
+TEST_F(SqliteReaderTest, IntExtremesStrs) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(ExecSelect(
+ R"((NULL), (9223372036854775807), (-9223372036854775808), (""), (9223372036854775807), (-9223372036854775808))",
+ kInferRows, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_STRING, reader.fields[0].data_type);
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(CompareArray<std::string>(reader.array_view->children[0],
+ {
+ std::nullopt,
+ "9223372036854775807",
+ "-9223372036854775808",
+ "",
+ "9223372036854775807",
+ "-9223372036854775808",
+ }));
+}
+
+TEST_F(SqliteReaderTest, FloatExtremes) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(
+ ExecSelect(R"((NULL), (9e999), (NULL), (-9e999))", kInferRows, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_DOUBLE, reader.fields[0].data_type);
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(CompareArray<double>(
+ reader.array_view->children[0], {
+ std::nullopt,
+ std::numeric_limits<double>::infinity(),
+ std::nullopt,
+ -std::numeric_limits<double>::infinity(),
+ }));
+}
+
+TEST_F(SqliteReaderTest, IntsFloatsStrs) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(
+ ExecSelect(R"((1), (1.0), (""), (9e999), (-9e999))", kInferRows, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_STRING, reader.fields[0].data_type);
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<std::string>(reader.array_view->children[0],
+ {"1.000000e+00", "1.000000e+00", "", "inf", "-inf"}));
+}
+
+TEST_F(SqliteReaderTest, InferIntReadInt) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(
+ ExecSelect(R"((1), (NULL), (2), (NULL))", /*infer_rows=*/2, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<int64_t>(reader.array_view->children[0], {1, std::nullopt}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<int64_t>(reader.array_view->children[0], {2, std::nullopt}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_EQ(nullptr, reader.array->release);
+}
+
+TEST_F(SqliteReaderTest, InferIntRejectFloat) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(
+ ExecSelect(R"((1), (NULL), (2E0), (NULL))", /*infer_rows=*/2, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<int64_t>(reader.array_view->children[0], {1, std::nullopt}));
+
+ ASSERT_THAT(reader.MaybeNext(), ::testing::Not(IsOkErrno()));
+ ASSERT_THAT(reader.stream->get_last_error(&reader.stream.value),
+ ::testing::HasSubstr(
+ "[SQLite] Type mismatch in column 0: expected INT64 but got DOUBLE"));
+}
+
+TEST_F(SqliteReaderTest, InferIntRejectStr) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(
+ ExecSelect(R"((1), (NULL), (""), (NULL))", /*infer_rows=*/2, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<int64_t>(reader.array_view->children[0], {1, std::nullopt}));
+
+ ASSERT_THAT(reader.MaybeNext(), ::testing::Not(IsOkErrno()));
+ ASSERT_THAT(
+ reader.stream->get_last_error(&reader.stream.value),
+ ::testing::HasSubstr(
+ "[SQLite] Type mismatch in column 0: expected INT64 but got STRING/BINARY"));
+}
+
+TEST_F(SqliteReaderTest, InferFloatReadIntFloat) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(
+ ExecSelect(R"((1E0), (NULL), (2E0), (3), (NULL))", /*infer_rows=*/2, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_DOUBLE, reader.fields[0].data_type);
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<double>(reader.array_view->children[0], {1.0, std::nullopt}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<double>(reader.array_view->children[0], {2.0, 3.0}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<double>(reader.array_view->children[0], {std::nullopt}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_EQ(nullptr, reader.array->release);
+}
+
+TEST_F(SqliteReaderTest, InferFloatRejectStr) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(ExecSelect(R"((1E0), (NULL), (2E0), (3), (""), (NULL))",
+ /*infer_rows=*/2, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_DOUBLE, reader.fields[0].data_type);
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<double>(reader.array_view->children[0], {1.0, std::nullopt}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<double>(reader.array_view->children[0], {2.0, 3.0}));
+
+ ASSERT_THAT(reader.MaybeNext(), ::testing::Not(IsOkErrno()));
+ ASSERT_THAT(
+ reader.stream->get_last_error(&reader.stream.value),
+ ::testing::HasSubstr(
+ "[SQLite] Type mismatch in column 0: expected DOUBLE but got STRING/BINARY"));
+}
+
+TEST_F(SqliteReaderTest, InferStrReadAll) {
+ adbc_validation::StreamReader reader;
+ ASSERT_NO_FATAL_FAILURE(ExecSelect(R"((""), (NULL), (2), (3E0), ("foo"), (NULL))",
+ /*infer_rows=*/2, &reader));
+ ASSERT_EQ(NANOARROW_TYPE_STRING, reader.fields[0].data_type);
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<std::string>(reader.array_view->children[0], {"", std::nullopt}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<std::string>(reader.array_view->children[0], {"2", "3.0"}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<std::string>(reader.array_view->children[0], {"foo", std::nullopt}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_EQ(nullptr, reader.array->release);
+}
+
+TEST_F(SqliteReaderTest, InferOneParam) {
+ adbc_validation::StreamReader reader;
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> batch;
+
+ ASSERT_THAT(adbc_validation::MakeSchema(&schema.value, {{"", NANOARROW_TYPE_INT64}}),
+ IsOkErrno());
+ ASSERT_THAT(
+ adbc_validation::MakeBatch<int64_t>(&schema.value, &batch.value, /*error=*/nullptr,
+ {std::nullopt, 2, 4, -1}),
+ IsOkErrno());
+
+ ASSERT_NO_FATAL_FAILURE(Bind(&batch.value, &schema.value));
+ ASSERT_NO_FATAL_FAILURE(Exec("SELECT ?", /*infer_rows=*/2, &reader));
+
+ ASSERT_EQ(1, reader.schema->n_children);
+ ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<int64_t>(reader.array_view->children[0], {std::nullopt, 2}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>(reader.array_view->children[0], {4, -1}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_EQ(nullptr, reader.array->release);
+}
+
+TEST_F(SqliteReaderTest, InferOneParamStream) {
+ adbc_validation::StreamReader reader;
+ Handle<struct ArrowArrayStream> stream;
+ Handle<struct ArrowSchema> schema;
+ std::vector<struct ArrowArray> batches(3);
+
+ ASSERT_THAT(adbc_validation::MakeSchema(&schema.value, {{"", NANOARROW_TYPE_INT64}}),
+ IsOkErrno());
+ ASSERT_THAT(adbc_validation::MakeBatch<int64_t>(&schema.value, &batches[0],
+ /*error=*/nullptr, {std::nullopt, 1}),
+ IsOkErrno());
+ ASSERT_THAT(adbc_validation::MakeBatch<int64_t>(&schema.value, &batches[1],
+ /*error=*/nullptr, {2, 3}),
+ IsOkErrno());
+ ASSERT_THAT(adbc_validation::MakeBatch<int64_t>(&schema.value, &batches[2],
+ /*error=*/nullptr, {4, std::nullopt}),
+ IsOkErrno());
+ adbc_validation::MakeStream(&stream.value, &schema.value, std::move(batches));
+
+ ASSERT_NO_FATAL_FAILURE(Bind(&stream.value));
+ ASSERT_NO_FATAL_FAILURE(Exec("SELECT ?", /*infer_rows=*/3, &reader));
+
+ ASSERT_EQ(1, reader.schema->n_children);
+ ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<int64_t>(reader.array_view->children[0], {std::nullopt, 1, 2}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<int64_t>(reader.array_view->children[0], {3, 4, std::nullopt}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_EQ(nullptr, reader.array->release);
+}
+
+TEST_F(SqliteReaderTest, InferTypedParams) {
+ adbc_validation::StreamReader reader;
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> batch;
+
+ ASSERT_NO_FATAL_FAILURE(Exec("CREATE TABLE foo (idx, value)"));
+ ASSERT_NO_FATAL_FAILURE(
+ Exec(R"(INSERT INTO foo VALUES (0, "foo"), (1, NULL), (2, 4), (3, 1E2))"));
+
+ ASSERT_THAT(adbc_validation::MakeSchema(&schema.value, {{"", NANOARROW_TYPE_INT64}}),
+ IsOkErrno());
+ ASSERT_THAT(adbc_validation::MakeBatch<int64_t>(&schema.value, &batch.value,
+ /*error=*/nullptr, {1, 2, 3, 0}),
+ IsOkErrno());
+
+ ASSERT_NO_FATAL_FAILURE(Bind(&batch.value, &schema.value));
+ ASSERT_NO_FATAL_FAILURE(
+ Exec("SELECT value FROM foo WHERE idx = ?", /*infer_rows=*/2, &reader));
+ ASSERT_EQ(1, reader.schema->n_children);
+ ASSERT_EQ(NANOARROW_TYPE_INT64, reader.fields[0].data_type);
+
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<int64_t>(reader.array_view->children[0], {std::nullopt, 4}));
+ ASSERT_THAT(reader.MaybeNext(), ::testing::Not(IsOkErrno()));
+ ASSERT_THAT(reader.stream->get_last_error(&reader.stream.value),
+ ::testing::HasSubstr(
+ "[SQLite] Type mismatch in column 0: expected INT64 but got DOUBLE"));
+}
+
+template <typename CType>
+class SqliteNumericParamTest : public SqliteReaderTest,
+ public ::testing::WithParamInterface<ArrowType> {
+ public:
+ void Test(ArrowType expected_type) {
+ adbc_validation::StreamReader reader;
+ Handle<struct ArrowSchema> schema;
+ Handle<struct ArrowArray> batch;
+
+ ASSERT_THAT(adbc_validation::MakeSchema(&schema.value, {{"", GetParam()}}),
+ IsOkErrno());
+ ASSERT_THAT(adbc_validation::MakeBatch<CType>(&schema.value, &batch.value,
+ /*error=*/nullptr,
+ {std::nullopt, 0, 1, 2, 4, 8}),
+ IsOkErrno());
+
+ ASSERT_NO_FATAL_FAILURE(Bind(&batch.value, &schema.value));
+ ASSERT_NO_FATAL_FAILURE(Exec("SELECT ?", /*infer_rows=*/2, &reader));
+
+ ASSERT_EQ(1, reader.schema->n_children);
+ ASSERT_EQ(expected_type, reader.fields[0].data_type);
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(
+ CompareArray<CType>(reader.array_view->children[0], {std::nullopt, 0}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(CompareArray<CType>(reader.array_view->children[0], {1, 2}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_NO_FATAL_FAILURE(CompareArray<CType>(reader.array_view->children[0], {4, 8}));
+ ASSERT_NO_FATAL_FAILURE(reader.Next());
+ ASSERT_EQ(nullptr, reader.array->release);
+ }
+};
+
+class SqliteIntParamTest : public SqliteNumericParamTest<int64_t> {};
+
+TEST_P(SqliteIntParamTest, BindInt) {
+ ASSERT_NO_FATAL_FAILURE(Test(NANOARROW_TYPE_INT64));
+}
+
+INSTANTIATE_TEST_SUITE_P(IntTypes, SqliteIntParamTest,
+ ::testing::Values(NANOARROW_TYPE_UINT8, NANOARROW_TYPE_UINT16,
+ NANOARROW_TYPE_UINT32, NANOARROW_TYPE_UINT64,
+ NANOARROW_TYPE_INT8, NANOARROW_TYPE_INT16,
+ NANOARROW_TYPE_INT32, NANOARROW_TYPE_INT64));
+
+class SqliteFloatParamTest : public SqliteNumericParamTest<double> {};
+
+TEST_P(SqliteFloatParamTest, BindFloat) {
+ ASSERT_NO_FATAL_FAILURE(Test(NANOARROW_TYPE_DOUBLE));
+}
+
+INSTANTIATE_TEST_SUITE_P(FloatTypes, SqliteFloatParamTest,
+ ::testing::Values(
+ // XXX: AppendDouble currently doesn't really work with
+ // floats (FLT_MIN/FLT_MAX isn't the right thing)
+
+ // NANOARROW_TYPE_FLOAT,
+ NANOARROW_TYPE_DOUBLE));
diff --git a/c/driver/sqlite/statement_reader.c b/c/driver/sqlite/statement_reader.c
new file mode 100644
index 0000000..9f8813d
--- /dev/null
+++ b/c/driver/sqlite/statement_reader.c
@@ -0,0 +1,896 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "statement_reader.h"
+
+#include <inttypes.h>
+#include <math.h>
+#include <stdio.h>
+
+#include <adbc.h>
+#include <nanoarrow.h>
+#include <sqlite3.h>
+
+#include "utils.h"
+
+AdbcStatusCode AdbcSqliteBinderSet(struct AdbcSqliteBinder* binder,
+ struct AdbcError* error) {
+ int status = binder->params.get_schema(&binder->params, &binder->schema);
+ if (status != 0) {
+ const char* message = binder->params.get_last_error(&binder->params);
+ if (!message) message = "(unknown error)";
+ SetError(error, "Failed to get parameter schema: (%d) %s: %s", status,
+ strerror(status), message);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+
+ struct ArrowError arrow_error = {0};
+ status = ArrowArrayViewInitFromSchema(&binder->batch, &binder->schema, &arrow_error);
+ if (status != 0) {
+ SetError(error, "Failed to initialize array view: (%d) %s: %s", status,
+ strerror(status), arrow_error.message);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+
+ if (binder->batch.storage_type != NANOARROW_TYPE_STRUCT) {
+ SetError(error, "Bind parameters do not have root type STRUCT");
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+
+ binder->types =
+ (enum ArrowType*)malloc(binder->schema.n_children * sizeof(enum ArrowType));
+
+ struct ArrowSchemaView view = {0};
+ for (int i = 0; i < binder->schema.n_children; i++) {
+ status = ArrowSchemaViewInit(&view, binder->schema.children[i], &arrow_error);
+ if (status != 0) {
+ SetError(error, "Failed to parse schema for column %d: %s (%d): %s", i,
+ strerror(status), status, arrow_error.message);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+
+ if (view.data_type == NANOARROW_TYPE_UNINITIALIZED) {
+ SetError(error, "Column %d has UNINITIALIZED type", i);
+ return ADBC_STATUS_INTERNAL;
+ }
+ binder->types[i] = view.data_type;
+ }
+
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode AdbcSqliteBinderSetArray(struct AdbcSqliteBinder* binder,
+ struct ArrowArray* values,
+ struct ArrowSchema* schema,
+ struct AdbcError* error) {
+ AdbcSqliteBinderRelease(binder);
+ RAISE_ADBC(BatchToArrayStream(values, schema, &binder->params, error));
+ return AdbcSqliteBinderSet(binder, error);
+} // NOLINT(whitespace/indent)
+AdbcStatusCode AdbcSqliteBinderSetArrayStream(struct AdbcSqliteBinder* binder,
+ struct ArrowArrayStream* values,
+ struct AdbcError* error) {
+ AdbcSqliteBinderRelease(binder);
+ binder->params = *values;
+ memset(values, 0, sizeof(*values));
+ return AdbcSqliteBinderSet(binder, error);
+}
+AdbcStatusCode AdbcSqliteBinderBindNext(struct AdbcSqliteBinder* binder, sqlite3* conn,
+ sqlite3_stmt* stmt, char* finished,
+ struct AdbcError* error) {
+ struct ArrowError arrow_error = {0};
+ int status = 0;
+ while (!binder->array.release || binder->next_row >= binder->array.length) {
+ if (binder->array.release) {
+ ArrowArrayViewReset(&binder->batch);
+ binder->array.release(&binder->array);
+
+ status =
+ ArrowArrayViewInitFromSchema(&binder->batch, &binder->schema, &arrow_error);
+ if (status != 0) {
+ SetError(error, "Failed to initialize array view: (%d) %s: %s", status,
+ strerror(status), arrow_error.message);
+ return ADBC_STATUS_INTERNAL;
+ }
+ }
+
+ status = binder->params.get_next(&binder->params, &binder->array);
+ if (status != 0) {
+ const char* message = binder->params.get_last_error(&binder->params);
+ if (!message) message = "(unknown error)";
+ SetError(error, "Failed to get next parameter batch: (%d) %s: %s", status,
+ strerror(status), message);
+ return ADBC_STATUS_IO;
+ }
+
+ if (!binder->array.release) {
+ *finished = 1;
+ AdbcSqliteBinderRelease(binder);
+ return ADBC_STATUS_OK;
+ }
+
+ status = ArrowArrayViewSetArray(&binder->batch, &binder->array, &arrow_error);
+ if (status != 0) {
+ SetError(error, "Failed to initialize array view: (%d) %s: %s", status,
+ strerror(status), arrow_error.message);
+ return ADBC_STATUS_INTERNAL;
+ }
+
+ binder->next_row = 0;
+ }
+
+ if (sqlite3_reset(stmt) != SQLITE_OK) {
+ SetError(error, "Failed to reset statement: %s", sqlite3_errmsg(conn));
+ return ADBC_STATUS_INTERNAL;
+ }
+ if (sqlite3_clear_bindings(stmt) != SQLITE_OK) {
+ SetError(error, "Failed to clear statement bindings: %s", sqlite3_errmsg(conn));
+ return ADBC_STATUS_INTERNAL;
+ }
+
+ for (int col = 0; col < binder->schema.n_children; col++) {
+ if (ArrowArrayViewIsNull(binder->batch.children[col], binder->next_row)) {
+ status = sqlite3_bind_null(stmt, col + 1);
+ } else {
+ switch (binder->types[col]) {
+ case NANOARROW_TYPE_BINARY:
+ case NANOARROW_TYPE_LARGE_BINARY: {
+ struct ArrowBufferView value =
+ ArrowArrayViewGetBytesUnsafe(binder->batch.children[col], binder->next_row);
+ status = sqlite3_bind_text(stmt, col + 1, value.data.as_char, value.n_bytes,
+ SQLITE_STATIC);
+ break;
+ }
+ case NANOARROW_TYPE_UINT8:
+ case NANOARROW_TYPE_UINT16:
+ case NANOARROW_TYPE_UINT32:
+ case NANOARROW_TYPE_UINT64: {
+ uint64_t value =
+ ArrowArrayViewGetUIntUnsafe(binder->batch.children[col], binder->next_row);
+ if (value > INT64_MAX) {
+ SetError(error,
+ "Column %d has unsigned integer value %" PRIu64
+ "out of range of int64_t",
+ col, value);
+ return ADBC_STATUS_INVALID_ARGUMENT;
+ }
+ status = sqlite3_bind_int64(stmt, col + 1, (int64_t)value);
+ break;
+ }
+ case NANOARROW_TYPE_INT8:
+ case NANOARROW_TYPE_INT16:
+ case NANOARROW_TYPE_INT32:
+ case NANOARROW_TYPE_INT64: {
+ int64_t value =
+ ArrowArrayViewGetIntUnsafe(binder->batch.children[col], binder->next_row);
+ status = sqlite3_bind_int64(stmt, col + 1, value);
+ break;
+ }
+ case NANOARROW_TYPE_FLOAT:
+ case NANOARROW_TYPE_DOUBLE: {
+ int64_t value = ArrowArrayViewGetDoubleUnsafe(binder->batch.children[col],
+ binder->next_row);
+ status = sqlite3_bind_double(stmt, col + 1, value);
+ break;
+ }
+ case NANOARROW_TYPE_STRING:
+ case NANOARROW_TYPE_LARGE_STRING: {
+ struct ArrowBufferView value =
+ ArrowArrayViewGetBytesUnsafe(binder->batch.children[col], binder->next_row);
+ status = sqlite3_bind_text(stmt, col + 1, value.data.as_char, value.n_bytes,
+ SQLITE_STATIC);
+ break;
+ }
+ default:
+ SetError(error, "Column %d has unsupported type %s", col,
+ ArrowTypeString(binder->types[col]));
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+ }
+ }
+
+ if (status != SQLITE_OK) {
+ SetError(error, "Failed to clear statement bindings: %s", sqlite3_errmsg(conn));
+ return ADBC_STATUS_INTERNAL;
+ }
+ }
+
+ binder->next_row++;
+ *finished = 0;
+ return ADBC_STATUS_OK;
+}
+
+void AdbcSqliteBinderRelease(struct AdbcSqliteBinder* binder) {
+ if (binder->schema.release) {
+ binder->schema.release(&binder->schema);
+ }
+ if (binder->params.release) {
+ binder->params.release(&binder->params);
+ }
+ if (binder->types) {
+ free(binder->types);
+ }
+ if (binder->array.release) {
+ binder->array.release(&binder->array);
+ }
+ ArrowArrayViewReset(&binder->batch);
+ memset(binder, 0, sizeof(*binder));
+}
+
+struct StatementReader {
+ sqlite3* db;
+ sqlite3_stmt* stmt;
+ enum ArrowType* types;
+ struct ArrowSchema schema;
+ struct ArrowArray initial_batch;
+ struct AdbcSqliteBinder* binder;
+ struct ArrowError error;
+ char done;
+ int batch_size;
+};
+
+const char* StatementReaderGetLastError(struct ArrowArrayStream* self) {
+ if (!self->release || !self->private_data) {
+ return NULL;
+ }
+
+ struct StatementReader* reader = (struct StatementReader*)self->private_data;
+ return reader->error.message;
+}
+
+void StatementReaderSetError(struct StatementReader* reader) {
+ const char* msg = sqlite3_errmsg(reader->db);
+ strncpy(reader->error.message, msg, sizeof(reader->error.message));
+ reader->error.message[sizeof(reader->error.message) - 1] = '\0';
+}
+
+int StatementReaderGetOneValue(struct StatementReader* reader, int col,
+ struct ArrowArray* out) {
+ int sqlite_type = sqlite3_column_type(reader->stmt, col);
+
+ if (sqlite_type == SQLITE_NULL) {
+ return ArrowArrayAppendNull(out, 1);
+ }
+
+ switch (reader->types[col]) {
+ case NANOARROW_TYPE_INT64: {
+ switch (sqlite_type) {
+ case SQLITE_INTEGER: {
+ int64_t value = sqlite3_column_int64(reader->stmt, col);
+ return ArrowArrayAppendInt(out, value);
+ }
+ case SQLITE_FLOAT: {
+ // TODO: behavior needs to be configurable
+ snprintf(reader->error.message, sizeof(reader->error.message),
+ "[SQLite] Type mismatch in column %d: expected INT64 but got DOUBLE",
+ col);
+ return EIO;
+ }
+ case SQLITE_TEXT:
+ case SQLITE_BLOB: {
+ snprintf(
+ reader->error.message, sizeof(reader->error.message),
+ "[SQLite] Type mismatch in column %d: expected INT64 but got STRING/BINARY",
+ col);
+ return EIO;
+ }
+ default: {
+ snprintf(reader->error.message, sizeof(reader->error.message),
+ "[SQLite] Type mismatch in column %d: expected INT64 but got unknown "
+ "type %d",
+ col, sqlite_type);
+ return ENOTSUP;
+ }
+ }
+ break;
+ }
+
+ case NANOARROW_TYPE_DOUBLE: {
+ switch (sqlite_type) {
+ case SQLITE_INTEGER:
+ case SQLITE_FLOAT: {
+ // Let SQLite convert
+ double value = sqlite3_column_double(reader->stmt, col);
+ return ArrowArrayAppendDouble(out, value);
+ }
+ case SQLITE_TEXT:
+ case SQLITE_BLOB: {
+ snprintf(reader->error.message, sizeof(reader->error.message),
+ "[SQLite] Type mismatch in column %d: expected DOUBLE but got "
+ "STRING/BINARY",
+ col);
+ return EIO;
+ }
+ default: {
+ snprintf(reader->error.message, sizeof(reader->error.message),
+ "[SQLite] Type mismatch in column %d: expected DOUBLE but got unknown "
+ "type %d",
+ col, sqlite_type);
+ return ENOTSUP;
+ }
+ }
+ break;
+ }
+
+ case NANOARROW_TYPE_STRING: {
+ switch (sqlite_type) {
+ case SQLITE_INTEGER:
+ case SQLITE_FLOAT:
+ case SQLITE_TEXT:
+ case SQLITE_BLOB: {
+ // Let SQLite convert
+ struct ArrowStringView value = {
+ .data = (const char*)sqlite3_column_text(reader->stmt, col),
+ .n_bytes = sqlite3_column_bytes(reader->stmt, col),
+ };
+ return ArrowArrayAppendString(out, value);
+ }
+ default: {
+ snprintf(reader->error.message, sizeof(reader->error.message),
+ "[SQLite] Type mismatch in column %d: expected STRING but got unknown "
+ "type %d",
+ col, sqlite_type);
+ return ENOTSUP;
+ }
+ }
+ break;
+ }
+
+ default: {
+ snprintf(reader->error.message, sizeof(reader->error.message),
+ "[SQLite] Internal error: unknown inferred column type %d",
+ reader->types[col]);
+ return ENOTSUP;
+ }
+ }
+
+ return 0;
+}
+
+int StatementReaderGetNext(struct ArrowArrayStream* self, struct ArrowArray* out) {
+ if (!self->release || !self->private_data) {
+ return EINVAL;
+ }
+
+ struct StatementReader* reader = (struct StatementReader*)self->private_data;
+ if (reader->initial_batch.release != NULL) {
+ memcpy(out, &reader->initial_batch, sizeof(*out));
+ memset(&reader->initial_batch, 0, sizeof(reader->initial_batch));
+ return 0;
+ } else if (reader->done) {
+ out->release = NULL;
+ return 0;
+ }
+
+ RAISE_NA(ArrowArrayInitFromSchema(out, &reader->schema, &reader->error));
+ RAISE_NA(ArrowArrayStartAppending(out));
+ int64_t batch_size = 0;
+ int status = 0;
+
+ sqlite3_mutex_enter(sqlite3_db_mutex(reader->db));
+ while (batch_size < reader->batch_size) {
+ if (reader->binder) {
+ char finished = 0;
+ struct AdbcError error = {0};
+ AdbcStatusCode status = AdbcSqliteBinderBindNext(reader->binder, reader->db,
+ reader->stmt, &finished, &error);
+ if (status != ADBC_STATUS_OK) {
+ reader->done = 1;
+ status = EIO;
+ if (error.release) {
+ strncpy(reader->error.message, error.message, sizeof(reader->error.message));
+ reader->error.message[sizeof(reader->error.message) - 1] = '\0';
+ error.release(&error);
+ }
+ break;
+ } else if (finished) {
+ reader->done = 1;
+ break;
+ }
+ }
+
+ int rc = sqlite3_step(reader->stmt);
+ if (rc == SQLITE_DONE) {
+ reader->done = 1;
+ break;
+ } else if (rc == SQLITE_ERROR) {
+ reader->done = 1;
+ status = EIO;
+ StatementReaderSetError(reader);
+ break;
+ }
+
+ for (int col = 0; col < reader->schema.n_children; col++) {
+ status = StatementReaderGetOneValue(reader, col, out->children[col]);
+ if (status != 0) break;
+ }
+
+ if (status != 0) break;
+ batch_size++;
+ }
+ if (status == 0) {
+ out->length = batch_size;
+ for (int i = 0; i < reader->schema.n_children; i++) {
+ status = ArrowArrayFinishBuilding(out->children[i], &reader->error);
+ if (status != 0) break;
+ }
+
+ // If we didn't read any rows, the reader is exhausted - don't generate a spurious
+ // batch
+ if (batch_size == 0) out->release(out);
+ }
+
+ sqlite3_mutex_leave(sqlite3_db_mutex(reader->db));
+ return status;
+}
+
+int StatementReaderGetSchema(struct ArrowArrayStream* self, struct ArrowSchema* out) {
+ if (!self->release || !self->private_data) {
+ return EINVAL;
+ }
+
+ struct StatementReader* reader = (struct StatementReader*)self->private_data;
+ return ArrowSchemaDeepCopy(&reader->schema, out);
+}
+
+void StatementReaderRelease(struct ArrowArrayStream* self) {
+ if (self->private_data) {
+ struct StatementReader* reader = (struct StatementReader*)self->private_data;
+ if (reader->schema.release) {
+ reader->schema.release(&reader->schema);
+ }
+ if (reader->initial_batch.release) {
+ reader->initial_batch.release(&reader->initial_batch);
+ }
+ if (reader->types) {
+ free(reader->types);
+ }
+ if (reader->binder) {
+ AdbcSqliteBinderRelease(reader->binder);
+ }
+
+ free(self->private_data);
+ }
+ self->private_data = NULL;
+ self->release = NULL;
+ self->get_last_error = NULL;
+ self->get_next = NULL;
+ self->get_schema = NULL;
+}
+
+// -- Type inferring reader ------------------------------------------
+//
+// Every column starts as INT64. A certain number of rows is read, and
+// the column is upcast if a different type is read. Afterwards,
+// 'compatible' values are upcast (int64 <: double <: string) and
+// 'incompatible' values will raise an error.
+//
+// Notes:
+// - The upcasting may be different when done during the inference
+// stage vs the reading stage, because in the former, we have to do
+// the upcasting, and in the latter, we let SQLite handle it.
+// - We don't use unions since those are a pain to work with.
+//
+// Improvements/to-dos:
+// - SQLITE_BLOB type values are not handled
+// - Columns where no non-NULL value is seen should be typed as STRING
+// for maximum flexibility for later values
+// - Make this more flexible (e.g. choose whether to attempt to cast
+// incompatible values, or insert a null, instead of erroring)
+// - Add the option to try to use SQLite's designated type for a
+// column, instead of inferring the type (where possible).
+
+/// Initialize buffers for the first (type-inferred) batch of data.
+/// Use raw buffers since the types may change.
+AdbcStatusCode StatementReaderInitializeInfer(int num_columns, size_t infer_rows,
+ struct ArrowBitmap* validity,
+ struct ArrowBuffer* data,
+ struct ArrowBuffer* binary,
+ enum ArrowType* current_type,
+ struct AdbcError* error) {
+ for (int i = 0; i < num_columns; i++) {
+ ArrowBitmapInit(&validity[i]);
+ CHECK_NA(INTERNAL, ArrowBitmapReserve(&validity[i], infer_rows), error);
+ ArrowBufferInit(&data[i]);
+ CHECK_NA(INTERNAL, ArrowBufferReserve(&data[i], infer_rows * sizeof(int64_t)), error);
+ memset(&binary[i], 0, sizeof(struct ArrowBuffer));
+ current_type[i] = NANOARROW_TYPE_INT64;
+ }
+ return ADBC_STATUS_OK;
+} // NOLINT(whitespace/indent)
+
+/// Finalize the first (type-inferred) batch of data.
+AdbcStatusCode StatementReaderInferFinalize(
+ sqlite3_stmt* stmt, int num_columns, int64_t num_rows, struct StatementReader* reader,
+ struct ArrowBitmap* validity, struct ArrowBuffer* data, struct ArrowBuffer* binary,
+ enum ArrowType* current_type, struct AdbcError* error) {
+ CHECK_NA(INTERNAL, ArrowSchemaInit(&reader->schema, NANOARROW_TYPE_STRUCT), error);
+ CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(&reader->schema, num_columns), error);
+ for (int col = 0; col < num_columns; col++) {
+ struct ArrowSchema* field = reader->schema.children[col];
+ const char* name = sqlite3_column_name(stmt, col);
+ CHECK_NA(INTERNAL, ArrowSchemaInit(field, current_type[col]), error);
+ CHECK_NA(INTERNAL, ArrowSchemaSetName(field, name), error);
+ }
+ CHECK_NA(INTERNAL,
+ ArrowArrayInitFromSchema(&reader->initial_batch, &reader->schema, NULL),
+ error);
+
+ // Do validation up front, so that we either move 0 buffers or all buffers
+ for (int col = 0; col < num_columns; col++) {
+ if (current_type[col] == NANOARROW_TYPE_STRING ||
+ current_type[col] == NANOARROW_TYPE_BINARY) {
+ if (binary[col].data == NULL) {
+ SetError(error, "INTERNAL: column has binary-like type but no backing buffer");
+ return ADBC_STATUS_INTERNAL;
+ }
+ }
+ reader->initial_batch.children[col]->length = num_rows;
+ }
+
+ reader->initial_batch.length = num_rows;
+
+ for (int col = 0; col < num_columns; col++) {
+ struct ArrowArray* arr = reader->initial_batch.children[col];
+ ArrowArraySetValidityBitmap(arr, &validity[col]);
+ // XXX: ignore return code since we've hardcoded the buffer index
+ (void)ArrowArraySetBuffer(arr, 1, &data[col]);
+ if (current_type[col] == NANOARROW_TYPE_STRING ||
+ current_type[col] == NANOARROW_TYPE_BINARY) {
+ (void)ArrowArraySetBuffer(arr, 2, &binary[col]);
+ }
+ arr->length = num_rows;
+ }
+ return ADBC_STATUS_OK;
+}
+
+// Convert an int64 typed column to double.
+AdbcStatusCode StatementReaderUpcastInt64ToDouble(struct ArrowBuffer* data,
+ struct AdbcError* error) {
+ struct ArrowBuffer doubles;
+ ArrowBufferInit(&doubles);
+ CHECK_NA(INTERNAL, ArrowBufferReserve(&doubles, data->capacity_bytes), error);
+
+ size_t num_elements = data->size_bytes / sizeof(int64_t);
+ const int64_t* elements = (const int64_t*)data->data;
+ for (size_t i = 0; i < num_elements; i++) {
+ double value = elements[i];
+ ArrowBufferAppendUnsafe(&doubles, &value, sizeof(double));
+ }
+ ArrowBufferReset(data);
+ ArrowBufferMove(&doubles, data);
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode StatementReaderAppendInt64ToBinary(struct ArrowBuffer* offsets,
+ struct ArrowBuffer* binary,
+ int64_t value, int32_t* offset,
+ struct AdbcError* error) {
+ // Make sure we have at least 21 bytes available (19 digits + sign + null)
+ // Presumably this is enough, but manpage for snprintf makes no guarantees
+ // about whether locale may affect this, so check for truncation regardless
+ static const size_t kReserve = 21;
+ size_t buffer_size = kReserve;
+ CHECK_NA(INTERNAL, ArrowBufferReserve(binary, buffer_size), error);
+ char* output = (char*)(binary->data + binary->size_bytes);
+ int written = 0;
+ while (1) {
+ written = snprintf(output, buffer_size, "%" PRId64, value);
+ if (written >= buffer_size) {
+ // Truncated, resize and try again
+ // Check for overflow - presumably this can never happen...?
+ if (UINT_MAX - buffer_size < buffer_size) {
+ SetError(error, "Overflow when upcasting double to string");
+ return ADBC_STATUS_INTERNAL;
+ }
+ CHECK_NA(INTERNAL, ArrowBufferReserve(binary, buffer_size), error);
+ buffer_size += buffer_size;
+ continue;
+ }
+ break;
+ }
+ *offset += written;
+ binary->size_bytes += written;
+ ArrowBufferAppendUnsafe(offsets, offset, sizeof(int32_t));
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode StatementReaderAppendDoubleToBinary(struct ArrowBuffer* offsets,
+ struct ArrowBuffer* binary,
+ double value, int32_t* offset,
+ struct AdbcError* error) {
+ static const size_t kReserve = 64;
+ size_t buffer_size = kReserve;
+ CHECK_NA(INTERNAL, ArrowBufferReserve(binary, buffer_size), error);
+ char* output = (char*)(binary->data + binary->size_bytes);
+ int written = 0;
+ while (1) {
+ written = snprintf(output, buffer_size, "%e", value);
+ if (written >= buffer_size) {
+ // Truncated, resize and try again
+ // Check for overflow - presumably this can never happen...?
+ if (UINT_MAX - buffer_size < buffer_size) {
+ SetError(error, "Overflow when upcasting double to string");
+ return ADBC_STATUS_INTERNAL;
+ }
+ CHECK_NA(INTERNAL, ArrowBufferReserve(binary, buffer_size), error);
+ buffer_size += buffer_size;
+ continue;
+ }
+ break;
+ }
+ *offset += written;
+ binary->size_bytes += written;
+ ArrowBufferAppendUnsafe(offsets, offset, sizeof(int32_t));
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode StatementReaderUpcastInt64ToBinary(struct ArrowBuffer* data,
+ struct ArrowBuffer* binary,
+ struct AdbcError* error) {
+ struct ArrowBuffer offsets;
+ ArrowBufferInit(&offsets);
+ ArrowBufferInit(binary);
+ CHECK_NA(INTERNAL, ArrowBufferReserve(&offsets, data->capacity_bytes), error);
+ CHECK_NA(INTERNAL, ArrowBufferReserve(binary, data->capacity_bytes), error);
+
+ size_t num_elements = data->size_bytes / sizeof(int64_t);
+ const int64_t* elements = (const int64_t*)data->data;
+
+ int32_t offset = 0;
+ ArrowBufferAppendUnsafe(&offsets, &offset, sizeof(int32_t));
+ for (size_t i = 0; i < num_elements; i++) {
+ AdbcStatusCode status =
+ StatementReaderAppendInt64ToBinary(&offsets, binary, elements[i], &offset, error);
+ if (status != ADBC_STATUS_OK) return status;
+ }
+ ArrowBufferReset(data);
+ ArrowBufferMove(&offsets, data);
+ return ADBC_STATUS_OK;
+}
+
+AdbcStatusCode StatementReaderUpcastDoubleToBinary(struct ArrowBuffer* data,
+ struct ArrowBuffer* binary,
+ struct AdbcError* error) {
+ struct ArrowBuffer offsets;
+ ArrowBufferInit(&offsets);
+ ArrowBufferInit(binary);
+ CHECK_NA(INTERNAL, ArrowBufferReserve(&offsets, data->capacity_bytes), error);
+ CHECK_NA(INTERNAL, ArrowBufferReserve(binary, data->capacity_bytes), error);
+
+ size_t num_elements = data->size_bytes / sizeof(double);
+ const double* elements = (const double*)data->data;
+
+ int32_t offset = 0;
+ ArrowBufferAppendUnsafe(&offsets, &offset, sizeof(int32_t));
+ for (size_t i = 0; i < num_elements; i++) {
+ AdbcStatusCode status = StatementReaderAppendDoubleToBinary(
+ &offsets, binary, elements[i], &offset, error);
+ if (status != ADBC_STATUS_OK) return status;
+ }
+ ArrowBufferReset(data);
+ ArrowBufferMove(&offsets, data);
+ return ADBC_STATUS_OK;
+}
+
+/// Append a single value to a single column.
+AdbcStatusCode StatementReaderInferOneValue(
+ sqlite3_stmt* stmt, int col, struct ArrowBitmap* validity, struct ArrowBuffer* data,
+ struct ArrowBuffer* binary, enum ArrowType* current_type, struct AdbcError* error) {
+ // TODO: static_assert sizeof(int64) == sizeof(double)
+
+ int sqlite_type = sqlite3_column_type(stmt, col);
+ switch (sqlite_type) {
+ case SQLITE_NULL: {
+ ArrowBitmapAppendUnsafe(validity, /*set=*/0, /*length=*/1);
+ switch (*current_type) {
+ case NANOARROW_TYPE_INT64: {
+ int64_t value = 0;
+ ArrowBufferAppendUnsafe(data, &value, sizeof(int64_t));
+ break;
+ }
+ case NANOARROW_TYPE_DOUBLE: {
+ double value = 0.0;
+ ArrowBufferAppendUnsafe(data, &value, sizeof(int64_t));
+ break;
+ }
+ case NANOARROW_TYPE_STRING:
+ case NANOARROW_TYPE_BINARY: {
+ const int32_t offset = ((int32_t*)data->data)[data->size_bytes / 4 - 1];
+ CHECK_NA(INTERNAL, ArrowBufferAppend(data, &offset, sizeof(offset)), error);
+ return ADBC_STATUS_OK;
+ }
+ default:
+ return ADBC_STATUS_INTERNAL;
+ }
+ break;
+ }
+ case SQLITE_INTEGER: {
+ ArrowBitmapAppendUnsafe(validity, /*set=*/1, /*length=*/1);
+
+ switch (*current_type) {
+ case NANOARROW_TYPE_INT64: {
+ int64_t value = sqlite3_column_int64(stmt, col);
+ ArrowBufferAppendUnsafe(data, &value, sizeof(int64_t));
+ break;
+ }
+ case NANOARROW_TYPE_DOUBLE: {
+ double value = sqlite3_column_double(stmt, col);
+ ArrowBufferAppendUnsafe(data, &value, sizeof(int64_t));
+ break;
+ }
+ case NANOARROW_TYPE_STRING:
+ case NANOARROW_TYPE_BINARY: {
+ int32_t offset = ((int32_t*)data->data)[data->size_bytes / 4 - 1];
+ return StatementReaderAppendInt64ToBinary(
+ data, binary, sqlite3_column_int64(stmt, col), &offset, error);
+ }
+ default:
+ return ADBC_STATUS_INTERNAL;
+ }
+ break;
+ }
+ case SQLITE_FLOAT: {
+ ArrowBitmapAppendUnsafe(validity, /*set=*/1, /*length=*/1);
+
+ switch (*current_type) {
+ case NANOARROW_TYPE_INT64: {
+ AdbcStatusCode status = StatementReaderUpcastInt64ToDouble(data, error);
+ if (status != ADBC_STATUS_OK) return status;
+ *current_type = NANOARROW_TYPE_DOUBLE;
+ double value = sqlite3_column_double(stmt, col);
+ ArrowBufferAppendUnsafe(data, &value, sizeof(double));
+ break;
+ }
+ case NANOARROW_TYPE_DOUBLE: {
+ double value = sqlite3_column_double(stmt, col);
+ ArrowBufferAppendUnsafe(data, &value, sizeof(double));
+ break;
+ }
+ case NANOARROW_TYPE_STRING:
+ case NANOARROW_TYPE_BINARY: {
+ int32_t offset = ((int32_t*)data->data)[data->size_bytes / 4 - 1];
+ return StatementReaderAppendDoubleToBinary(
+ data, binary, sqlite3_column_double(stmt, col), &offset, error);
+ }
+ default:
+ return ADBC_STATUS_INTERNAL;
+ }
+ break;
+ }
+ case SQLITE_TEXT: {
+ ArrowBitmapAppendUnsafe(validity, /*set=*/1, /*length=*/1);
+
+ switch (*current_type) {
+ case NANOARROW_TYPE_INT64: {
+ AdbcStatusCode status = StatementReaderUpcastInt64ToBinary(data, binary, error);
+ if (status != ADBC_STATUS_OK) return status;
+ *current_type = NANOARROW_TYPE_STRING;
+ break;
+ }
+ case NANOARROW_TYPE_DOUBLE: {
+ AdbcStatusCode status =
+ StatementReaderUpcastDoubleToBinary(data, binary, error);
+ if (status != ADBC_STATUS_OK) return status;
+ *current_type = NANOARROW_TYPE_STRING;
+ break;
+ }
+ case NANOARROW_TYPE_STRING:
+ case NANOARROW_TYPE_BINARY:
+ break;
+ default:
+ return ADBC_STATUS_INTERNAL;
+ }
+
+ const unsigned char* value = sqlite3_column_text(stmt, col);
+ const int size = sqlite3_column_bytes(stmt, col);
+ const int32_t offset = ((int32_t*)data->data)[data->size_bytes / 4 - 1] + size;
+ CHECK_NA(INTERNAL, ArrowBufferAppend(binary, value, size), error);
+ CHECK_NA(INTERNAL, ArrowBufferAppend(data, &offset, sizeof(offset)), error);
+ break;
+ }
+ case SQLITE_BLOB:
+ default: {
+ return ADBC_STATUS_IO;
+ }
+ }
+ return ADBC_STATUS_OK;
+} // NOLINT(whitespace/indent)
+
+AdbcStatusCode AdbcSqliteExportReader(sqlite3* db, sqlite3_stmt* stmt,
+ struct AdbcSqliteBinder* binder, size_t batch_size,
+ struct ArrowArrayStream* stream,
+ struct AdbcError* error) {
+ struct StatementReader* reader = malloc(sizeof(struct StatementReader));
+ memset(reader, 0, sizeof(struct StatementReader));
+ reader->db = db;
+ reader->stmt = stmt;
+ reader->batch_size = batch_size;
+
+ stream->private_data = reader;
+ stream->release = StatementReaderRelease;
+ stream->get_last_error = StatementReaderGetLastError;
+ stream->get_next = StatementReaderGetNext;
+ stream->get_schema = StatementReaderGetSchema;
+
+ sqlite3_mutex_enter(sqlite3_db_mutex(db));
+
+ const int num_columns = sqlite3_column_count(stmt);
+ struct ArrowBitmap* validity = malloc(num_columns * sizeof(struct ArrowBitmap));
+ struct ArrowBuffer* data = malloc(num_columns * sizeof(struct ArrowBuffer));
+ struct ArrowBuffer* binary = malloc(num_columns * sizeof(struct ArrowBuffer));
+ enum ArrowType* current_type = malloc(num_columns * sizeof(enum ArrowType));
+
+ AdbcStatusCode status = StatementReaderInitializeInfer(
+ num_columns, batch_size, validity, data, binary, current_type, error);
+ if (status == ADBC_STATUS_OK) {
+ int64_t num_rows = 0;
+ while (num_rows < batch_size) {
+ if (binder) {
+ char finished = 0;
+ status = AdbcSqliteBinderBindNext(binder, db, stmt, &finished, error);
+ if (status != ADBC_STATUS_OK) break;
+ if (finished) {
+ reader->done = 1;
+ break;
+ }
+ }
+
+ int rc = sqlite3_step(stmt);
+ if (rc == SQLITE_DONE) {
+ reader->done = 1;
+ break;
+ } else if (rc == SQLITE_ERROR) {
+ status = ADBC_STATUS_IO;
+ break;
+ }
+
+ for (int col = 0; col < num_columns; col++) {
+ status = StatementReaderInferOneValue(stmt, col, &validity[col], &data[col],
+ &binary[col], ¤t_type[col], error);
+ if (status != ADBC_STATUS_OK) break;
+ }
+ if (status != ADBC_STATUS_OK) break;
+ num_rows++;
+ }
+
+ if (status == ADBC_STATUS_OK) {
+ status = StatementReaderInferFinalize(stmt, num_columns, num_rows, reader, validity,
+ data, binary, current_type, error);
+ }
+ }
+
+ if (status != ADBC_STATUS_OK) {
+ // Free the individual buffers
+ // This is OK, since InferFinalize either moves all buffers or no buffers
+ for (int i = 0; i < num_columns; i++) {
+ ArrowBitmapReset(&validity[i]);
+ ArrowBufferReset(&data[i]);
+ ArrowBufferReset(&binary[i]);
+ }
+ free(current_type);
+ } else {
+ reader->types = current_type;
+ reader->binder = binder;
+ }
+
+ free(data);
+ free(validity);
+ free(binary);
+
+ sqlite3_mutex_leave(sqlite3_db_mutex(db));
+ return status;
+} // NOLINT(whitespace/indent)
diff --git a/c/driver/sqlite/statement_reader.h b/c/driver/sqlite/statement_reader.h
new file mode 100644
index 0000000..3b1c425
--- /dev/null
+++ b/c/driver/sqlite/statement_reader.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Utilities for executing SQLite statements.
+
+#pragma once
+
+#include <adbc.h>
+#include <nanoarrow.h>
+#include <sqlite3.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/// \brief Helper to manage binding data to a SQLite statement.
+struct AdbcSqliteBinder {
+ // State
+ struct ArrowSchema schema;
+ struct ArrowArrayStream params;
+ enum ArrowType* types;
+
+ // Scratch space
+ struct ArrowArray array;
+ struct ArrowArrayView batch;
+ int64_t next_row;
+};
+
+AdbcStatusCode AdbcSqliteBinderSetArray(struct AdbcSqliteBinder* binder,
+ struct ArrowArray* values,
+ struct ArrowSchema* schema,
+ struct AdbcError* error);
+AdbcStatusCode AdbcSqliteBinderSetArrayStream(struct AdbcSqliteBinder* binder,
+ struct ArrowArrayStream* values,
+ struct AdbcError* error);
+AdbcStatusCode AdbcSqliteBinderBindNext(struct AdbcSqliteBinder* binder, sqlite3* conn,
+ sqlite3_stmt* stmt, char* finished,
+ struct AdbcError* error);
+void AdbcSqliteBinderRelease(struct AdbcSqliteBinder* binder);
+
+/// \brief Initialize an ArrowArrayStream from a sqlite3_stmt.
+/// \param[in] db The SQLite connection.
+/// \param[in] stmt The SQLite statement.
+/// \param[in] binder Query parameters to bind, if provided.
+/// \param[in] infer_rows How many rows to read to infer the Arrow schema.
+/// \param[out] stream The stream to export to.
+/// \param[out] error Error details, if needed.
+AdbcStatusCode AdbcSqliteExportReader(sqlite3* db, sqlite3_stmt* stmt,
+ struct AdbcSqliteBinder* binder, size_t batch_size,
+ struct ArrowArrayStream* stream,
+ struct AdbcError* error);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/c/driver/sqlite/types.h b/c/driver/sqlite/types.h
new file mode 100644
index 0000000..cd46f4f
--- /dev/null
+++ b/c/driver/sqlite/types.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <adbc.h>
+#include <sqlite3.h>
+
+#include "statement_reader.h"
+
+struct SqliteDatabase {
+ sqlite3* db;
+ char* uri;
+ size_t connection_count;
+};
+
+struct SqliteConnection {
+ sqlite3* conn;
+ char active_transaction;
+};
+
+struct SqliteStatement {
+ sqlite3* conn;
+
+ // -- Query state -----------------------------------------
+
+ sqlite3_stmt* stmt;
+ char prepared;
+ char* query;
+ size_t query_len;
+
+ // -- Bind state ------------------------------------------
+ struct AdbcSqliteBinder binder;
+
+ // -- Ingest state ----------------------------------------
+ char* target_table;
+ char append;
+
+ // -- Query options ---------------------------------------
+ int batch_size;
+};
diff --git a/c/driver/sqlite/utils.c b/c/driver/sqlite/utils.c
new file mode 100644
index 0000000..f85ec0d
--- /dev/null
+++ b/c/driver/sqlite/utils.c
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "utils.h"
+
+#include <errno.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <nanoarrow.h>
+
+static size_t kErrorBufferSize = 256;
+static char kErrorPrefix[] = "[SQLite] ";
+
+void ReleaseError(struct AdbcError* error) {
+ free(error->message);
+ error->message = NULL;
+ error->release = NULL;
+}
+
+void SetError(struct AdbcError* error, const char* format, ...) {
+ if (!error) return;
+ if (error->release) {
+ // TODO: combine the errors if possible
+ error->release(error);
+ }
+ error->message = malloc(kErrorBufferSize);
+ if (!error->message) return;
+
+ error->release = &ReleaseError;
+
+ memcpy(error->message, kErrorPrefix, sizeof(kErrorPrefix));
+
+ va_list args;
+ va_start(args, format);
+ vsnprintf(error->message + sizeof(kErrorPrefix) - 1,
+ kErrorBufferSize - sizeof(kErrorPrefix) + 1, format, args);
+ va_end(args);
+}
+
+struct SingleBatchArrayStream {
+ struct ArrowSchema schema;
+ struct ArrowArray batch;
+};
+const char* SingleBatchArrayStreamGetLastError(struct ArrowArrayStream* stream) {
+ return NULL;
+}
+int SingleBatchArrayStreamGetNext(struct ArrowArrayStream* stream,
+ struct ArrowArray* batch) {
+ if (!stream || !stream->private_data) return EINVAL;
+ struct SingleBatchArrayStream* impl =
+ (struct SingleBatchArrayStream*)stream->private_data;
+
+ memcpy(batch, &impl->batch, sizeof(*batch));
+ memset(&impl->batch, 0, sizeof(*batch));
+ return 0;
+}
+int SingleBatchArrayStreamGetSchema(struct ArrowArrayStream* stream,
+ struct ArrowSchema* schema) {
+ if (!stream || !stream->private_data) return EINVAL;
+ struct SingleBatchArrayStream* impl =
+ (struct SingleBatchArrayStream*)stream->private_data;
+
+ return ArrowSchemaDeepCopy(&impl->schema, schema);
+}
+void SingleBatchArrayStreamRelease(struct ArrowArrayStream* stream) {
+ if (!stream || !stream->private_data) return;
+ struct SingleBatchArrayStream* impl =
+ (struct SingleBatchArrayStream*)stream->private_data;
+ impl->schema.release(&impl->schema);
+ if (impl->batch.release) impl->batch.release(&impl->batch);
+ free(impl);
+
+ memset(stream, 0, sizeof(*stream));
+}
+
+AdbcStatusCode BatchToArrayStream(struct ArrowArray* values, struct ArrowSchema* schema,
+ struct ArrowArrayStream* stream,
+ struct AdbcError* error) {
+ if (!values->release) {
+ SetError(error, "ArrowArray is not initialized");
+ return ADBC_STATUS_INTERNAL;
+ } else if (!schema->release) {
+ SetError(error, "ArrowSchema is not initialized");
+ return ADBC_STATUS_INTERNAL;
+ } else if (stream->release) {
+ SetError(error, "ArrowArrayStream is already initialized");
+ return ADBC_STATUS_INTERNAL;
+ }
+
+ struct SingleBatchArrayStream* impl =
+ (struct SingleBatchArrayStream*)malloc(sizeof(*impl));
+ memcpy(&impl->schema, schema, sizeof(*schema));
+ memcpy(&impl->batch, values, sizeof(*values));
+ memset(schema, 0, sizeof(*schema));
+ memset(values, 0, sizeof(*values));
+ stream->private_data = impl;
+ stream->get_last_error = SingleBatchArrayStreamGetLastError;
+ stream->get_next = SingleBatchArrayStreamGetNext;
+ stream->get_schema = SingleBatchArrayStreamGetSchema;
+ stream->release = SingleBatchArrayStreamRelease;
+
+ return ADBC_STATUS_OK;
+}
+
+void StringBuilderInit(struct StringBuilder* builder, size_t initial_size) {
+ builder->buffer = (char*)malloc(initial_size);
+ builder->size = 0;
+ builder->capacity = initial_size;
+}
+void StringBuilderAppend(struct StringBuilder* builder, const char* value) {
+ size_t length = strlen(value);
+ size_t new_size = builder->size + length;
+ if (new_size > builder->capacity) {
+ size_t new_capacity = builder->size + length - builder->capacity;
+ if (builder->size == 0) new_capacity++;
+
+ builder->buffer = realloc(builder->buffer, new_capacity);
+ builder->capacity = new_capacity;
+ }
+ strncpy(builder->buffer + builder->size, value, length);
+ builder->buffer[new_size] = '\0';
+ builder->size = new_size;
+}
+void StringBuilderReset(struct StringBuilder* builder) {
+ if (builder->buffer) {
+ free(builder->buffer);
+ }
+ memset(builder, 0, sizeof(*builder));
+}
diff --git a/c/driver/sqlite/utils.h b/c/driver/sqlite/utils.h
new file mode 100644
index 0000000..5d65d88
--- /dev/null
+++ b/c/driver/sqlite/utils.h
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <adbc.h>
+
+/// Set error details using a format string.
+void SetError(struct AdbcError* error, const char* format, ...)
+ __attribute__((format(printf, 2, 3)));
+
+/// Wrap a single batch as a stream.
+AdbcStatusCode BatchToArrayStream(struct ArrowArray* values, struct ArrowSchema* schema,
+ struct ArrowArrayStream* stream,
+ struct AdbcError* error);
+
+struct StringBuilder {
+ char* buffer;
+ // Not including null terminator
+ size_t size;
+ size_t capacity;
+};
+void StringBuilderInit(struct StringBuilder* builder, size_t initial_size);
+void StringBuilderAppend(struct StringBuilder* builder, const char* value);
+void StringBuilderReset(struct StringBuilder* builder);
+
+/// Check an NanoArrow status code.
+#define CHECK_NA(CODE, EXPR, ERROR) \
+ do { \
+ ArrowErrorCode arrow_error_code = (EXPR); \
+ if (arrow_error_code != 0) { \
+ SetError(ERROR, "%s failed: (%d) %s\nDetail: %s:%d %s", #EXPR, arrow_error_code, \
+ strerror(arrow_error_code), __FILE__, __LINE__, __FUNCTION__); \
+ return ADBC_STATUS_##CODE; \
+ } \
+ } while (0)
+
+/// Check an NanoArrow status code.
+#define CHECK_NA_DETAIL(CODE, EXPR, NA_ERROR, ERROR) \
+ do { \
+ ArrowErrorCode arrow_error_code = (EXPR); \
+ if (arrow_error_code != 0) { \
+ SetError(ERROR, "%s failed: (%d) %s: %s\nDetail: %s:%d %s", #EXPR, \
+ arrow_error_code, strerror(arrow_error_code), (NA_ERROR)->message, \
+ __FILE__, __LINE__, __FUNCTION__); \
+ return ADBC_STATUS_##CODE; \
+ } \
+ } while (0)
+
+/// Check a generic status.
+#define RAISE(CODE, EXPR, ERRMSG, ERROR) \
+ do { \
+ if (!(EXPR)) { \
+ SetError(ERROR, "%s failed: %s\nDetail: %s:%d %s", #EXPR, ERRMSG, __FILE__, \
+ __LINE__, __FUNCTION__); \
+ return ADBC_STATUS_##CODE; \
+ } \
+ } while (0)
+
+/// Check an NanoArrow status code.
+#define RAISE_NA(EXPR) \
+ do { \
+ ArrowErrorCode arrow_error_code = (EXPR); \
+ if (arrow_error_code != 0) return arrow_error_code; \
+ } while (0)
+
+/// Check an ADBC status code.
+#define RAISE_ADBC(EXPR) \
+ do { \
+ AdbcStatusCode adbc_status_code = (EXPR); \
+ if (adbc_status_code != ADBC_STATUS_OK) return adbc_status_code; \
+ } while (0)
diff --git a/c/driver_manager/adbc_driver_manager.cc b/c/driver_manager/adbc_driver_manager.cc
index b73d935..80b0358 100644
--- a/c/driver_manager/adbc_driver_manager.cc
+++ b/c/driver_manager/adbc_driver_manager.cc
@@ -316,11 +316,17 @@ AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError*
status = database->private_driver->DatabaseSetOption(database, option.first.c_str(),
option.second.c_str(), error);
if (status != ADBC_STATUS_OK) {
+ delete args;
+ // Release the database
+ std::ignore = database->private_driver->DatabaseRelease(database, error);
if (database->private_driver->release) {
database->private_driver->release(database->private_driver, error);
}
delete database->private_driver;
database->private_driver = nullptr;
+ // Should be redundant, but ensure that AdbcDatabaseRelease
+ // below doesn't think that it contains a TempDatabase
+ database->private_data = nullptr;
return status;
}
}
@@ -411,6 +417,7 @@ AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
return ADBC_STATUS_INVALID_STATE;
}
TempConnection* args = reinterpret_cast<TempConnection*>(connection->private_data);
+ connection->private_data = nullptr;
std::unordered_map<std::string, std::string> options = std::move(args->options);
delete args;
@@ -714,13 +721,13 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,
}
void* load_handle = dlsym(handle, entrypoint);
- init_func = reinterpret_cast<AdbcDriverInitFunc>(load_handle);
- if (!init_func) {
+ if (!load_handle) {
std::string message = "dlsym() failed: ";
message += dlerror();
SetError(error, message);
return ADBC_STATUS_INTERNAL;
}
+ init_func = reinterpret_cast<AdbcDriverInitFunc>(load_handle);
#endif // defined(_WIN32)
diff --git a/c/driver_manager/adbc_driver_manager_test.cc b/c/driver_manager/adbc_driver_manager_test.cc
index 67762fa..23975e3 100644
--- a/c/driver_manager/adbc_driver_manager_test.cc
+++ b/c/driver_manager/adbc_driver_manager_test.cc
@@ -26,19 +26,23 @@
#include "adbc_driver_manager.h"
#include "driver/test_util.h"
#include "validation/adbc_validation.h"
+#include "validation/adbc_validation_util.h"
// Tests of the SQLite example driver, except using the driver manager
namespace adbc {
+using adbc_validation::IsOkStatus;
+
class DriverManager : public ::testing::Test {
public:
void SetUp() override {
std::memset(&driver, 0, sizeof(driver));
std::memset(&error, 0, sizeof(error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcLoadDriver("adbc_driver_sqlite", NULL,
- ADBC_VERSION_1_0_0, &driver, &error));
+ ASSERT_THAT(AdbcLoadDriver("adbc_driver_sqlite", nullptr, ADBC_VERSION_1_0_0, &driver,
+ &error),
+ IsOkStatus(&error));
}
void TearDown() override {
@@ -92,7 +96,7 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
return res;
}
return AdbcDatabaseSetOption(
- database, "filename", "file:Sqlite_Transactions?mode=memory&cache=shared", error);
+ database, "uri", "file:Sqlite_Transactions?mode=memory&cache=shared", error);
}
std::string BindParameter(int index) const override { return "?"; }
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index 0378966..ab4861c 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -45,160 +45,12 @@ namespace {
/// Assertion helpers
-#define ADBCV_CONCAT(a, b) a##b
-#define ADBCV_NAME(a, b) ADBCV_CONCAT(a, b)
-
#define CHECK_OK(EXPR) \
do { \
if (auto adbc_status = (EXPR); adbc_status != ADBC_STATUS_OK) { \
return adbc_status; \
} \
} while (false)
-
-#define CHECK_ERRNO_IMPL(ERRNO, EXPR) \
- if (int ERRNO = (EXPR); ERRNO != 0) { \
- return ERRNO; \
- }
-#define CHECK_ERRNO(EXPR) CHECK_ERRNO_IMPL(ADBCV_NAME(errno_, __COUNTER__), EXPR)
-
-int MakeSchema(struct ArrowSchema* schema,
- const std::vector<std::pair<std::string, ArrowType>>& fields) {
- CHECK_ERRNO(ArrowSchemaInit(schema, NANOARROW_TYPE_STRUCT));
- CHECK_ERRNO(ArrowSchemaAllocateChildren(schema, fields.size()));
- size_t i = 0;
- for (const std::pair<std::string, ArrowType>& field : fields) {
- CHECK_ERRNO(ArrowSchemaInit(schema->children[i], field.second));
- CHECK_ERRNO(ArrowSchemaSetName(schema->children[i], field.first.c_str()));
- i++;
- }
- return 0;
-}
-
-template <typename T>
-int MakeArray(struct ArrowArray* parent, struct ArrowArray* array,
- const std::vector<std::optional<T>>& values) {
- for (const auto& v : values) {
- if (v.has_value()) {
- if constexpr (std::is_same<T, int64_t>::value) {
- CHECK_ERRNO(ArrowArrayAppendInt(array, *v));
- } else if constexpr (std::is_same<T, std::string>::value) {
- struct ArrowStringView view;
- view.data = v->c_str();
- view.n_bytes = v->size();
- CHECK_ERRNO(ArrowArrayAppendString(array, view));
- } else {
- static_assert(!sizeof(T), "Not yet implemented");
- return ENOTSUP;
- }
- } else {
- CHECK_ERRNO(ArrowArrayAppendNull(array, 1));
- }
- }
- return 0;
-}
-
-template <typename First>
-int MakeBatchImpl(struct ArrowArray* batch, size_t i, struct ArrowError* error,
- const std::vector<std::optional<First>>& first) {
- return MakeArray<First>(batch, batch->children[i], first);
-}
-
-template <typename First, typename... Rest>
-int MakeBatchImpl(struct ArrowArray* batch, size_t i, struct ArrowError* error,
- const std::vector<std::optional<First>>& first,
- const std::vector<std::optional<Rest>>&... rest) {
- CHECK_ERRNO(MakeArray<First>(batch, batch->children[i], first));
- return MakeBatchImpl(batch, i + 1, error, rest...);
-}
-
-template <typename... T>
-int MakeBatch(struct ArrowArray* batch, struct ArrowError* error,
- const std::vector<std::optional<T>>&... columns) {
- CHECK_ERRNO(ArrowArrayStartAppending(batch));
- CHECK_ERRNO(MakeBatchImpl(batch, 0, error, columns...));
- for (size_t i = 0; i < static_cast<size_t>(batch->n_children); i++) {
- if (batch->length > 0 && batch->children[i]->length != batch->length) {
- ADD_FAILURE() << "Column lengths are inconsistent: column " << i << " has length "
- << batch->children[i]->length;
- return EINVAL;
- }
- batch->length = batch->children[i]->length;
- }
- return ArrowArrayFinishBuilding(batch, error);
-}
-
-template <typename... T>
-int MakeBatch(struct ArrowSchema* schema, struct ArrowArray* batch,
- struct ArrowError* error,
- const std::vector<std::pair<std::string, ArrowType>>& fields,
- const std::vector<std::optional<T>>&... columns) {
- CHECK_ERRNO(MakeSchema(schema, fields));
- CHECK_ERRNO(ArrowArrayInitFromSchema(batch, schema, error));
- return MakeBatch(batch, error, columns...);
-}
-
-class ConstantArrayStream {
- public:
- explicit ConstantArrayStream(struct ArrowSchema* schema,
- std::vector<struct ArrowArray> batches)
- : batches_(std::move(batches)), next_index_(0) {
- schema_ = *schema;
- std::memset(schema, 0, sizeof(*schema));
- }
-
- static const char* GetLastError(struct ArrowArrayStream* stream) { return nullptr; }
-
- static int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) {
- if (!stream || !stream->private_data || !out) return EINVAL;
- auto* self = reinterpret_cast<ConstantArrayStream*>(stream->private_data);
- if (self->next_index_ >= self->batches_.size()) {
- out->release = nullptr;
- return 0;
- }
-
- *out = self->batches_[self->next_index_];
- std::memset(&self->batches_[self->next_index_], 0, sizeof(struct ArrowArray));
-
- self->next_index_++;
- return 0;
- }
-
- static int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) {
- if (!stream || !stream->private_data || !out) return EINVAL;
- auto* self = reinterpret_cast<ConstantArrayStream*>(stream->private_data);
- return ArrowSchemaDeepCopy(&self->schema_, out);
- }
-
- static void Release(struct ArrowArrayStream* stream) {
- if (!stream->private_data) return;
- auto* self = reinterpret_cast<ConstantArrayStream*>(stream->private_data);
-
- self->schema_.release(&self->schema_);
- for (size_t i = 0; i < self->batches_.size(); i++) {
- if (self->batches_[i].release) {
- self->batches_[i].release(&self->batches_[i]);
- }
- }
-
- delete self;
- std::memset(stream, 0, sizeof(*stream));
- }
-
- private:
- struct ArrowSchema schema_;
- std::vector<struct ArrowArray> batches_;
- size_t next_index_;
-};
-
-void MakeStream(struct ArrowArrayStream* stream, struct ArrowSchema* schema,
- std::vector<struct ArrowArray> batches) {
- stream->get_last_error = &ConstantArrayStream::GetLastError;
- stream->get_next = &ConstantArrayStream::GetNext;
- stream->get_schema = &ConstantArrayStream::GetSchema;
- stream->release = &ConstantArrayStream::Release;
- stream->private_data = new ConstantArrayStream(schema, std::move(batches));
-}
-
} // namespace
//------------------------------------------------------------
@@ -342,10 +194,12 @@ void IngestSampleTable(struct AdbcConnection* connection, struct AdbcError* erro
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
- ASSERT_THAT((MakeBatch<int64_t, std::string>(
- &schema.value, &array.value, &na_error,
- {{"int64s", NANOARROW_TYPE_INT64}, {"strings", NANOARROW_TYPE_STRING}},
- {42, -42, std::nullopt}, {"foo", std::nullopt, ""})),
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64},
+ {"strings", NANOARROW_TYPE_STRING}}),
+ IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t, std::string>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt},
+ {"foo", std::nullopt, ""})),
IsOkErrno());
Handle<struct AdbcStatement> statement;
@@ -513,8 +367,8 @@ void CheckGetObjectsSchema(struct ArrowSchema* schema) {
CompareSchema(usage_schema, {
{"fk_catalog", NANOARROW_TYPE_STRING, NULLABLE},
{"fk_db_schema", NANOARROW_TYPE_STRING, NULLABLE},
- {"fk_table", NANOARROW_TYPE_STRING, NULLABLE},
- {"fk_column_name", NANOARROW_TYPE_STRING, NULLABLE},
+ {"fk_table", NANOARROW_TYPE_STRING, NOT_NULL},
+ {"fk_column_name", NANOARROW_TYPE_STRING, NOT_NULL},
}));
}
@@ -912,14 +766,14 @@ void StatementTest::SetUpTest() {
}
void StatementTest::TearDownTest() {
+ if (statement.private_data) {
+ EXPECT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
+ }
+ EXPECT_THAT(AdbcConnectionRelease(&connection, &error), IsOkStatus(&error));
+ EXPECT_THAT(AdbcDatabaseRelease(&database, &error), IsOkStatus(&error));
if (error.release) {
error.release(&error);
}
- if (statement.private_data) {
- ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
- }
- ASSERT_THAT(AdbcConnectionRelease(&connection, &error), IsOkStatus(&error));
- ASSERT_THAT(AdbcDatabaseRelease(&database, &error), IsOkStatus(&error));
}
void StatementTest::TestNewInit() {
@@ -952,9 +806,9 @@ void StatementTest::TestSqlIngestInts() {
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT(
- MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
- {{"int64s", NANOARROW_TYPE_INT64}}, {42, -42, std::nullopt}),
+ MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42, -42, std::nullopt}),
IsOkErrno());
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
@@ -1004,8 +858,8 @@ void StatementTest::TestSqlIngestAppend() {
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
- ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
- {{"int64s", NANOARROW_TYPE_INT64}}, {42}),
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+ ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42}),
IsOkErrno());
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
@@ -1023,9 +877,10 @@ void StatementTest::TestSqlIngestAppend() {
// Now append
// Re-initialize since Bind() should take ownership of data
- ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
- {{"int64s", NANOARROW_TYPE_INT64}}, {-42, std::nullopt}),
- IsOkErrno());
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+ ASSERT_THAT(
+ MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}),
+ IsOkErrno());
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"bulk_ingest", &error),
@@ -1094,9 +949,10 @@ void StatementTest::TestSqlIngestErrors() {
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_APPEND, &error),
IsOkStatus(&error));
- ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
- {{"int64s", NANOARROW_TYPE_INT64}}, {-42, std::nullopt}),
- IsOkErrno(&na_error));
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+ ASSERT_THAT(
+ MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}),
+ IsOkErrno(&na_error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
@@ -1108,18 +964,20 @@ void StatementTest::TestSqlIngestErrors() {
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_CREATE, &error),
IsOkStatus(&error));
- ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
- {{"int64s", NANOARROW_TYPE_INT64}}, {-42, std::nullopt}),
- IsOkErrno(&na_error));
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+ ASSERT_THAT(
+ MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}),
+ IsOkErrno(&na_error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
// ...then try to overwrite it
- ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
- {{"int64s", NANOARROW_TYPE_INT64}}, {-42, std::nullopt}),
- IsOkErrno(&na_error));
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+ ASSERT_THAT(
+ MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}),
+ IsOkErrno(&na_error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
@@ -1127,10 +985,11 @@ void StatementTest::TestSqlIngestErrors() {
if (error.release) error.release(&error);
// ...then try to append an incompatible schema
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64},
+ {"coltwo", NANOARROW_TYPE_INT64}}),
+ IsOkErrno());
ASSERT_THAT(
- (MakeBatch<int64_t, int64_t>(
- &schema.value, &array.value, &na_error,
- {{"int64s", NANOARROW_TYPE_INT64}, {"coltwo", NANOARROW_TYPE_INT64}}, {}, {})),
+ (MakeBatch<int64_t, int64_t>(&schema.value, &array.value, &na_error, {}, {})),
IsOkErrno(&na_error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
@@ -1149,10 +1008,10 @@ void StatementTest::TestSqlIngestMultipleConnections() {
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
- ASSERT_THAT(
- (MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
- {{"int64s", NANOARROW_TYPE_INT64}}, {42, -42, std::nullopt})),
- IsOkErrno());
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
+ IsOkErrno());
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
@@ -1168,7 +1027,7 @@ void StatementTest::TestSqlIngestMultipleConnections() {
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
{
- struct AdbcConnection connection2;
+ struct AdbcConnection connection2 = {};
ASSERT_THAT(AdbcConnectionNew(&connection2, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection2, &database, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementNew(&connection2, &statement, &error), IsOkStatus(&error));
@@ -1331,10 +1190,12 @@ void StatementTest::TestSqlPrepareSelectParams() {
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
- ASSERT_THAT((MakeBatch<int64_t, std::string>(
- &schema.value, &array.value, &na_error,
- {{"int64s", NANOARROW_TYPE_INT64}, {"strings", NANOARROW_TYPE_STRING}},
- {42, -42, std::nullopt}, {"", std::nullopt, "bar"})),
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64},
+ {"strings", NANOARROW_TYPE_STRING}}),
+ IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t, std::string>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt},
+ {"", std::nullopt, "bar"})),
IsOkErrno());
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
@@ -1386,10 +1247,10 @@ void StatementTest::TestSqlPrepareUpdate() {
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"bulk_ingest", &error),
IsOkStatus(&error));
- ASSERT_THAT(
- (MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
- {{"int64s", NANOARROW_TYPE_INT64}}, {42, -42, std::nullopt})),
- IsOkErrno());
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
+ IsOkErrno());
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
@@ -1404,10 +1265,10 @@ void StatementTest::TestSqlPrepareUpdate() {
ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error));
// Bind and execute
- ASSERT_THAT(
- (MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
- {{"int64s", NANOARROW_TYPE_INT64}}, {42, -42, std::nullopt})),
- IsOkErrno());
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
+ IsOkErrno());
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
@@ -1453,8 +1314,7 @@ void StatementTest::TestSqlPrepareUpdateStream() {
IsOkStatus(&error));
struct ArrowError na_error;
- const std::vector<std::pair<std::string, ArrowType>> fields = {
- {"ints", NANOARROW_TYPE_INT64}};
+ const std::vector<SchemaField> fields = {{"ints", NANOARROW_TYPE_INT64}};
// Create table
{
@@ -1464,8 +1324,9 @@ void StatementTest::TestSqlPrepareUpdateStream() {
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"bulk_ingest", &error),
IsOkStatus(&error));
- ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, fields, {})),
- IsOkErrno());
+ ASSERT_THAT(MakeSchema(&schema.value, fields), IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {})),
+ IsOkErrno(&na_error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
@@ -1473,21 +1334,18 @@ void StatementTest::TestSqlPrepareUpdateStream() {
}
// Generate stream
- struct ArrowArrayStream stream;
- struct ArrowSchema schema;
+ Handle<struct ArrowArrayStream> stream;
+ Handle<struct ArrowSchema> schema;
std::vector<struct ArrowArray> batches(2);
- ASSERT_NO_FATAL_FAILURE(MakeSchema(&schema, fields));
- ASSERT_THAT(ArrowArrayInitFromSchema(&batches[0], &schema, &na_error),
+ ASSERT_THAT(MakeSchema(&schema.value, fields), IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &batches[0], &na_error,
+ {1, 2, std::nullopt, 3})),
IsOkErrno(&na_error));
- ASSERT_THAT((MakeBatch<int64_t>(&batches[0], &na_error, {1, 2, std::nullopt, 3})),
- IsOkErrno(&na_error));
- ASSERT_THAT(ArrowArrayInitFromSchema(&batches[1], &schema, &na_error),
- IsOkErrno(&na_error));
- ASSERT_THAT(MakeBatch<int64_t>(&batches[1], &na_error, {std::nullopt, 3}),
- IsOkErrno(&na_error));
-
- ASSERT_NO_FATAL_FAILURE(MakeStream(&stream, &schema, std::move(batches)));
+ ASSERT_THAT(
+ MakeBatch<int64_t>(&schema.value, &batches[1], &na_error, {std::nullopt, 3}),
+ IsOkErrno(&na_error));
+ MakeStream(&stream.value, &schema.value, std::move(batches));
// Prepare
std::string query =
@@ -1497,7 +1355,8 @@ void StatementTest::TestSqlPrepareUpdateStream() {
ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error));
// Bind and execute
- ASSERT_THAT(AdbcStatementBindStream(&statement, &stream, &error), IsOkStatus(&error));
+ ASSERT_THAT(AdbcStatementBindStream(&statement, &stream.value, &error),
+ IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
@@ -1561,10 +1420,10 @@ void StatementTest::TestSqlPrepareErrorParamCountMismatch() {
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error));
- ASSERT_THAT(
- (MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
- {{"int64s", NANOARROW_TYPE_INT64}}, {42, -42, std::nullopt})),
- IsOkErrno());
+ ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
+ ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
+ {42, -42, std::nullopt})),
+ IsOkErrno());
ASSERT_THAT(
([&]() -> AdbcStatusCode {
@@ -1859,10 +1718,6 @@ void StatementTest::TestResultInvalidation() {
ASSERT_NO_FATAL_FAILURE(reader1.Next());
}
-#undef ADBCV_CONCAT
-#undef ADBCV_NAME
-#undef CHECK_ERRNO_IMPL
-#undef CHECK_ERRNO
#undef NOT_NULL
#undef NULLABLE
} // namespace adbc_validation
diff --git a/c/validation/adbc_validation_util.cc b/c/validation/adbc_validation_util.cc
index 215b69a..4a17199 100644
--- a/c/validation/adbc_validation_util.cc
+++ b/c/validation/adbc_validation_util.cc
@@ -16,6 +16,7 @@
// under the License.
#include "adbc_validation_util.h"
+#include <adbc.h>
#include "adbc_validation.h"
@@ -117,6 +118,8 @@ bool IsAdbcStatusCode::MatchAndExplain(AdbcStatusCode actual, std::ostream* os)
if (error_->message) *os << "\nError message: " << error_->message;
if (error_->sqlstate[0]) *os << "\nSQLSTATE: " << error_->sqlstate;
if (error_->vendor_code) *os << "\nVendor code: " << error_->vendor_code;
+
+ if (error_->release) error_->release(error_);
}
}
return false;
@@ -140,6 +143,92 @@ void IsAdbcStatusCode::DescribeNegationTo(std::ostream* os) const {
return IsAdbcStatusCode(code, error);
}
+#define CHECK_ERRNO(EXPR) \
+ do { \
+ if (int adbcv_errno = (EXPR); adbcv_errno != 0) { \
+ return adbcv_errno; \
+ } \
+ } while (false);
+
+int MakeSchema(struct ArrowSchema* schema, const std::vector<SchemaField>& fields) {
+ CHECK_ERRNO(ArrowSchemaInit(schema, NANOARROW_TYPE_STRUCT));
+ CHECK_ERRNO(ArrowSchemaAllocateChildren(schema, fields.size()));
+ size_t i = 0;
+ for (const SchemaField& field : fields) {
+ CHECK_ERRNO(ArrowSchemaInit(schema->children[i], field.type));
+ CHECK_ERRNO(ArrowSchemaSetName(schema->children[i], field.name.c_str()));
+ if (!field.nullable) {
+ schema->children[i]->flags &= ~ARROW_FLAG_NULLABLE;
+ }
+ i++;
+ }
+ return 0;
+}
+
+#undef CHECK_ERRNO
+
+class ConstantArrayStream {
+ public:
+ explicit ConstantArrayStream(struct ArrowSchema* schema,
+ std::vector<struct ArrowArray> batches)
+ : batches_(std::move(batches)), next_index_(0) {
+ schema_ = *schema;
+ std::memset(schema, 0, sizeof(*schema));
+ }
+
+ static const char* GetLastError(struct ArrowArrayStream* stream) { return nullptr; }
+
+ static int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) {
+ if (!stream || !stream->private_data || !out) return EINVAL;
+ auto* self = reinterpret_cast<ConstantArrayStream*>(stream->private_data);
+ if (self->next_index_ >= self->batches_.size()) {
+ out->release = nullptr;
+ return 0;
+ }
+
+ *out = self->batches_[self->next_index_];
+ std::memset(&self->batches_[self->next_index_], 0, sizeof(struct ArrowArray));
+
+ self->next_index_++;
+ return 0;
+ }
+
+ static int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) {
+ if (!stream || !stream->private_data || !out) return EINVAL;
+ auto* self = reinterpret_cast<ConstantArrayStream*>(stream->private_data);
+ return ArrowSchemaDeepCopy(&self->schema_, out);
+ }
+
+ static void Release(struct ArrowArrayStream* stream) {
+ if (!stream->private_data) return;
+ auto* self = reinterpret_cast<ConstantArrayStream*>(stream->private_data);
+
+ self->schema_.release(&self->schema_);
+ for (size_t i = 0; i < self->batches_.size(); i++) {
+ if (self->batches_[i].release) {
+ self->batches_[i].release(&self->batches_[i]);
+ }
+ }
+
+ delete self;
+ std::memset(stream, 0, sizeof(*stream));
+ }
+
+ private:
+ struct ArrowSchema schema_;
+ std::vector<struct ArrowArray> batches_;
+ size_t next_index_;
+};
+
+void MakeStream(struct ArrowArrayStream* stream, struct ArrowSchema* schema,
+ std::vector<struct ArrowArray> batches) {
+ stream->get_last_error = &ConstantArrayStream::GetLastError;
+ stream->get_next = &ConstantArrayStream::GetNext;
+ stream->get_schema = &ConstantArrayStream::GetSchema;
+ stream->release = &ConstantArrayStream::Release;
+ stream->private_data = new ConstantArrayStream(schema, std::move(batches));
+}
+
void CompareSchema(
struct ArrowSchema* schema,
const std::vector<std::tuple<std::optional<std::string>, ArrowType, bool>>& fields) {
diff --git a/c/validation/adbc_validation_util.h b/c/validation/adbc_validation_util.h
index f4ce5e8..b9a11a3 100644
--- a/c/validation/adbc_validation_util.h
+++ b/c/validation/adbc_validation_util.h
@@ -24,6 +24,7 @@
#include <string>
#include <tuple>
#include <type_traits>
+#include <utility>
#include <vector>
#include <adbc.h>
@@ -189,6 +190,106 @@ struct StreamReader {
}
};
+struct SchemaField {
+ std::string name;
+ ArrowType type = NANOARROW_TYPE_UNINITIALIZED;
+ bool nullable = true;
+
+ SchemaField(std::string name, ArrowType type, bool nullable)
+ : name(std::move(name)), type(type), nullable(nullable) {}
+
+ SchemaField(std::string name, ArrowType type)
+ : SchemaField(std::move(name), type, /*nullable=*/true) {}
+};
+
+/// \brief Make a schema from a vector of (name, type, nullable) tuples.
+int MakeSchema(struct ArrowSchema* schema, const std::vector<SchemaField>& fields);
+
+/// \brief Make an array from a column of C types.
+template <typename T>
+int MakeArray(struct ArrowArray* parent, struct ArrowArray* array,
+ const std::vector<std::optional<T>>& values) {
+ for (const auto& v : values) {
+ if (v.has_value()) {
+ if constexpr (std::is_same<T, int64_t>::value) {
+ if (int errno_res = ArrowArrayAppendInt(array, *v); errno_res != 0) {
+ return errno_res;
+ }
+ } else if constexpr (std::is_same<T, double>::value) {
+ if (int errno_res = ArrowArrayAppendDouble(array, *v); errno_res != 0) {
+ return errno_res;
+ }
+ } else if constexpr (std::is_same<T, std::string>::value) {
+ struct ArrowStringView view;
+ view.data = v->c_str();
+ view.n_bytes = v->size();
+ if (int errno_res = ArrowArrayAppendString(array, view); errno_res != 0) {
+ return errno_res;
+ }
+ } else {
+ static_assert(!sizeof(T), "Not yet implemented");
+ return ENOTSUP;
+ }
+ } else {
+ if (int errno_res = ArrowArrayAppendNull(array, 1); errno_res != 0) {
+ return errno_res;
+ }
+ }
+ }
+ return 0;
+}
+
+template <typename First>
+int MakeBatchImpl(struct ArrowArray* batch, size_t i, struct ArrowError* error,
+ const std::vector<std::optional<First>>& first) {
+ return MakeArray<First>(batch, batch->children[i], first);
+}
+
+template <typename First, typename... Rest>
+int MakeBatchImpl(struct ArrowArray* batch, size_t i, struct ArrowError* error,
+ const std::vector<std::optional<First>>& first,
+ const std::vector<std::optional<Rest>>&... rest) {
+ if (int errno_res = MakeArray<First>(batch, batch->children[i], first);
+ errno_res != 0) {
+ return errno_res;
+ }
+ return MakeBatchImpl(batch, i + 1, error, rest...);
+}
+
+/// \brief Make a batch from columns of C types.
+template <typename... T>
+int MakeBatch(struct ArrowArray* batch, struct ArrowError* error,
+ const std::vector<std::optional<T>>&... columns) {
+ if (int errno_res = ArrowArrayStartAppending(batch); errno_res != 0) {
+ return errno_res;
+ }
+ if (int errno_res = MakeBatchImpl(batch, 0, error, columns...); errno_res != 0) {
+ return errno_res;
+ }
+ for (size_t i = 0; i < static_cast<size_t>(batch->n_children); i++) {
+ if (batch->length > 0 && batch->children[i]->length != batch->length) {
+ ADD_FAILURE() << "Column lengths are inconsistent: column " << i << " has length "
+ << batch->children[i]->length;
+ return EINVAL;
+ }
+ batch->length = batch->children[i]->length;
+ }
+ return ArrowArrayFinishBuilding(batch, error);
+}
+
+template <typename... T>
+int MakeBatch(struct ArrowSchema* schema, struct ArrowArray* batch,
+ struct ArrowError* error, const std::vector<std::optional<T>>&... columns) {
+ if (int errno_res = ArrowArrayInitFromSchema(batch, schema, error); errno_res != 0) {
+ return errno_res;
+ }
+ return MakeBatch(batch, error, columns...);
+}
+
+/// \brief Make a stream from a list of batches.
+void MakeStream(struct ArrowArrayStream* stream, struct ArrowSchema* schema,
+ std::vector<struct ArrowArray> batches);
+
/// \brief Compare an array for equality against a vector of values.
template <typename T>
void CompareArray(struct ArrowArrayView* array,
diff --git a/c/vendor/nanoarrow/nanoarrow.c b/c/vendor/nanoarrow/nanoarrow.c
index d73322d..ce8e70f 100644
--- a/c/vendor/nanoarrow/nanoarrow.c
+++ b/c/vendor/nanoarrow/nanoarrow.c
@@ -622,6 +622,8 @@ int ArrowSchemaDeepCopy(struct ArrowSchema* schema, struct ArrowSchema* schema_o
return result;
}
+ schema_out->flags = schema->flags;
+
result = ArrowSchemaSetName(schema_out, schema->name);
if (result != NANOARROW_OK) {
schema_out->release(schema_out);
@@ -1719,13 +1721,13 @@ static ArrowErrorCode ArrowArraySetStorageType(struct ArrowArray* array,
case NANOARROW_TYPE_FIXED_SIZE_LIST:
case NANOARROW_TYPE_STRUCT:
- case NANOARROW_TYPE_MAP:
case NANOARROW_TYPE_SPARSE_UNION:
array->n_buffers = 1;
break;
case NANOARROW_TYPE_LIST:
case NANOARROW_TYPE_LARGE_LIST:
+ case NANOARROW_TYPE_MAP:
case NANOARROW_TYPE_BOOL:
case NANOARROW_TYPE_UINT8:
case NANOARROW_TYPE_INT8:
@@ -2293,9 +2295,12 @@ ArrowErrorCode ArrowArrayViewSetArray(struct ArrowArrayView* array_view,
}
break;
case NANOARROW_TYPE_LIST:
+ case NANOARROW_TYPE_MAP: {
+ const char* type_name =
+ array_view->storage_type == NANOARROW_TYPE_LIST ? "list" : "map";
if (array->n_children != 1) {
- ArrowErrorSet(error, "Expected 1 child of list array but found %d child arrays",
- (int)array->n_children);
+ ArrowErrorSet(error, "Expected 1 child of %s array but found %d child arrays",
+ type_name, (int)array->n_children);
return EINVAL;
}
@@ -2305,13 +2310,14 @@ ArrowErrorCode ArrowArrayViewSetArray(struct ArrowArrayView* array_view,
if (array->children[0]->length < last_offset) {
ArrowErrorSet(
error,
- "Expected child of list array with length >= %ld but found array with "
+ "Expected child of %s array with length >= %ld but found array with "
"length %ld",
- (long)last_offset, (long)array->children[0]->length);
+ type_name, (long)last_offset, (long)array->children[0]->length);
return EINVAL;
}
}
break;
+ }
case NANOARROW_TYPE_LARGE_LIST:
if (array->n_children != 1) {
ArrowErrorSet(error,
diff --git a/c/vendor/nanoarrow/nanoarrow.h b/c/vendor/nanoarrow/nanoarrow.h
index b53ac44..83e66be 100644
--- a/c/vendor/nanoarrow/nanoarrow.h
+++ b/c/vendor/nanoarrow/nanoarrow.h
@@ -20,7 +20,7 @@
// #define NANOARROW_NAMESPACE YourNamespaceHere
-#define NANOARROW_BUILD_ID "ghaa66afcc5a9faf48fe7062eb2a025d808ccfac5dd"
+#define NANOARROW_BUILD_ID "gha3d9bcca1e1effe1590437e8f8d9a77fef2ac4d59"
#endif
// Licensed to the Apache Software Foundation (ASF) under one
@@ -2171,8 +2171,7 @@ static inline ArrowErrorCode ArrowArrayAppendDouble(struct ArrowArray* array,
NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(data_buffer, &value, sizeof(double)));
break;
case NANOARROW_TYPE_FLOAT:
- _NANOARROW_CHECK_RANGE(value, FLT_MIN, FLT_MAX);
- NANOARROW_RETURN_NOT_OK(ArrowBufferAppendFloat(data_buffer, value));
+ NANOARROW_RETURN_NOT_OK(ArrowBufferAppendFloat(data_buffer, (float)value));
break;
default:
return EINVAL;
@@ -2268,6 +2267,7 @@ static inline ArrowErrorCode ArrowArrayFinishElement(struct ArrowArray* array) {
switch (private_data->storage_type) {
case NANOARROW_TYPE_LIST:
+ case NANOARROW_TYPE_MAP:
child_length = array->children[0]->length;
if (child_length > INT32_MAX) {
return EINVAL;
diff --git a/c/vendor/nanoarrow/nanoarrow.hpp b/c/vendor/nanoarrow/nanoarrow.hpp
new file mode 100644
index 0000000..992253a
--- /dev/null
+++ b/c/vendor/nanoarrow/nanoarrow.hpp
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <vector>
+
+#include "nanoarrow.h"
+
+#ifndef NANOARROW_HPP_INCLUDED
+#define NANOARROW_HPP_INCLUDED
+
+/// \defgroup nanoarrow_hpp Nanoarrow C++ Helpers
+///
+/// The utilities provided in this file are intended to support C++ users
+/// of the nanoarrow C library such that C++-style resource allocation
+/// and error handling can be used with nanoarrow data structures.
+/// These utilities are not intended to mirror the nanoarrow C API.
+
+namespace nanoarrow {
+
+namespace internal {
+
+/// \defgroup nanoarrow_hpp-unique_base Base classes for Unique wrappers
+///
+/// @{
+
+static inline void init_pointer(struct ArrowSchema* data) { data->release = nullptr; }
+
+static inline void move_pointer(struct ArrowSchema* src, struct ArrowSchema* dst) {
+ memcpy(dst, src, sizeof(struct ArrowSchema));
+ src->release = nullptr;
+}
+
+static inline void release_pointer(struct ArrowSchema* data) {
+ if (data->release != nullptr) {
+ data->release(data);
+ }
+}
+
+static inline void init_pointer(struct ArrowArray* data) { data->release = nullptr; }
+
+static inline void move_pointer(struct ArrowArray* src, struct ArrowArray* dst) {
+ memcpy(dst, src, sizeof(struct ArrowArray));
+ src->release = nullptr;
+}
+
+static inline void release_pointer(struct ArrowArray* data) {
+ if (data->release != nullptr) {
+ data->release(data);
+ }
+}
+
+static inline void init_pointer(struct ArrowArrayStream* data) {
+ data->release = nullptr;
+}
+
+static inline void move_pointer(struct ArrowArrayStream* src,
+ struct ArrowArrayStream* dst) {
+ memcpy(dst, src, sizeof(struct ArrowArrayStream));
+ src->release = nullptr;
+}
+
+static inline void release_pointer(ArrowArrayStream* data) {
+ if (data->release != nullptr) {
+ data->release(data);
+ }
+}
+
+static inline void init_pointer(struct ArrowBuffer* data) { ArrowBufferInit(data); }
+
+static inline void move_pointer(struct ArrowBuffer* src, struct ArrowBuffer* dst) {
+ ArrowBufferMove(src, dst);
+}
+
+static inline void release_pointer(struct ArrowBuffer* data) { ArrowBufferReset(data); }
+
+static inline void init_pointer(struct ArrowBitmap* data) { ArrowBitmapInit(data); }
+
+static inline void move_pointer(struct ArrowBitmap* src, struct ArrowBitmap* dst) {
+ ArrowBufferMove(&src->buffer, &dst->buffer);
+ dst->size_bits = src->size_bits;
+ src->size_bits = 0;
+}
+
+static inline void release_pointer(struct ArrowBitmap* data) { ArrowBitmapReset(data); }
+
+static inline void init_pointer(struct ArrowArrayView* data) {
+ ArrowArrayViewInit(data, NANOARROW_TYPE_UNINITIALIZED);
+}
+
+static inline void move_pointer(struct ArrowArrayView* src, struct ArrowArrayView* dst) {
+ memcpy(dst, src, sizeof(struct ArrowArrayView));
+ init_pointer(src);
+}
+
+static inline void release_pointer(struct ArrowArrayView* data) {
+ ArrowArrayViewReset(data);
+}
+
+/// \brief A unique_ptr-like base class for stack-allocatable objects
+/// \tparam T The object type
+template <typename T>
+class Unique {
+ public:
+ /// \brief Construct an invalid instance of T holding no resources
+ Unique() { init_pointer(&data_); }
+
+ /// \brief Move and take ownership of data
+ Unique(T* data) { move_pointer(data, &data_); }
+
+ /// \brief Move and take ownership of data wrapped by rhs
+ Unique(Unique&& rhs) : Unique(rhs.get()) {}
+
+ // These objects are not copyable
+ Unique(Unique& rhs) = delete;
+
+ /// \brief Get a pointer to the data owned by this object
+ T* get() noexcept { return &data_; }
+
+ /// \brief Use the pointer operator to access fields of this object
+ T* operator->() { return &data_; }
+
+ /// \brief Call data's release callback if valid
+ void reset() { release_pointer(&data_); }
+
+ /// \brief Call data's release callback if valid and move ownership of the data
+ /// pointed to by data
+ void reset(T* data) {
+ reset();
+ move_pointer(data, &data_);
+ }
+
+ /// \brief Move ownership of this object to the data pointed to by out
+ void move(T* out) { move_pointer(&data_, out); }
+
+ ~Unique() { reset(); }
+
+ protected:
+ T data_;
+};
+
+/// @}
+
+} // namespace internal
+
+/// \defgroup nanoarrow_hpp-unique Unique object wrappers
+///
+/// The Arrow C Data interface, the Arrow C Stream interface, and the
+/// nanoarrow C library use stack-allocatable objects, some of which
+/// require initialization or cleanup.
+///
+/// @{
+
+/// \brief Class wrapping a unique struct ArrowSchema
+using UniqueSchema = internal::Unique<struct ArrowSchema>;
+
+/// \brief Class wrapping a unique struct ArrowArray
+using UniqueArray = internal::Unique<struct ArrowArray>;
+
+/// \brief Class wrapping a unique struct ArrowArrayStream
+using UniqueArrayStream = internal::Unique<struct ArrowArrayStream>;
+
+/// \brief Class wrapping a unique struct ArrowBuffer
+using UniqueBuffer = internal::Unique<struct ArrowBuffer>;
+
+/// \brief Class wrapping a unique struct ArrowBitmap
+using UniqueBitmap = internal::Unique<struct ArrowBitmap>;
+
+/// \brief Class wrapping a unique struct ArrowArrayView
+using UniqueArrayView = internal::Unique<struct ArrowArrayView>;
+
+/// @}
+
+/// \defgroup nanoarrow_hpp-array-stream ArrayStream helpers
+///
+/// These classes provide simple struct ArrowArrayStream implementations that
+/// can be extended to help simplify the process of creating a valid
+/// ArrowArrayStream implementation or used as-is for testing.
+///
+/// @{
+
+/// \brief An empty array stream
+///
+/// This class can be constructed from an enum ArrowType or
+/// struct ArrowSchema and implements a default get_next() method that
+/// always marks the output ArrowArray as released. This class can
+/// be extended with an implementation of get_next() for a custom
+/// source.
+class EmptyArrayStream {
+ public:
+ /// \brief Create an empty UniqueArrayStream from a struct ArrowSchema
+ ///
+ /// This object takes ownership of the schema and marks the source schema
+ /// as released.
+ static UniqueArrayStream MakeUnique(struct ArrowSchema* schema) {
+ UniqueArrayStream stream;
+ (new EmptyArrayStream(schema))->MakeStream(stream.get());
+ return stream;
+ }
+
+ virtual ~EmptyArrayStream() {}
+
+ protected:
+ UniqueSchema schema_;
+ struct ArrowError error_;
+
+ EmptyArrayStream(struct ArrowSchema* schema) : schema_(schema) {
+ error_.message[0] = '\0';
+ }
+
+ void MakeStream(struct ArrowArrayStream* stream) {
+ stream->get_schema = &get_schema_wrapper;
+ stream->get_next = &get_next_wrapper;
+ stream->get_last_error = &get_last_error_wrapper;
+ stream->release = &release_wrapper;
+ stream->private_data = this;
+ }
+
+ virtual int get_schema(struct ArrowSchema* schema) {
+ return ArrowSchemaDeepCopy(schema_.get(), schema);
+ }
+
+ virtual int get_next(struct ArrowArray* array) {
+ array->release = nullptr;
+ return NANOARROW_OK;
+ }
+
+ virtual const char* get_last_error() { return error_.message; }
+
+ private:
+ static int get_schema_wrapper(struct ArrowArrayStream* stream,
+ struct ArrowSchema* schema) {
+ return reinterpret_cast<EmptyArrayStream*>(stream->private_data)->get_schema(schema);
+ }
+
+ static int get_next_wrapper(struct ArrowArrayStream* stream, struct ArrowArray* array) {
+ return reinterpret_cast<EmptyArrayStream*>(stream->private_data)->get_next(array);
+ }
+
+ static const char* get_last_error_wrapper(struct ArrowArrayStream* stream) {
+ return reinterpret_cast<EmptyArrayStream*>(stream->private_data)->get_last_error();
+ }
+
+ static void release_wrapper(struct ArrowArrayStream* stream) {
+ delete reinterpret_cast<EmptyArrayStream*>(stream->private_data);
+ }
+};
+
+/// \brief Implementation of an ArrowArrayStream backed by a vector of ArrowArray objects
+class VectorArrayStream : public EmptyArrayStream {
+ public:
+ /// \brief Create a UniqueArrowArrayStream from an existing array
+ ///
+ /// Takes ownership of the schema and the array.
+ static UniqueArrayStream MakeUnique(struct ArrowSchema* schema,
+ struct ArrowArray* array) {
+ std::vector<UniqueArray> arrays;
+ arrays.emplace_back(array);
+ return MakeUnique(schema, std::move(arrays));
+ }
+
+ /// \brief Create a UniqueArrowArrayStream from existing arrays
+ ///
+ /// This object takes ownership of the schema and arrays.
+ static UniqueArrayStream MakeUnique(struct ArrowSchema* schema,
+ std::vector<UniqueArray> arrays) {
+ UniqueArrayStream stream;
+ (new VectorArrayStream(schema, std::move(arrays)))->MakeStream(stream.get());
+ return stream;
+ }
+
+ protected:
+ VectorArrayStream(struct ArrowSchema* schema, std::vector<UniqueArray> arrays)
+ : EmptyArrayStream(schema), offset_(0), arrays_(std::move(arrays)) {}
+
+ int get_next(struct ArrowArray* array) {
+ if (offset_ < arrays_.size()) {
+ arrays_[offset_++].move(array);
+ } else {
+ array->release = nullptr;
+ }
+
+ return NANOARROW_OK;
+ }
+
+ private:
+ std::vector<UniqueArray> arrays_;
+ int64_t offset_;
+};
+
+/// @}
+
+} // namespace nanoarrow
+
+#endif
diff --git a/c/vendor/vendor_nanoarrow.sh b/c/vendor/vendor_nanoarrow.sh
index bc50743..b5d65c7 100755
--- a/c/vendor/vendor_nanoarrow.sh
+++ b/c/vendor/vendor_nanoarrow.sh
@@ -34,6 +34,7 @@ main() {
cp "$SCRATCH/dist/nanoarrow.c" nanoarrow/
cp "$SCRATCH/dist/nanoarrow.h" nanoarrow/
+ cp "$SCRATCH/src/nanoarrow/nanoarrow.hpp" nanoarrow/
}
main "$@"
diff --git a/ci/conda_env_cpp.txt b/ci/conda_env_cpp.txt
index cfdf3ec..d45faeb 100644
--- a/ci/conda_env_cpp.txt
+++ b/ci/conda_env_cpp.txt
@@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.
-arrow-cpp>=8.0.0
cmake
compilers
gmock>=1.10.0
diff --git a/glib/test/test-connection.rb b/glib/test/test-connection.rb
index bc8c488..02c0086 100644
--- a/glib/test/test-connection.rb
+++ b/glib/test/test-connection.rb
@@ -19,7 +19,7 @@ class ConnectionTest < Test::Unit::TestCase
def setup
@database = ADBC::Database.new
@database.set_option("driver", "adbc_driver_sqlite")
- @database.set_option("filename", ":memory:")
+ @database.set_option("uri", ":memory:")
@database.init
end
diff --git a/glib/test/test-statement.rb b/glib/test/test-statement.rb
index 9c0af1d..2a3f11c 100644
--- a/glib/test/test-statement.rb
+++ b/glib/test/test-statement.rb
@@ -19,7 +19,7 @@ class StatementTest < Test::Unit::TestCase
def setup
@database = ADBC::Database.new
@database.set_option("driver", "adbc_driver_sqlite")
- @database.set_option("filename", ":memory:")
+ @database.set_option("uri", ":memory:")
@database.init
@connection = ADBC::Connection.new
@connection.init(@database)
diff --git a/go/adbc/drivermgr/wrapper.go b/go/adbc/drivermgr/wrapper.go
index 264ee44..3b60696 100644
--- a/go/adbc/drivermgr/wrapper.go
+++ b/go/adbc/drivermgr/wrapper.go
@@ -249,7 +249,7 @@ func (s *stmt) ExecuteQuery(context.Context) (array.RecordReader, int64, error)
)
code := adbc.Status(C.AdbcStatementExecuteQuery(s.st, &out, &affected, &err))
if code != adbc.StatusOK {
- return nil, 0, nil
+ return nil, 0, toAdbcError(code, &err)
}
return getRdr(&out), int64(affected), nil
diff --git a/go/adbc/drivermgr/wrapper_sqlite_test.go b/go/adbc/drivermgr/wrapper_sqlite_test.go
index edad49f..ba3b277 100644
--- a/go/adbc/drivermgr/wrapper_sqlite_test.go
+++ b/go/adbc/drivermgr/wrapper_sqlite_test.go
@@ -76,10 +76,7 @@ func (dm *DriverMgrSuite) TestMetadataGetInfo() {
rdr, err := dm.conn.GetInfo(dm.ctx, nil)
dm.NoError(err)
- dm.True(rdr.Next())
dm.True(infoSchema.Equal(rdr.Schema()))
- dm.NotNil(rdr.Record())
- dm.False(rdr.Next())
rdr.Release()
info := []adbc.InfoCode{
@@ -91,12 +88,11 @@ func (dm *DriverMgrSuite) TestMetadataGetInfo() {
rdr, err = dm.conn.GetInfo(dm.ctx, info)
dm.NoError(err)
- dm.True(rdr.Next())
sc := rdr.Schema()
dm.True(infoSchema.Equal(sc))
- dm.EqualValues(4, rdr.Record().NumRows())
- dm.False(rdr.Next())
rdr.Release()
+
+ // TODO(apache/arrow-nanoarrow#76): values are not checked because go fails to import the union values
}
func (dm *DriverMgrSuite) TestSqlExecute() {
@@ -127,12 +123,16 @@ func (dm *DriverMgrSuite) TestSqlExecuteInvalid() {
dm.Require().NoError(err)
defer st.Close()
- err = st.SetSqlQuery(query)
+ dm.Require().NoError(st.SetSqlQuery(query))
+
+ _, _, err = st.ExecuteQuery(dm.ctx)
+ dm.Require().Error(err)
+
var adbcErr *adbc.Error
dm.ErrorAs(err, &adbcErr)
- dm.ErrorContains(adbcErr, "[SQLite3] sqlite3_prepare_v2:")
+ dm.ErrorContains(adbcErr, "[SQLite] Failed to prepare query:")
dm.ErrorContains(adbcErr, "syntax error")
- dm.Equal(adbc.StatusIO, adbcErr.Code)
+ dm.Equal(adbc.StatusInvalidArgument, adbcErr.Code)
}
func (dm *DriverMgrSuite) TestSqlPrepare() {
diff --git a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
index 14b0245..51be26c 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
+++ b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
@@ -64,9 +64,9 @@ _KNOWN_INFO_VALUES = {
0: "vendor_name",
1: "vendor_version",
2: "vendor_arrow_version",
- 100: "vendor_name",
- 101: "vendor_version",
- 102: "vendor_arrow_version",
+ 100: "driver_name",
+ 101: "driver_version",
+ 102: "driver_arrow_version",
}
# ----------------------------------------------------------
diff --git a/python/adbc_driver_manager/tests/test_dbapi.py b/python/adbc_driver_manager/tests/test_dbapi.py
index d3d34af..7c23588 100644
--- a/python/adbc_driver_manager/tests/test_dbapi.py
+++ b/python/adbc_driver_manager/tests/test_dbapi.py
@@ -64,11 +64,16 @@ def test_attrs(sqlite):
@pytest.mark.sqlite
def test_info(sqlite):
- assert sqlite.adbc_get_info() == {
- "vendor_arrow_version": "Arrow/C++ 9.0.0",
- "vendor_name": "ADBC C SQLite3",
- "vendor_version": "0.0.1",
+ info = sqlite.adbc_get_info()
+ assert set(info.keys()) == {
+ "driver_arrow_version",
+ "driver_name",
+ "driver_version",
+ "vendor_name",
+ "vendor_version",
}
+ assert info["driver_name"] == "ADBC SQLite Driver"
+ assert info["vendor_name"] == "SQLite"
@pytest.mark.sqlite
@@ -144,7 +149,7 @@ def test_ingest(data, sqlite):
with sqlite.cursor() as cur:
cur.adbc_ingest("bulk_ingest", data())
- with pytest.raises(dbapi.ProgrammingError):
+ with pytest.raises(dbapi.Error):
cur.adbc_ingest("bulk_ingest", data())
cur.adbc_ingest("bulk_ingest", data(), mode="append")
diff --git a/python/adbc_driver_manager/tests/test_lowlevel.py b/python/adbc_driver_manager/tests/test_lowlevel.py
index 03f71fd..c8f9de7 100644
--- a/python/adbc_driver_manager/tests/test_lowlevel.py
+++ b/python/adbc_driver_manager/tests/test_lowlevel.py
@@ -225,9 +225,9 @@ def test_autocommit(sqlite):
# Data should not be readable
with adbc_driver_manager.AdbcStatement(conn) as stmt:
- with pytest.raises(adbc_driver_manager.OperationalError):
+ with pytest.raises(adbc_driver_manager.Error):
stmt.set_sql_query("SELECT * FROM foo")
- stmt.execute()
+ stmt.execute_query()
with adbc_driver_manager.AdbcStatement(conn) as stmt:
stmt.set_options(**{adbc_driver_manager.INGEST_OPTION_TARGET_TABLE: "foo"})
diff --git a/ruby/README.md b/ruby/README.md
index b49c516..cbdf5f4 100644
--- a/ruby/README.md
+++ b/ruby/README.md
@@ -33,7 +33,7 @@ $ gem install red-adbc
require "adbc"
ADBC::Database.open(driver: "adbc_driver_sqlite",
- filename: ":memory:") do |database|
+ uri: ":memory:") do |database|
database.connect do |connection|
puts(connection.query("SELECT 1"))
end
diff --git a/ruby/test/test-connection.rb b/ruby/test/test-connection.rb
index 2b62c6e..e76a897 100644
--- a/ruby/test/test-connection.rb
+++ b/ruby/test/test-connection.rb
@@ -19,7 +19,7 @@ class ConnectionTest < Test::Unit::TestCase
def setup
options = {
driver: "adbc_driver_sqlite",
- filename: ":memory:",
+ uri: ":memory:",
}
ADBC::Database.open(**options) do |database|
connect_options = {}