You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/06/23 13:51:03 UTC
[arrow-adbc] branch main updated: Add table/column reflection (#18)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new a549499 Add table/column reflection (#18)
a549499 is described below
commit a54949933cefd5ed6be61f97ba25bbef59898dbe
Author: David Li <li...@gmail.com>
AuthorDate: Thu Jun 23 09:50:57 2022 -0400
Add table/column reflection (#18)
Fixes #14.
* Implement proposed metadata API
* Implement proposed API for constraint metadata
* Remove redundant metadata methods
---
adbc.h | 213 +++++---
adbc_driver_manager/adbc_driver_manager.cc | 20 +
drivers/flight_sql/flight_sql.cc | 17 -
drivers/sqlite/sqlite.cc | 802 ++++++++++++++++++++++++-----
drivers/sqlite/sqlite_test.cc | 395 +++++++++++++-
5 files changed, 1215 insertions(+), 232 deletions(-)
diff --git a/adbc.h b/adbc.h
index a1a3bda..4be0d78 100644
--- a/adbc.h
+++ b/adbc.h
@@ -117,6 +117,8 @@ struct ArrowArrayStream {
#define ADBC
// Storage class macros for Windows
+// Allow overriding/aliasing with application-defined macros
+#if !defined(ADBC_EXPORT)
#if defined(_WIN32)
#if defined(ADBC_EXPORTING)
#define ADBC_EXPORT __declspec(dllexport)
@@ -126,6 +128,7 @@ struct ArrowArrayStream {
#else
#define ADBC_EXPORT
#endif // defined(_WIN32)
+#endif // !defined(ADBC_EXPORT)
/// \file ADBC: Arrow DataBase connectivity (client API)
///
@@ -201,7 +204,7 @@ struct ADBC_EXPORT AdbcDatabase {
void* private_data;
/// \brief The associated driver (used by the driver manager to help
/// track state).
- AdbcDriver* private_driver;
+ struct AdbcDriver* private_driver;
};
/// \brief Allocate a new (but uninitialized) database.
@@ -233,17 +236,6 @@ AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
/// \defgroup adbc-connection Connection establishment.
/// @{
-/// \brief A set of connection options.
-struct ADBC_EXPORT AdbcConnectionOptions {
- /// \brief The database to connect to.
- struct AdbcDatabase* database;
-
- /// \brief A driver-specific connection string.
- ///
- /// Should be in ODBC-style format ("Key1=Value1;Key2=Value2").
- const char* target;
-};
-
/// \brief An active database connection.
///
/// Provides methods for query execution, managing prepared
@@ -257,7 +249,7 @@ struct ADBC_EXPORT AdbcConnection {
void* private_data;
/// \brief The associated driver (used by the driver manager to help
/// track state).
- AdbcDriver* private_driver;
+ struct AdbcDriver* private_driver;
};
/// \brief Allocate a new (but uninitialized) connection.
@@ -317,48 +309,143 @@ AdbcStatusCode AdbcConnectionDeserializePartitionDesc(struct AdbcConnection* con
/// schema given in the function docstring. Schema fields are nullable unless
/// otherwise marked.
///
-/// Some functions accept a "search pattern" argument, which is a string that
+/// Some functions accept "search pattern" arguments, which are strings that
/// can contain the special character "%" to match zero or more characters, or
-/// "_" to match exactly one character. (See the documentation of
+/// "_" to match exactly one character. (See the documentation of
/// DatabaseMetaData in JDBC or "Pattern Value Arguments" in the ODBC
-/// documentation.)
-///
-/// TODO: escaping in search patterns?
+/// documentation.) Escaping is not currently supported.
///
/// @{
-/// \brief Get a list of catalogs in the database.
+/// \brief Get a hierarchical view of all catalogs, database schemas,
+/// tables, and columns.
///
/// The result is an Arrow dataset with the following schema:
///
-/// Field Name | Field Type
-/// ---------------|--------------
-/// catalog_name | utf8 not null
+/// Field Name | Field Type
+/// -------------------------|-----------------------
+/// catalog_name | utf8
+/// catalog_db_schemas | list<DB_SCHEMA_SCHEMA>
+///
+/// DB_SCHEMA_SCHEMA is a Struct with fields:
+///
+/// Field Name | Field Type
+/// -------------------------|-----------------------
+/// db_schema_name | utf8
+/// db_schema_tables | list<TABLE_SCHEMA>
+///
+/// TABLE_SCHEMA is a Struct with fields:
+///
+/// Field Name | Field Type
+/// -------------------------|-----------------------
+/// table_name | utf8 not null
+/// table_type | utf8 not null
+/// table_columns | list<COLUMN_SCHEMA>
+/// table_constraints | list<CONSTRAINT_SCHEMA>
+///
+/// COLUMN_SCHEMA is a Struct with fields:
+///
+/// Field Name | Field Type | Comments
+/// -------------------------|-----------------------|---------
+/// column_name | utf8 not null |
+/// ordinal_position | int32 | (1)
+/// remarks | utf8 | (2)
+/// xdbc_data_type | int16 | (3)
+/// xdbc_type_name | utf8 | (3)
+/// xdbc_column_size | int32 | (3)
+/// xdbc_decimal_digits | int16 | (3)
+/// xdbc_num_prec_radix | int16 | (3)
+/// xdbc_nullable | int16 | (3)
+/// xdbc_column_def | utf8 | (3)
+/// xdbc_sql_data_type | int16 | (3)
+/// xdbc_datetime_sub | int16 | (3)
+/// xdbc_char_octet_length | int32 | (3)
+/// xdbc_is_nullable | utf8 | (3)
+/// xdbc_scope_catalog | utf8 | (3)
+/// xdbc_scope_schema | utf8 | (3)
+/// xdbc_scope_table | utf8 | (3)
+/// xdbc_is_autoincrement | bool | (3)
+/// xdbc_is_generatedcolumn | bool | (3)
+///
+/// 1. The column's ordinal position in the table (starting from 1).
+/// 2. Database-specific description of the column.
+/// 3. Optional, JDBC/ODBC-compatible value.
+///
+/// CONSTRAINT_SCHEMA is a Struct with fields:
+///
+/// Field Name | Field Type | Comments
+/// -------------------------|-----------------------|---------
+/// constraint_name | utf8 |
+/// constraint_type | utf8 not null | (1)
+/// constraint_column_names | list<utf8> not null | (2)
+/// constraint_column_usage | list<USAGE_SCHEMA> | (3)
+///
+/// 1. One of 'CHECK', 'FOREIGN KEY', 'PRIMARY KEY', or 'UNIQUE'.
+/// 2. The columns on the current table that are constrained, in
+/// order.
+/// 3. For FOREIGN KEY only, the referenced table and columns.
+///
+/// USAGE_SCHEMA is a Struct with fields:
+///
+/// Field Name | Field Type | Comments
+/// -------------------------|-----------------------|---------
+/// fk_catalog | utf8 |
+/// fk_db_schema | utf8 |
+/// fk_table | utf8 not null |
+/// fk_column_name | utf8 not null |
///
/// \param[in] connection The database connection.
-/// \param[out] statement The result set.
+/// \param[in] depth The level of nesting to display. If 0, display
+/// all levels. If 1, display only catalogs (i.e. catalog_db_schemas
+/// will be null). If 2, display only catalogs and schemas
+/// (i.e. db_schema_tables will be null), and so on.
+/// \param[in] catalog Only show tables in the given catalog. If NULL,
+/// do not filter by catalog. If an empty string, only show tables
+/// without a catalog. May be a search pattern (see section
+/// documentation).
+/// \param[in] db_schema Only show tables in the given database schema. If
+/// NULL, do not filter by database schema. If an empty string, only show
+/// tables without a database schema. May be a search pattern (see section
+/// documentation).
+/// \param[in] table_name Only show tables with the given name. If NULL, do not
+/// filter by name. May be a search pattern (see section documentation).
+/// \param[in] table_type Only show tables matching one of the given table
+/// types. If NULL, show tables of any type. Valid table types can be fetched
+/// from GetTableTypes. Terminate the list with a NULL entry.
+/// \param[in] column_name Only show columns with the given name. If
+/// NULL, do not filter by name. May be a search pattern (see
+/// section documentation).
+/// \param[out] statement The result set. AdbcStatementGetStream can
+/// be called immediately; do not call Execute or Prepare.
/// \param[out] error Error details, if an error occurs.
ADBC_EXPORT
-AdbcStatusCode AdbcConnectionGetCatalogs(struct AdbcConnection* connection,
- struct AdbcStatement* statement,
- struct AdbcError* error);
+AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int depth,
+ const char* catalog, const char* db_schema,
+ const char* table_name, const char** table_type,
+ const char* column_name,
+ struct AdbcStatement* statement,
+ struct AdbcError* error);
-/// \brief Get a list of schemas in the database.
-///
-/// The result is an Arrow dataset with the following schema:
-///
-/// Field Name | Field Type
-/// ---------------|--------------
-/// catalog_name | utf8
-/// db_schema_name | utf8 not null
+#define ADBC_OBJECT_DEPTH_ALL 0
+#define ADBC_OBJECT_DEPTH_CATALOGS 1
+#define ADBC_OBJECT_DEPTH_DB_SCHEMAS 2
+#define ADBC_OBJECT_DEPTH_TABLES 3
+#define ADBC_OBJECT_DEPTH_COLUMNS ADBC_OBJECT_DEPTH_ALL
+
+/// \brief Get the Arrow schema of a table.
///
/// \param[in] connection The database connection.
-/// \param[out] statement The result set.
+/// \param[in] catalog The catalog (or nullptr if not applicable).
+/// \param[in] db_schema The database schema (or nullptr if not applicable).
+/// \param[in] table_name The table name.
+/// \param[out] schema The table schema.
/// \param[out] error Error details, if an error occurs.
ADBC_EXPORT
-AdbcStatusCode AdbcConnectionGetDbSchemas(struct AdbcConnection* connection,
- struct AdbcStatement* statement,
- struct AdbcError* error);
+AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
+ const char* catalog, const char* db_schema,
+ const char* table_name,
+ struct ArrowSchema* schema,
+ struct AdbcError* error);
/// \brief Get a list of table types in the database.
///
@@ -376,38 +463,6 @@ AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
struct AdbcStatement* statement,
struct AdbcError* error);
-/// \brief Get a list of tables matching the given criteria.
-///
-/// The result is an Arrow dataset with the following schema:
-///
-/// Field Name | Field Type
-/// ---------------|--------------
-/// catalog_name | utf8
-/// db_schema_name | utf8
-/// table_name | utf8 not null
-/// table_type | utf8 not null
-///
-/// \param[in] connection The database connection.
-/// \param[in] catalog Only show tables in the given catalog. If NULL, do not
-/// filter by catalog. If an empty string, only show tables without a
-/// catalog.
-/// \param[in] db_schema Only show tables in the given database schema. If
-/// NULL, do not filter by database schema. If an empty string, only show
-/// tables without a database schema. May be a search pattern (see section
-/// documentation).
-/// \param[in] table_name Only show tables with the given name. If NULL, do not
-/// filter by name. May be a search pattern (see section documentation).
-/// \param[in] table_types Only show tables matching one of the given table
-/// types. If NULL, show tables of any type. Valid table types can be fetched
-/// from get_table_types.
-/// \param[out] statement The result set.
-/// \param[out] error Error details, if an error occurs.
-ADBC_EXPORT
-AdbcStatusCode AdbcConnectionGetTables(struct AdbcConnection* connection,
- const char* catalog, const char* db_schema,
- const char* table_name, const char** table_types,
- struct AdbcStatement* statement,
- struct AdbcError* error);
/// }@
/// }@
@@ -433,7 +488,7 @@ struct ADBC_EXPORT AdbcStatement {
/// \brief The associated driver (used by the driver manager to help
/// track state).
- AdbcDriver* private_driver;
+ struct AdbcDriver* private_driver;
};
/// \brief Create a new statement for a given connection.
@@ -670,15 +725,15 @@ struct ADBC_EXPORT AdbcDriver {
struct AdbcStatement*,
struct AdbcError*);
- AdbcStatusCode (*ConnectionGetCatalogs)(struct AdbcConnection*, struct AdbcStatement*,
- struct AdbcError*);
- AdbcStatusCode (*ConnectionGetDbSchemas)(struct AdbcConnection*, struct AdbcStatement*,
- struct AdbcError*);
+ AdbcStatusCode (*ConnectionGetObjects)(struct AdbcConnection*, int, const char*,
+ const char*, const char*, const char**,
+ const char*, struct AdbcStatement*,
+ struct AdbcError*);
+ AdbcStatusCode (*ConnectionGetTableSchema)(struct AdbcConnection*, const char*,
+ const char*, const char*,
+ struct ArrowSchema*, struct AdbcError*);
AdbcStatusCode (*ConnectionGetTableTypes)(struct AdbcConnection*, struct AdbcStatement*,
struct AdbcError*);
- AdbcStatusCode (*ConnectionGetTables)(struct AdbcConnection*, const char*, const char*,
- const char*, const char**, struct AdbcStatement*,
- struct AdbcError*);
AdbcStatusCode (*StatementNew)(struct AdbcConnection*, struct AdbcStatement*,
struct AdbcError*);
@@ -723,7 +778,7 @@ typedef AdbcStatusCode (*AdbcDriverInitFunc)(size_t count, struct AdbcDriver* dr
// struct/entrypoint instead?
// For use with count
-#define ADBC_VERSION_0_0_1 25
+#define ADBC_VERSION_0_0_1 28
/// }@
diff --git a/adbc_driver_manager/adbc_driver_manager.cc b/adbc_driver_manager/adbc_driver_manager.cc
index c273fb4..0572300 100644
--- a/adbc_driver_manager/adbc_driver_manager.cc
+++ b/adbc_driver_manager/adbc_driver_manager.cc
@@ -61,6 +61,22 @@ void SetError(struct AdbcError* error, const std::string& message) {
}
// Default stubs
+/// Default stub for ConnectionGetTableTypes (see the FILL_DEFAULT calls in
+/// AdbcLoadDriver). No argument is inspected; the call always reports
+/// ADBC_STATUS_NOT_IMPLEMENTED.
+AdbcStatusCode ConnectionGetTableTypes(struct AdbcConnection*, struct AdbcStatement*,
+                                       struct AdbcError*) {
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+/// Default stub for ConnectionGetObjects (see the FILL_DEFAULT calls in
+/// AdbcLoadDriver). No argument is inspected; the call always reports
+/// ADBC_STATUS_NOT_IMPLEMENTED.
+AdbcStatusCode ConnectionGetObjects(struct AdbcConnection*, int, const char*, const char*,
+                                    const char*, const char**, const char*,
+                                    struct AdbcStatement*, struct AdbcError*) {
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
+/// Default stub for ConnectionGetTableSchema (see the FILL_DEFAULT calls in
+/// AdbcLoadDriver). No argument is inspected; the call always reports
+/// ADBC_STATUS_NOT_IMPLEMENTED.
+AdbcStatusCode ConnectionGetTableSchema(struct AdbcConnection*, const char*, const char*,
+                                        const char*, struct ArrowSchema*,
+                                        struct AdbcError*) {
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
AdbcStatusCode StatementBind(struct AdbcStatement*, struct ArrowArray*,
struct ArrowSchema*, struct AdbcError* error) {
@@ -519,6 +535,10 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,
CHECK_REQUIRED(driver, DatabaseInit);
CHECK_REQUIRED(driver, DatabaseRelease);
+ FILL_DEFAULT(driver, ConnectionGetObjects);
+ FILL_DEFAULT(driver, ConnectionGetTableSchema);
+ FILL_DEFAULT(driver, ConnectionGetTableTypes);
+
FILL_DEFAULT(driver, StatementBind);
FILL_DEFAULT(driver, StatementExecute);
FILL_DEFAULT(driver, StatementPrepare);
diff --git a/drivers/flight_sql/flight_sql.cc b/drivers/flight_sql/flight_sql.cc
index c090ec0..e54f979 100644
--- a/drivers/flight_sql/flight_sql.cc
+++ b/drivers/flight_sql/flight_sql.cc
@@ -499,29 +499,24 @@ AdbcStatusCode FlightSqlStatementSetSqlQuery(struct AdbcStatement* statement,
} // namespace
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) {
return FlightSqlDatabaseInit(database, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
return FlightSqlDatabaseNew(database, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char* key,
const char* value, struct AdbcError* error) {
return FlightSqlDatabaseSetOption(database, key, value, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
struct AdbcError* error) {
return FlightSqlDatabaseRelease(database, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcConnectionDeserializePartitionDesc(struct AdbcConnection* connection,
const uint8_t* serialized_partition,
size_t serialized_length,
@@ -531,79 +526,67 @@ AdbcStatusCode AdbcConnectionDeserializePartitionDesc(struct AdbcConnection* con
serialized_length, statement, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
struct AdbcStatement* statement,
struct AdbcError* error) {
return FlightSqlConnectionGetTableTypes(connection, statement, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
struct AdbcError* error) {
return FlightSqlConnectionInit(connection, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcConnectionNew(struct AdbcDatabase* database,
struct AdbcConnection* connection,
struct AdbcError* error) {
return FlightSqlConnectionNew(database, connection, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
const char* value, struct AdbcError* error) {
return FlightSqlConnectionSetOption(connection, key, value, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
struct AdbcError* error) {
return FlightSqlConnectionRelease(connection, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcStatementExecute(struct AdbcStatement* statement,
struct AdbcError* error) {
return FlightSqlStatementExecute(statement, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcStatementGetPartitionDesc(struct AdbcStatement* statement,
uint8_t* partition_desc,
struct AdbcError* error) {
return FlightSqlStatementGetPartitionDesc(statement, partition_desc, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcStatementGetPartitionDescSize(struct AdbcStatement* statement,
size_t* length,
struct AdbcError* error) {
return FlightSqlStatementGetPartitionDescSize(statement, length, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcStatementGetStream(struct AdbcStatement* statement,
struct ArrowArrayStream* out,
struct AdbcError* error) {
return FlightSqlStatementGetStream(statement, out, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcStatementNew(struct AdbcConnection* connection,
struct AdbcStatement* statement,
struct AdbcError* error) {
return FlightSqlStatementNew(connection, statement, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcStatementRelease(struct AdbcStatement* statement,
struct AdbcError* error) {
return FlightSqlStatementRelease(statement, error);
}
-ADBC_DRIVER_EXPORT
AdbcStatusCode AdbcStatementSetSqlQuery(struct AdbcStatement* statement,
const char* query, struct AdbcError* error) {
return FlightSqlStatementSetSqlQuery(statement, query, error);
diff --git a/drivers/sqlite/sqlite.cc b/drivers/sqlite/sqlite.cc
index 2973794..74f6cd2 100644
--- a/drivers/sqlite/sqlite.cc
+++ b/drivers/sqlite/sqlite.cc
@@ -22,6 +22,7 @@
#include <mutex>
#include <string>
#include <unordered_map>
+#include <unordered_set>
#include <arrow/builder.h>
#include <arrow/c/bridge.h>
@@ -76,6 +77,15 @@ void SetError(struct AdbcError* error, Args&&... args) {
error->release = ReleaseError;
}
+/// Translate a SQLite result code into an ADBC status.
+///
+/// SQLITE_OK maps to ADBC_STATUS_OK. Any other code records an error through
+/// SetError(db, context, error) and maps to ADBC_STATUS_IO.
+AdbcStatusCode CheckRc(sqlite3* db, int rc, const char* context,
+                       struct AdbcError* error) {
+  if (rc == SQLITE_OK) return ADBC_STATUS_OK;
+  SetError(db, context, error);
+  return ADBC_STATUS_IO;
+}
+
AdbcStatusCode CheckRc(sqlite3* db, sqlite3_stmt* stmt, int rc, const char* context,
struct AdbcError* error) {
if (rc != SQLITE_OK) {
@@ -89,6 +99,29 @@ AdbcStatusCode CheckRc(sqlite3* db, sqlite3_stmt* stmt, int rc, const char* cont
return ADBC_STATUS_OK;
}
+/// \brief Prepare `query` on `db`, hand the prepared statement to `callback`,
+///   then finalize the statement.
+///
+/// \param[in] db The SQLite connection handle.
+/// \param[in] query NUL-terminated SQL text.
+/// \param[out] error Passed to CheckRc when preparing or finalizing fails.
+/// \param[in] callback Invoked as `callback(sqlite3_stmt*)`; its result is
+///   returned to the caller.
+/// \return The failure status if the statement cannot be prepared (the
+///   callback is not invoked in that case), otherwise the callback's result.
+template <typename CallbackFn>
+AdbcStatusCode DoQuery(sqlite3* db, const char* query, struct AdbcError* error,
+                       CallbackFn&& callback) {
+  sqlite3_stmt* stmt = nullptr;
+  const int rc = sqlite3_prepare_v2(db, query, static_cast<int>(std::strlen(query)),
+                                    &stmt, /*pzTail=*/nullptr);
+  // Never run the callback against a statement that failed to compile.
+  ADBC_RETURN_NOT_OK(CheckRc(db, stmt, rc, "sqlite3_prepare_v2", error));
+  auto status = std::move(callback)(stmt);
+  // A finalize failure is deliberately not propagated: the callback's status
+  // is what the caller sees.
+  std::ignore = CheckRc(db, stmt, sqlite3_finalize(stmt), "sqlite3_finalize", error);
+  return status;
+}
+
+/// Convert an ADBC status code into an arrow::Status.
+///
+/// ADBC_STATUS_OK becomes Status::OK(); every other code becomes
+/// Status::UnknownError(code). `error` is not consulted.
+arrow::Status ToArrowStatus(AdbcStatusCode code, struct AdbcError* error) {
+  // TODO:
+  return code == ADBC_STATUS_OK ? Status::OK() : Status::UnknownError(code);
+}
+
+/// Convert an arrow::Status into an ADBC status code.
+///
+/// A non-OK status is recorded via SetError(error, status) and reported as
+/// ADBC_STATUS_INTERNAL; an OK status maps to ADBC_STATUS_OK.
+AdbcStatusCode FromArrowStatus(const Status& status, struct AdbcError* error) {
+  if (!status.ok()) {
+    SetError(error, status);
+    // TODO: map Arrow codes to ADBC codes
+    return ADBC_STATUS_INTERNAL;
+  }
+  return ADBC_STATUS_OK;
+}
+
std::shared_ptr<arrow::Schema> StatementToSchema(sqlite3_stmt* stmt) {
const int num_columns = sqlite3_column_count(stmt);
arrow::FieldVector fields(num_columns);
@@ -192,12 +225,41 @@ class SqliteConnectionImpl {
explicit SqliteConnectionImpl(std::shared_ptr<SqliteDatabaseImpl> database)
: database_(std::move(database)), db_(nullptr) {}
- //----------------------------------------------------------
- // Common Functions
- //----------------------------------------------------------
-
sqlite3* db() const { return db_; }
+  /// \brief Compute the Arrow schema of a table by preparing and stepping
+  ///   `SELECT * FROM <table_name>`.
+  ///
+  /// \param[in] catalog Must be NULL or empty; otherwise the call fails with
+  ///   ADBC_STATUS_INVALID_ARGUMENT and `schema` is zeroed.
+  /// \param[in] db_schema Must be NULL or empty (same handling as `catalog`).
+  /// \param[in] table_name The table name, escaped with the `%w` format of
+  ///   sqlite3_mprintf before being appended to the query.
+  /// \param[out] schema Receives the exported schema on success.
+  /// \param[out] error Error details, if an error occurs.
+  AdbcStatusCode GetTableSchema(const char* catalog, const char* db_schema,
+                                const char* table_name, struct ArrowSchema* schema,
+                                struct AdbcError* error) {
+    if ((catalog && std::strlen(catalog) > 0) ||
+        (db_schema && std::strlen(db_schema) > 0)) {
+      std::memset(schema, 0, sizeof(*schema));
+      SetError(error, "Catalog/schema are not supported");
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+
+    std::string query = "SELECT * FROM ";
+    char* escaped = sqlite3_mprintf("%w", table_name);
+    if (!escaped) {
+      // Failed to allocate
+      SetError(error, "Could not escape table name (failed to allocate memory)");
+      return ADBC_STATUS_INTERNAL;
+    }
+    query += escaped;
+    // sqlite3_mprintf returns memory from sqlite3_malloc; it has been copied
+    // into `query`, so release it here instead of leaking it.
+    sqlite3_free(escaped);
+
+    sqlite3_stmt* stmt = nullptr;
+    int rc = sqlite3_prepare_v2(db_, query.c_str(), static_cast<int>(query.size()), &stmt,
+                                /*pzTail=*/nullptr);
+    ADBC_RETURN_NOT_OK(CheckRc(db_, stmt, rc, "sqlite3_prepare_v2", error));
+    rc = sqlite3_step(stmt);
+    // Statements prepared with sqlite3_prepare_v2 report specific error codes
+    // directly from sqlite3_step, so anything other than ROW/DONE is a failure.
+    if (rc != SQLITE_ROW && rc != SQLITE_DONE && rc != SQLITE_OK) {
+      return CheckRc(db_, stmt, rc, "sqlite3_step", error);
+    }
+    auto arrow_schema = StatementToSchema(stmt);
+    // The handle is dead after sqlite3_finalize; check the result code with
+    // the overload that does not take the statement.
+    const int finalize_rc = sqlite3_finalize(stmt);
+    ADBC_RETURN_NOT_OK(CheckRc(db_, finalize_rc, "sqlite3_finalize", error));
+    return FromArrowStatus(arrow::ExportSchema(*arrow_schema, schema), error);
+  }
+
AdbcStatusCode Init(struct AdbcError* error) {
db_ = database_->Connect();
if (!db_) {
@@ -214,18 +276,80 @@ class SqliteConnectionImpl {
sqlite3* db_;
};
-class SqliteStatementImpl : public arrow::RecordBatchReader {
+/// \brief Bind one row of an Arrow record batch to a SQLite statement.
+///
+/// Columns are bound in order to parameter indices starting at 1. A null slot
+/// is bound with sqlite3_bind_null; INT64 and STRING columns are bound with
+/// sqlite3_bind_int64 and sqlite3_bind_text64 respectively. Any other column
+/// type sets an error and returns ADBC_STATUS_NOT_IMPLEMENTED.
+///
+/// \param[in] stmt The statement to bind to.
+/// \param[in] data The batch holding the parameter values.
+/// \param[in] row The row of `data` to bind.
+/// \param[out] rc Receives the result of the most recent sqlite3_bind_* call.
+/// \param[out] error Set when a column has an unsupported type.
+/// \return ADBC_STATUS_IO as soon as a bind call does not return SQLITE_OK,
+///   ADBC_STATUS_OK once every column has been bound.
+AdbcStatusCode BindParameters(sqlite3_stmt* stmt, const arrow::RecordBatch& data,
+                              int64_t row, int* rc, struct AdbcError* error) {
+  int param_index = 0;
+  for (const auto& array : data.columns()) {
+    ++param_index;
+    if (array->IsNull(row)) {
+      *rc = sqlite3_bind_null(stmt, param_index);
+    } else if (array->type()->id() == arrow::Type::INT64) {
+      const auto& ints = static_cast<const arrow::Int64Array&>(*array);
+      *rc = sqlite3_bind_int64(stmt, param_index, ints.Value(row));
+    } else if (array->type()->id() == arrow::Type::STRING) {
+      const auto& strings = static_cast<const arrow::StringArray&>(*array);
+      *rc = sqlite3_bind_text64(stmt, param_index, strings.Value(row).data(),
+                                strings.value_length(row), SQLITE_STATIC, SQLITE_UTF8);
+    } else {
+      SetError(error, "Binding parameter of type ", *array->type());
+      return ADBC_STATUS_NOT_IMPLEMENTED;
+    }
+    if (*rc != SQLITE_OK) return ADBC_STATUS_IO;
+  }
+  return ADBC_STATUS_OK;
+}
+
+class SqliteStatementReader : public arrow::RecordBatchReader {
public:
- SqliteStatementImpl(std::shared_ptr<SqliteConnectionImpl> connection)
+ explicit SqliteStatementReader(
+ std::shared_ptr<SqliteConnectionImpl> connection, sqlite3_stmt* stmt,
+ std::shared_ptr<arrow::RecordBatchReader> bind_parameters)
: connection_(std::move(connection)),
- stmt_(nullptr),
+ stmt_(stmt),
+ bind_parameters_(std::move(bind_parameters)),
schema_(nullptr),
+ next_parameters_(nullptr),
bind_index_(0),
done_(false) {}
- //----------------------------------------------------------
- // arrow::RecordBatchReader
- //----------------------------------------------------------
+  /// \brief Bind the first row of parameters (if a parameter reader was
+  ///   supplied), step the statement once, and record the result schema.
+  ///
+  /// \param[out] error Error details, if an error occurs.
+  /// \return ADBC_STATUS_OK once schema_ and done_ have been set; otherwise
+  ///   the status of the step that failed (reading parameters, binding, or
+  ///   sqlite3_step).
+  AdbcStatusCode Init(struct AdbcError* error) {
+    // Step the statement and get the schema (SQLite doesn't
+    // necessarily know the schema until it begins to execute it)
+
+    sqlite3* db = connection_->db();
+    int rc = SQLITE_OK;
+    if (bind_parameters_) {
+      const Status status = bind_parameters_->ReadNext(&next_parameters_);
+      ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
+      ADBC_RETURN_NOT_OK(BindNext(&rc, error));
+      ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_bind", error));
+    }
+    // XXX: with parameters, inferring the schema from the first
+    // argument is inaccurate (what if one is null?). Is there a way
+    // to hint to SQLite the real type?
+
+    rc = sqlite3_step(stmt_);
+    // sqlite3_step on a statement from sqlite3_prepare_v2 returns specific
+    // error codes directly, not just SQLITE_ERROR; do not report those as an
+    // empty, successful result.
+    if (rc != SQLITE_ROW && rc != SQLITE_DONE && rc != SQLITE_OK) {
+      return CheckRc(db, stmt_, rc, "sqlite3_step", error);
+    }
+    schema_ = StatementToSchema(stmt_);
+    done_ = rc != SQLITE_ROW;
+    return ADBC_STATUS_OK;
+  }
+
+ /// Replace the schema reported by this reader with `schema`.
+ /// No compatibility check against the previous schema is performed.
+ void OverrideSchema(std::shared_ptr<arrow::Schema> schema) {
+ // TODO(ARROW-14705): use UnifySchemas for some sanity checking
+ schema_ = std::move(schema);
+ }
std::shared_ptr<arrow::Schema> schema() const override {
DCHECK(schema_);
@@ -255,6 +379,7 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
const auto& field = schema_->field(col);
switch (field->type()->id()) {
case arrow::Type::INT64: {
+ // TODO: handle null values
const sqlite3_int64 value = sqlite3_column_int64(stmt_, col);
ARROW_RETURN_NOT_OK(
dynamic_cast<arrow::Int64Builder*>(builders[col].get())->Append(value));
@@ -264,6 +389,7 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
const char* value =
reinterpret_cast<const char*>(sqlite3_column_text(stmt_, col));
if (!value) {
+ // TODO: check field nullability
ARROW_RETURN_NOT_OK(
dynamic_cast<arrow::StringBuilder*>(builders[col].get())->AppendNull());
} else {
@@ -295,7 +421,8 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
if (status != SQLITE_OK) {
return Status::IOError("[SQLite3] sqlite3_reset: ", sqlite3_errmsg(db));
}
- RETURN_NOT_OK(BindNext());
+ struct AdbcError error;
+ ARROW_RETURN_NOT_OK(ToArrowStatus(BindNext(&status, &error), &error));
status = sqlite3_step(stmt_);
if (status == SQLITE_ROW) continue;
} else {
@@ -315,27 +442,39 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
return Status::OK();
}
- //----------------------------------------------------------
- // Common Functions
- //----------------------------------------------------------
+ private:
+  /// Bind the row of next_parameters_ at bind_index_ to stmt_ and advance
+  /// bind_index_. When there is no batch, or every row of it has already been
+  /// bound, nothing is bound and ADBC_STATUS_OK is returned.
+  AdbcStatusCode BindNext(int* rc, struct AdbcError* error) {
+    const bool has_pending_row =
+        next_parameters_ && bind_index_ < next_parameters_->num_rows();
+    if (has_pending_row) {
+      return BindParameters(stmt_, *next_parameters_, bind_index_++, rc, error);
+    }
+    return ADBC_STATUS_OK;
+  }
+
+ std::shared_ptr<SqliteConnectionImpl> connection_;
+ sqlite3_stmt* stmt_;
+ std::shared_ptr<arrow::RecordBatchReader> bind_parameters_;
+
+ std::shared_ptr<arrow::Schema> schema_;
+ std::shared_ptr<arrow::RecordBatch> next_parameters_;
+ int64_t bind_index_;
+ bool done_;
+};
+
+class SqliteStatementImpl {
+ public:
+ explicit SqliteStatementImpl(std::shared_ptr<SqliteConnectionImpl> connection)
+ : connection_(std::move(connection)), stmt_(nullptr) {}
AdbcStatusCode Close(struct AdbcError* error) {
if (stmt_) {
const int rc = sqlite3_finalize(stmt_);
stmt_ = nullptr;
- next_parameters_.reset();
- bind_parameters_.reset();
ADBC_RETURN_NOT_OK(
CheckRc(connection_->db(), nullptr, rc, "sqlite3_finalize", error));
- connection_.reset();
}
return ADBC_STATUS_OK;
}
- //----------------------------------------------------------
- // Statement Functions
- //----------------------------------------------------------
-
AdbcStatusCode Bind(const std::shared_ptr<SqliteStatementImpl>& self,
struct ArrowArray* values, struct ArrowSchema* schema,
struct AdbcError* error) {
@@ -369,23 +508,427 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
AdbcStatusCode Execute(const std::shared_ptr<SqliteStatementImpl>& self,
struct AdbcError* error) {
- if (bulk_table_.empty()) {
+ if (stmt_) {
return ExecutePrepared(error);
+ } else if (!bulk_table_.empty()) {
+ return ExecuteBulk(error);
}
- return ExecuteBulk(error);
+ SetError(error, "Cannot execute a statement without a query");
+ return ADBC_STATUS_UNINITIALIZED;
+ }
+
+ AdbcStatusCode GetObjects(const std::shared_ptr<SqliteStatementImpl>& self, int depth,
+ const char* catalog, const char* db_schema,
+ const char* table_name, const char** table_type,
+ const char* column_name, struct AdbcError* error) {
+ static std::shared_ptr<arrow::DataType> kColumnSchema = arrow::struct_({
+ arrow::field("column_name", arrow::utf8(), /*nullable=*/false),
+ arrow::field("ordinal_position", arrow::int32()),
+ arrow::field("remarks", arrow::utf8()),
+ arrow::field("xdbc_data_type", arrow::int16()),
+ arrow::field("xdbc_type_name", arrow::utf8()),
+ arrow::field("xdbc_column_size", arrow::int32()),
+ arrow::field("xdbc_decimal_digits", arrow::int16()),
+ arrow::field("xdbc_num_prec_radix", arrow::int16()),
+ arrow::field("xdbc_nullable", arrow::int16()),
+ arrow::field("xdbc_column_def", arrow::utf8()),
+ arrow::field("xdbc_sql_data_type", arrow::int16()),
+ arrow::field("xdbc_datetime_sub", arrow::int16()),
+ arrow::field("xdbc_char_octet_length", arrow::int32()),
+ arrow::field("xdbc_is_nullable", arrow::utf8()),
+ arrow::field("xdbc_scope_catalog", arrow::utf8()),
+ arrow::field("xdbc_scope_schema", arrow::utf8()),
+ arrow::field("xdbc_scope_table", arrow::utf8()),
+ arrow::field("xdbc_is_autoincrement", arrow::boolean()),
+ arrow::field("xdbc_is_generatedcolumn", arrow::boolean()),
+ });
+ static std::shared_ptr<arrow::DataType> kUsageSchema = arrow::struct_({
+ arrow::field("fk_catalog", arrow::utf8()),
+ arrow::field("fk_db_schema", arrow::utf8()),
+ arrow::field("fk_table", arrow::utf8()),
+ arrow::field("fk_column_name", arrow::utf8()),
+ });
+ static std::shared_ptr<arrow::DataType> kConstraintSchema = arrow::struct_({
+ arrow::field("constraint_name", arrow::utf8()),
+ arrow::field("constraint_type", arrow::utf8(), /*nullable=*/false),
+ arrow::field("column_names", arrow::list(arrow::utf8()), /*nullable=*/false),
+ arrow::field("column_names", arrow::list(kUsageSchema)),
+ });
+ static std::shared_ptr<arrow::DataType> kTableSchema = arrow::struct_({
+ arrow::field("table_name", arrow::utf8(), /*nullable=*/false),
+ arrow::field("table_type", arrow::utf8(), /*nullable=*/false),
+ arrow::field("table_columns", arrow::list(kColumnSchema)),
+ arrow::field("table_constraints", arrow::list(kConstraintSchema)),
+ });
+ static std::shared_ptr<arrow::DataType> kDbSchemaSchema = arrow::struct_({
+ arrow::field("db_schema_name", arrow::utf8()),
+ arrow::field("db_schema_tables", arrow::list(kTableSchema)),
+ });
+ static std::shared_ptr<arrow::Schema> kCatalogSchema = arrow::schema({
+ arrow::field("catalog_name", arrow::utf8()),
+ arrow::field("catalog_db_schemas", arrow::list(kDbSchemaSchema)),
+ });
+
+ static const char kTableQuery[] =
+ R"(SELECT name, type
+ FROM sqlite_master
+ WHERE name LIKE ? AND type <> "index"
+ ORDER BY name ASC)";
+ static const char kColumnQuery[] =
+ R"(SELECT cid, name
+ FROM pragma_table_info(?)
+ WHERE name LIKE ?
+ ORDER BY cid ASC)";
+ static const char kPrimaryKeyQuery[] =
+ R"(SELECT name
+ FROM pragma_table_info(?)
+ WHERE pk > 0
+ ORDER BY pk ASC)";
+ static const char kForeignKeyQuery[] =
+ R"(SELECT id, seq, "table", "from", "to"
+ FROM pragma_foreign_key_list(?)
+ ORDER BY id, seq ASC)";
+
+ arrow::StringBuilder catalog_name;
+ std::unique_ptr<arrow::ArrayBuilder> catalog_schemas_builder;
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ MakeBuilder(arrow::default_memory_pool(), kCatalogSchema->field(1)->type(),
+ &catalog_schemas_builder),
+ error));
+ auto* catalog_schemas =
+ static_cast<arrow::ListBuilder*>(catalog_schemas_builder.get());
+ auto* catalog_schemas_items =
+ static_cast<arrow::StructBuilder*>(catalog_schemas->value_builder());
+ auto* db_schema_name =
+ static_cast<arrow::StringBuilder*>(catalog_schemas_items->child_builder(0).get());
+ auto* db_schema_tables =
+ static_cast<arrow::ListBuilder*>(catalog_schemas_items->child_builder(1).get());
+ auto* db_schema_tables_items =
+ static_cast<arrow::StructBuilder*>(db_schema_tables->value_builder());
+ auto* table_names = static_cast<arrow::StringBuilder*>(
+ db_schema_tables_items->child_builder(0).get());
+ auto* table_types = static_cast<arrow::StringBuilder*>(
+ db_schema_tables_items->child_builder(1).get());
+ auto* table_columns =
+ static_cast<arrow::ListBuilder*>(db_schema_tables_items->child_builder(2).get());
+ auto* table_columns_items =
+ static_cast<arrow::StructBuilder*>(table_columns->value_builder());
+ auto* column_names =
+ static_cast<arrow::StringBuilder*>(table_columns_items->child_builder(0).get());
+ auto* ordinal_positions =
+ static_cast<arrow::Int32Builder*>(table_columns_items->child_builder(1).get());
+ auto* table_constraints =
+ static_cast<arrow::ListBuilder*>(db_schema_tables_items->child_builder(3).get());
+ auto* table_constraints_items =
+ static_cast<arrow::StructBuilder*>(table_constraints->value_builder());
+ auto* constraint_names = static_cast<arrow::StringBuilder*>(
+ table_constraints_items->child_builder(0).get());
+ auto* constraint_types = static_cast<arrow::StringBuilder*>(
+ table_constraints_items->child_builder(1).get());
+ auto* constraint_column_names =
+ static_cast<arrow::ListBuilder*>(table_constraints_items->child_builder(2).get());
+ auto* constraint_column_names_items =
+ static_cast<arrow::StringBuilder*>(constraint_column_names->value_builder());
+ auto* constraint_column_usage =
+ static_cast<arrow::ListBuilder*>(table_constraints_items->child_builder(3).get());
+ auto* constraint_column_usage_items =
+ static_cast<arrow::StructBuilder*>(constraint_column_usage->value_builder());
+ auto* constraint_column_usage_fk_catalog = static_cast<arrow::StringBuilder*>(
+ constraint_column_usage_items->child_builder(0).get());
+ auto* constraint_column_usage_fk_db_schema = static_cast<arrow::StringBuilder*>(
+ constraint_column_usage_items->child_builder(1).get());
+ auto* constraint_column_usage_fk_table = static_cast<arrow::StringBuilder*>(
+ constraint_column_usage_items->child_builder(2).get());
+ auto* constraint_column_usage_fk_column_name = static_cast<arrow::StringBuilder*>(
+ constraint_column_usage_items->child_builder(3).get());
+
+ if (!catalog || std::strlen(catalog) == 0) {
+ ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_name.AppendNull(), error));
+
+ if (depth == ADBC_OBJECT_DEPTH_CATALOGS) {
+ ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->AppendNull(), error));
+ } else if (!db_schema || std::strlen(db_schema) == 0) {
+ ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->Append(), error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(db_schema_name->AppendNull(), error));
+ if (depth == ADBC_OBJECT_DEPTH_DB_SCHEMAS) {
+ ADBC_RETURN_NOT_OK(FromArrowStatus(db_schema_tables->AppendNull(), error));
+ } else {
+ // Look up tables
+
+ std::unordered_set<std::string> table_type_filter;
+ if (table_type) {
+ while (*table_type) {
+ table_type_filter.insert(*table_type);
+ table_type++;
+ }
+ }
+
+ sqlite3* db = connection_->db();
+ ADBC_RETURN_NOT_OK(
+ DoQuery(db, kTableQuery, error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+ if (table_name) {
+ ADBC_RETURN_NOT_OK(CheckRc(
+ db,
+ sqlite3_bind_text64(stmt, 1, table_name, std::strlen(table_name),
+ SQLITE_STATIC, SQLITE_UTF8),
+ "sqlite3_bind_text64", error));
+ } else {
+ ADBC_RETURN_NOT_OK(CheckRc(
+ db,
+ sqlite3_bind_text64(stmt, 1, "%", 1, SQLITE_STATIC, SQLITE_UTF8),
+ "sqlite3_bind_text64", error));
+ }
+
+ int rc = SQLITE_OK;
+ ADBC_RETURN_NOT_OK(FromArrowStatus(db_schema_tables->Append(), error));
+ while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
+ const char* cur_table =
+ reinterpret_cast<const char*>(sqlite3_column_text(stmt, 0));
+ const char* cur_table_type =
+ reinterpret_cast<const char*>(sqlite3_column_text(stmt, 1));
+
+ if (!table_type_filter.empty() &&
+ table_type_filter.find(cur_table_type) == table_type_filter.end()) {
+ continue;
+ }
+
+ ADBC_RETURN_NOT_OK(
+ FromArrowStatus(table_names->Append(cur_table), error));
+ ADBC_RETURN_NOT_OK(
+ FromArrowStatus(table_types->Append(cur_table_type), error));
+ if (depth == ADBC_OBJECT_DEPTH_TABLES) {
+ ADBC_RETURN_NOT_OK(
+ FromArrowStatus(table_columns->AppendNull(), error));
+ ADBC_RETURN_NOT_OK(
+ FromArrowStatus(table_constraints->AppendNull(), error));
+ } else {
+ ADBC_RETURN_NOT_OK(FromArrowStatus(table_columns->Append(), error));
+ ADBC_RETURN_NOT_OK(DoQuery(
+ db, kColumnQuery, error,
+ [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+ ADBC_RETURN_NOT_OK(
+ CheckRc(db,
+ sqlite3_bind_text64(stmt, 1, cur_table,
+ std::strlen(cur_table),
+ SQLITE_STATIC, SQLITE_UTF8),
+ "sqlite3_bind_text64", error));
+
+ if (column_name) {
+ ADBC_RETURN_NOT_OK(
+ CheckRc(db,
+ sqlite3_bind_text64(stmt, 2, column_name,
+ std::strlen(column_name),
+ SQLITE_STATIC, SQLITE_UTF8),
+ "sqlite3_bind_text64", error));
+ } else {
+ ADBC_RETURN_NOT_OK(
+ CheckRc(db,
+ sqlite3_bind_text64(stmt, 2, "%", 1,
+ SQLITE_STATIC, SQLITE_UTF8),
+ "sqlite3_bind_text64", error));
+ }
+
+ int rc = SQLITE_OK;
+ int64_t row_count = 0;
+ while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
+ row_count++;
+ const int32_t cur_ordinal_position =
+ 1 + sqlite3_column_int(stmt, 0);
+ const char* cur_column_name = reinterpret_cast<const char*>(
+ sqlite3_column_text(stmt, 1));
+
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ column_names->Append(cur_column_name), error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ ordinal_positions->Append(cur_ordinal_position), error));
+
+ ADBC_RETURN_NOT_OK(
+ FromArrowStatus(table_columns_items->Append(), error));
+ }
+ for (int i = 2; i < table_columns_items->num_children(); i++) {
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ table_columns_items->child_builder(i)->AppendNulls(
+ row_count),
+ error));
+ }
+ if (rc != SQLITE_DONE) {
+ return CheckRc(db, rc, "sqlite3_step", error);
+ }
+ return ADBC_STATUS_OK;
+ }));
+
+ // We can get primary key and foreign keys, but not unique (without
+ // parsing the table definition, at least)
+ ADBC_RETURN_NOT_OK(
+ FromArrowStatus(table_constraints->Append(), error));
+ ADBC_RETURN_NOT_OK(DoQuery(
+ db, kPrimaryKeyQuery, error,
+ [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+ ADBC_RETURN_NOT_OK(
+ CheckRc(db,
+ sqlite3_bind_text64(stmt, 1, cur_table,
+ std::strlen(cur_table),
+ SQLITE_STATIC, SQLITE_UTF8),
+ "sqlite3_bind_text64", error));
+
+ int rc = SQLITE_OK;
+ bool has_primary_key = false;
+ while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
+ if (!has_primary_key) {
+ ADBC_RETURN_NOT_OK(
+ FromArrowStatus(constraint_names->AppendNull(), error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ constraint_types->Append("PRIMARY KEY"), error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ constraint_column_names->Append(), error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ constraint_column_usage->Append(), error));
+ }
+ has_primary_key = true;
+ const char* cur_column_name = reinterpret_cast<const char*>(
+ sqlite3_column_text(stmt, 0));
+
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ constraint_column_names_items->Append(cur_column_name),
+ error));
+ }
+ if (has_primary_key) {
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ table_constraints_items->Append(), error));
+ }
+ if (rc != SQLITE_DONE) {
+ return CheckRc(db, rc, "sqlite3_step", error);
+ }
+ return ADBC_STATUS_OK;
+ }));
+ ADBC_RETURN_NOT_OK(DoQuery(
+ db, kForeignKeyQuery, error,
+ [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+ ADBC_RETURN_NOT_OK(
+ CheckRc(db,
+ sqlite3_bind_text64(stmt, 1, cur_table,
+ std::strlen(cur_table),
+ SQLITE_STATIC, SQLITE_UTF8),
+ "sqlite3_bind_text64", error));
+
+ int rc = SQLITE_OK;
+ int prev_key_id = -1;
+ while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
+ const int key_id = sqlite3_column_int(stmt, 0);
+ const int key_seq = sqlite3_column_int(stmt, 1);
+ const char* to_table = reinterpret_cast<const char*>(
+ sqlite3_column_text(stmt, 2));
+ const char* from_col = reinterpret_cast<const char*>(
+ sqlite3_column_text(stmt, 3));
+ const char* to_col = reinterpret_cast<const char*>(
+ sqlite3_column_text(stmt, 4));
+ if (key_id != prev_key_id) {
+ ADBC_RETURN_NOT_OK(
+ FromArrowStatus(constraint_names->AppendNull(), error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ constraint_types->Append("FOREIGN KEY"), error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ constraint_column_names->Append(), error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ constraint_column_usage->Append(), error));
+ if (prev_key_id != -1) {
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ table_constraints_items->Append(), error));
+ }
+ }
+ prev_key_id = key_id;
+
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ constraint_column_names_items->Append(from_col), error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ constraint_column_usage_fk_catalog->AppendNull(), error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ constraint_column_usage_fk_db_schema->AppendNull(),
+ error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ constraint_column_usage_fk_table->Append(to_table),
+ error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ constraint_column_usage_fk_column_name->Append(to_col),
+ error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ constraint_column_usage_items->Append(), error));
+ }
+ if (prev_key_id != -1) {
+ ADBC_RETURN_NOT_OK(FromArrowStatus(
+ table_constraints_items->Append(), error));
+ }
+ if (rc != SQLITE_DONE) {
+ return CheckRc(db, rc, "sqlite3_step", error);
+ }
+ return ADBC_STATUS_OK;
+ }));
+ }
+
+ ADBC_RETURN_NOT_OK(
+ FromArrowStatus(db_schema_tables_items->Append(), error));
+ }
+ if (rc != SQLITE_DONE) {
+ return CheckRc(db, rc, "sqlite3_step", error);
+ }
+ return ADBC_STATUS_OK;
+ }));
+ }
+ ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas_items->Append(), error));
+ } else {
+ ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->Append(), error));
+ }
+ }
+
+ arrow::ArrayVector arrays(2);
+ ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_name.Finish(&arrays[0]), error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(catalog_schemas->Finish(&arrays[1]), error));
+ const int64_t rows = arrays[0]->length();
+ auto status =
+ arrow::RecordBatchReader::Make(
+ {
+ arrow::RecordBatch::Make(kCatalogSchema, rows, std::move(arrays)),
+ },
+ kCatalogSchema)
+ .Value(&result_reader_);
+ ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
+ return ADBC_STATUS_OK;
+ }
+
+ AdbcStatusCode GetTableTypes(const std::shared_ptr<SqliteStatementImpl>& self,
+ struct AdbcError* error) {
+ auto schema =
+ arrow::schema({arrow::field("table_type", arrow::utf8(), /*nullable=*/false)});
+
+ arrow::StringBuilder builder;
+ std::shared_ptr<arrow::Array> array;
+ ADBC_RETURN_NOT_OK(FromArrowStatus(builder.Append("table"), error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(builder.Append("view"), error));
+ ADBC_RETURN_NOT_OK(FromArrowStatus(builder.Finish(&array), error));
+
+ auto status =
+ arrow::RecordBatchReader::Make(
+ {
+ arrow::RecordBatch::Make(schema, /*num_rows=*/2, {std::move(array)}),
+ },
+ schema)
+ .Value(&result_reader_);
+ ADBC_RETURN_NOT_OK(FromArrowStatus(status, error));
+ return ADBC_STATUS_OK;
}
AdbcStatusCode GetStream(const std::shared_ptr<SqliteStatementImpl>& self,
struct ArrowArrayStream* out, struct AdbcError* error) {
- if (!stmt_ || !schema_) {
+ if (!result_reader_) {
SetError(error, "Statement has not yet been executed");
return ADBC_STATUS_UNINITIALIZED;
}
- auto status = arrow::ExportRecordBatchReader(self, out);
+ auto status = arrow::ExportRecordBatchReader(result_reader_, out);
if (!status.ok()) {
SetError(error, "Could not initialize result reader: ", status);
- return ADBC_STATUS_UNKNOWN;
+ return ADBC_STATUS_INTERNAL;
}
+ result_reader_.reset();
return ADBC_STATUS_OK;
}
@@ -393,13 +936,12 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
const char* key, const char* value, struct AdbcError* error) {
if (std::strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
// Bulk ingest
+
+ // Clear previous statement, if any
+ ADBC_RETURN_NOT_OK(Close(error));
+
if (std::strlen(value) == 0) return ADBC_STATUS_INVALID_ARGUMENT;
bulk_table_ = value;
- if (stmt_) {
- int rc = sqlite3_finalize(stmt_);
- ADBC_RETURN_NOT_OK(
- CheckRc(connection_->db(), nullptr, rc, "sqlite3_finalize", error));
- }
return ADBC_STATUS_OK;
}
SetError(error, "Unknown option: ", key);
@@ -409,6 +951,9 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
AdbcStatusCode SetSqlQuery(const std::shared_ptr<SqliteStatementImpl>& self,
const char* query, struct AdbcError* error) {
bulk_table_.clear();
+ // Clear previous statement, if any
+ ADBC_RETURN_NOT_OK(Close(error));
+
sqlite3* db = connection_->db();
int rc = sqlite3_prepare_v2(db, query, static_cast<int>(std::strlen(query)), &stmt_,
/*pzTail=*/nullptr);
@@ -416,48 +961,6 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
}
private:
- arrow::Result<int> BindNext() {
- if (!next_parameters_ || bind_index_ >= next_parameters_->num_rows()) {
- return SQLITE_OK;
- }
-
- return BindImpl(stmt_, *next_parameters_, bind_index_++);
- }
-
- arrow::Result<int> BindImpl(sqlite3_stmt* stmt, const arrow::RecordBatch& data,
- int64_t row) {
- int col_index = 1;
- for (const auto& column : data.columns()) {
- if (column->IsNull(row)) {
- const int rc = sqlite3_bind_null(stmt, col_index);
- if (rc != SQLITE_OK) return rc;
- } else {
- switch (column->type()->id()) {
- case arrow::Type::INT64: {
- const int rc = sqlite3_bind_int64(
- stmt, col_index,
- static_cast<const arrow::Int64Array&>(*column).Value(row));
- if (rc != SQLITE_OK) return rc;
- break;
- }
- case arrow::Type::STRING: {
- const auto& strings = static_cast<const arrow::StringArray&>(*column);
- const int rc = sqlite3_bind_text64(stmt, col_index, strings.Value(row).data(),
- strings.value_length(row), SQLITE_STATIC,
- SQLITE_UTF8);
- if (rc != SQLITE_OK) return rc;
- break;
- }
- default:
- return arrow::Status::NotImplemented("Binding parameter of type ",
- *column->type());
- }
- }
- col_index++;
- }
- return SQLITE_OK;
- }
-
AdbcStatusCode ExecuteBulk(struct AdbcError* error) {
if (!bind_parameters_) {
SetError(error, "Must AdbcStatementBind for bulk insertion");
@@ -531,8 +1034,8 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
if (!batch) break;
for (int64_t row = 0; row < batch->num_rows(); row++) {
- status = BindImpl(stmt, *batch, row).Value(&rc);
- ADBC_RETURN_NOT_OK(check_status(status));
+ // TODO: if this fails we won't release the statement
+ ADBC_RETURN_NOT_OK(BindParameters(stmt, *batch, row, &rc, error));
ADBC_RETURN_NOT_OK(CheckRc(db, stmt, rc, "sqlite3_bind", error));
rc = sqlite3_step(stmt);
@@ -554,51 +1057,31 @@ class SqliteStatementImpl : public arrow::RecordBatchReader {
AdbcStatusCode ExecutePrepared(struct AdbcError* error) {
sqlite3* db = connection_->db();
- int rc = SQLITE_OK;
- if (schema_) {
- rc = sqlite3_clear_bindings(stmt_);
- ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_clear_bindings", error));
-
- rc = sqlite3_reset(stmt_);
- ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_reset", error));
- }
- // Step the statement and get the schema (SQLite doesn't
- // necessarily know the schema until it begins to execute it)
-
- Status status;
- if (bind_parameters_) {
- status = bind_parameters_->ReadNext(&next_parameters_);
- if (status.ok()) status = BindNext().Value(&rc);
- }
- // XXX: with parameters, inferring the schema from the first
- // argument is inaccurate (what if one is null?). Is there a way
- // to hint to SQLite the real type?
-
- if (!status.ok()) {
- // TODO: map Arrow codes to ADBC codes
- SetError(error, status);
- return ADBC_STATUS_IO;
- }
- ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_bind", error));
-
- rc = sqlite3_step(stmt_);
- if (rc == SQLITE_ERROR) {
- return CheckRc(db, stmt_, rc, "sqlite3_error", error);
- }
- schema_ = StatementToSchema(stmt_);
- done_ = rc != SQLITE_ROW;
+ int rc = sqlite3_clear_bindings(stmt_);
+ ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_clear_bindings", error));
+
+ rc = sqlite3_reset(stmt_);
+ ADBC_RETURN_NOT_OK(CheckRc(db, stmt_, rc, "sqlite3_reset", error));
+ auto reader = std::make_shared<SqliteStatementReader>(connection_, stmt_,
+ std::move(bind_parameters_));
+ ADBC_RETURN_NOT_OK(reader->Init(error));
+ result_reader_ = std::move(reader);
return ADBC_STATUS_OK;
}
std::shared_ptr<SqliteConnectionImpl> connection_;
+
+ // Query state
+
+ // Bulk ingestion
// Target of bulk ingestion (rather janky to store state like this, though…)
std::string bulk_table_;
+
+ // Prepared statements
sqlite3_stmt* stmt_;
- std::shared_ptr<arrow::Schema> schema_;
std::shared_ptr<arrow::RecordBatchReader> bind_parameters_;
- std::shared_ptr<arrow::RecordBatch> next_parameters_;
- int64_t bind_index_;
- bool done_;
+
+ std::shared_ptr<arrow::RecordBatchReader> result_reader_;
};
// ADBC interface implementation - as private functions so that these
@@ -649,20 +1132,35 @@ AdbcStatusCode SqliteDatabaseRelease(struct AdbcDatabase* database,
return status;
}
-AdbcStatusCode SqliteConnectionNew(struct AdbcDatabase* database,
- struct AdbcConnection* connection,
- struct AdbcError* error) {
+AdbcStatusCode SqliteConnectionGetObjects(
+ struct AdbcConnection* connection, int depth, const char* catalog,
+ const char* db_schema, const char* table_name, const char** table_types,
+ const char* column_name, struct AdbcStatement* statement, struct AdbcError* error) {
+ if (!statement->private_data) return ADBC_STATUS_UNINITIALIZED;
auto ptr =
- reinterpret_cast<std::shared_ptr<SqliteDatabaseImpl>*>(database->private_data);
- auto impl = std::make_shared<SqliteConnectionImpl>(*ptr);
- connection->private_data = new std::shared_ptr<SqliteConnectionImpl>(impl);
- return ADBC_STATUS_OK;
+ reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
+ return (*ptr)->GetObjects(*ptr, depth, catalog, db_schema, table_name, table_types,
+ column_name, error);
}
-AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection,
- const char* key, const char* value,
- struct AdbcError* error) {
- return ADBC_STATUS_OK;
+AdbcStatusCode SqliteConnectionGetTableSchema(struct AdbcConnection* connection,
+ const char* catalog, const char* db_schema,
+ const char* table_name,
+ struct ArrowSchema* schema,
+ struct AdbcError* error) {
+ if (!connection->private_data) return ADBC_STATUS_UNINITIALIZED;
+ auto ptr =
+ reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
+ return (*ptr)->GetTableSchema(catalog, db_schema, table_name, schema, error);
+}
+
+AdbcStatusCode SqliteConnectionGetTableTypes(struct AdbcConnection* connection,
+ struct AdbcStatement* statement,
+ struct AdbcError* error) {
+ if (!statement->private_data) return ADBC_STATUS_UNINITIALIZED;
+ auto ptr =
+ reinterpret_cast<std::shared_ptr<SqliteStatementImpl>*>(statement->private_data);
+ return (*ptr)->GetTableTypes(*ptr, error);
}
AdbcStatusCode SqliteConnectionInit(struct AdbcConnection* connection,
@@ -673,6 +1171,16 @@ AdbcStatusCode SqliteConnectionInit(struct AdbcConnection* connection,
return (*ptr)->Init(error);
}
+AdbcStatusCode SqliteConnectionNew(struct AdbcDatabase* database,
+ struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ auto ptr =
+ reinterpret_cast<std::shared_ptr<SqliteDatabaseImpl>*>(database->private_data);
+ auto impl = std::make_shared<SqliteConnectionImpl>(*ptr);
+ connection->private_data = new std::shared_ptr<SqliteConnectionImpl>(impl);
+ return ADBC_STATUS_OK;
+}
+
AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
struct AdbcError* error) {
if (!connection->private_data) return ADBC_STATUS_UNINITIALIZED;
@@ -684,6 +1192,12 @@ AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
return status;
}
+AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection,
+ const char* key, const char* value,
+ struct AdbcError* error) {
+ return ADBC_STATUS_OK;
+}
+
AdbcStatusCode SqliteStatementBind(struct AdbcStatement* statement,
struct ArrowArray* values, struct ArrowSchema* schema,
struct AdbcError* error) {
@@ -795,15 +1309,29 @@ AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
return SqliteDatabaseRelease(database, error);
}
-AdbcStatusCode AdbcConnectionNew(struct AdbcDatabase* database,
- struct AdbcConnection* connection,
- struct AdbcError* error) {
- return SqliteConnectionNew(database, connection, error);
+AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int depth,
+ const char* catalog, const char* db_schema,
+ const char* table_name, const char** table_types,
+ const char* column_name,
+ struct AdbcStatement* statement,
+ struct AdbcError* error) {
+ return SqliteConnectionGetObjects(connection, depth, catalog, db_schema, table_name,
+ table_types, column_name, statement, error);
}
-AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
- const char* value, struct AdbcError* error) {
- return SqliteConnectionSetOption(connection, key, value, error);
+AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
+ const char* catalog, const char* db_schema,
+ const char* table_name,
+ struct ArrowSchema* schema,
+ struct AdbcError* error) {
+ return SqliteConnectionGetTableSchema(connection, catalog, db_schema, table_name,
+ schema, error);
+}
+
+AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
+ struct AdbcStatement* statement,
+ struct AdbcError* error) {
+ return SqliteConnectionGetTableTypes(connection, statement, error);
}
AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
@@ -811,11 +1339,22 @@ AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
return SqliteConnectionInit(connection, error);
}
+AdbcStatusCode AdbcConnectionNew(struct AdbcDatabase* database,
+ struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ return SqliteConnectionNew(database, connection, error);
+}
+
AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
struct AdbcError* error) {
return SqliteConnectionRelease(connection, error);
}
+AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
+ const char* value, struct AdbcError* error) {
+ return SqliteConnectionSetOption(connection, key, value, error);
+}
+
AdbcStatusCode AdbcStatementBind(struct AdbcStatement* statement,
struct ArrowArray* values, struct ArrowSchema* schema,
struct AdbcError* error) {
@@ -889,6 +1428,9 @@ AdbcStatusCode AdbcSqliteDriverInit(size_t count, struct AdbcDriver* driver,
driver->DatabaseRelease = SqliteDatabaseRelease;
driver->DatabaseSetOption = SqliteDatabaseSetOption;
+ driver->ConnectionGetObjects = SqliteConnectionGetObjects;
+ driver->ConnectionGetTableSchema = SqliteConnectionGetTableSchema;
+ driver->ConnectionGetTableTypes = SqliteConnectionGetTableTypes;
driver->ConnectionInit = SqliteConnectionInit;
driver->ConnectionNew = SqliteConnectionNew;
driver->ConnectionRelease = SqliteConnectionRelease;
diff --git a/drivers/sqlite/sqlite_test.cc b/drivers/sqlite/sqlite_test.cc
index b562f6a..dff4a6e 100644
--- a/drivers/sqlite/sqlite_test.cc
+++ b/drivers/sqlite/sqlite_test.cc
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+#include <string>
+#include <vector>
+
#include <gmock/gmock.h>
#include <gtest/gtest.h>
@@ -32,6 +35,18 @@ namespace adbc {
using arrow::PointeesEqual;
+using RecordBatchMatcher =
+ decltype(::testing::UnorderedPointwise(PointeesEqual(), arrow::RecordBatchVector{}));
+
+RecordBatchMatcher BatchesAre(const std::shared_ptr<arrow::Schema>& schema,
+ const std::vector<std::string>& batch_json) {
+ arrow::RecordBatchVector batches;
+ for (const std::string& json : batch_json) {
+ batches.push_back(adbc::RecordBatchFromJSON(schema, json));
+ }
+ return ::testing::UnorderedPointwise(PointeesEqual(), std::move(batches));
+}
+
class Sqlite : public ::testing::Test {
public:
void SetUp() override {
@@ -58,6 +73,54 @@ class Sqlite : public ::testing::Test {
AdbcDatabase database;
AdbcConnection connection;
AdbcError error = {};
+
+ std::shared_ptr<arrow::DataType> column_schema = arrow::struct_({
+ arrow::field("column_name", arrow::utf8(), /*nullable=*/false),
+ arrow::field("ordinal_position", arrow::int32()),
+ arrow::field("remarks", arrow::utf8()),
+ arrow::field("xdbc_data_type", arrow::int16()),
+ arrow::field("xdbc_type_name", arrow::utf8()),
+ arrow::field("xdbc_column_size", arrow::int32()),
+ arrow::field("xdbc_decimal_digits", arrow::int16()),
+ arrow::field("xdbc_num_prec_radix", arrow::int16()),
+ arrow::field("xdbc_nullable", arrow::int16()),
+ arrow::field("xdbc_column_def", arrow::utf8()),
+ arrow::field("xdbc_sql_data_type", arrow::int16()),
+ arrow::field("xdbc_datetime_sub", arrow::int16()),
+ arrow::field("xdbc_char_octet_length", arrow::int32()),
+ arrow::field("xdbc_is_nullable", arrow::utf8()),
+ arrow::field("xdbc_scope_catalog", arrow::utf8()),
+ arrow::field("xdbc_scope_schema", arrow::utf8()),
+ arrow::field("xdbc_scope_table", arrow::utf8()),
+ arrow::field("xdbc_is_autoincrement", arrow::boolean()),
+ arrow::field("xdbc_is_generatedcolumn", arrow::boolean()),
+ });
+ std::shared_ptr<arrow::DataType> usage_schema = arrow::struct_({
+ arrow::field("fk_catalog", arrow::utf8()),
+ arrow::field("fk_db_schema", arrow::utf8()),
+ arrow::field("fk_table", arrow::utf8()),
+ arrow::field("fk_column_name", arrow::utf8()),
+ });
+ std::shared_ptr<arrow::DataType> constraint_schema = arrow::struct_({
+ arrow::field("constraint_name", arrow::utf8()),
+ arrow::field("constraint_type", arrow::utf8(), /*nullable=*/false),
+ arrow::field("column_names", arrow::list(arrow::utf8()), /*nullable=*/false),
+ arrow::field("column_names", arrow::list(usage_schema)),
+ });
+ std::shared_ptr<arrow::DataType> table_schema = arrow::struct_({
+ arrow::field("table_name", arrow::utf8(), /*nullable=*/false),
+ arrow::field("table_type", arrow::utf8(), /*nullable=*/false),
+ arrow::field("table_columns", arrow::list(column_schema)),
+ arrow::field("table_constraints", arrow::list(constraint_schema)),
+ });
+ std::shared_ptr<arrow::DataType> db_schema_schema = arrow::struct_({
+ arrow::field("db_schema_name", arrow::utf8()),
+ arrow::field("db_schema_tables", arrow::list(table_schema)),
+ });
+ std::shared_ptr<arrow::Schema> catalog_schema = arrow::schema({
+ arrow::field("catalog_name", arrow::utf8()),
+ arrow::field("catalog_db_schemas", arrow::list(db_schema_schema)),
+ });
};
TEST_F(Sqlite, SqlExecute) {
@@ -256,9 +319,7 @@ TEST_F(Sqlite, MultipleConnections) {
arrow::RecordBatchVector batches;
ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
ASSERT_SCHEMA_EQ(*schema, *arrow::schema({}));
- EXPECT_THAT(batches,
- ::testing::UnorderedPointwise(
- PointeesEqual(), std::vector<std::shared_ptr<arrow::RecordBatch>>{}));
+ EXPECT_TRUE(batches.empty());
}
{
@@ -274,12 +335,334 @@ TEST_F(Sqlite, MultipleConnections) {
arrow::RecordBatchVector batches;
ReadStatement(&statement, &schema, &batches);
ASSERT_SCHEMA_EQ(*schema, *arrow::schema({arrow::field("bar", arrow::null())}));
- EXPECT_THAT(batches,
- ::testing::UnorderedPointwise(
- PointeesEqual(), std::vector<std::shared_ptr<arrow::RecordBatch>>{}));
+ EXPECT_TRUE(batches.empty());
}
ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRelease(&connection2, &error));
}
+// Checks that AdbcConnectionGetTableTypes yields a single non-nullable utf8
+// "table_type" column whose rows are "table" and "view".
+TEST_F(Sqlite, MetadataGetTableTypes) {
+ AdbcStatement statement;
+ std::memset(&statement, 0, sizeof(statement));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error,
+ AdbcConnectionGetTableTypes(&connection, &statement, &error));
+
+ auto expected_schema = arrow::schema({
+ arrow::field("table_type", arrow::utf8(), /*nullable=*/false),
+ });
+ std::shared_ptr<arrow::Schema> schema;
+ arrow::RecordBatchVector batches;
+ // A fatal failure inside the helper only returns from the helper itself, so
+ // stop the test here rather than dereferencing a possibly unset `schema`.
+ ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
+ ASSERT_SCHEMA_EQ(*schema, *expected_schema);
+ EXPECT_THAT(batches,
+ ::testing::UnorderedPointwise(
+ PointeesEqual(), {
+ adbc::RecordBatchFromJSON(
+ expected_schema, R"([["table"], ["view"]])"),
+ }));
+}
+
+// Exercises AdbcConnectionGetObjects against a single ingested table
+// ("bulk_insert") at each depth (catalogs, schemas, tables, all) and with
+// catalog, schema, table-name, table-type and column-name filters.
+TEST_F(Sqlite, MetadataGetObjects) {
+ // Create a table via ingestion
+ {
+ ArrowArray export_table;
+ ArrowSchema export_schema;
+ auto bulk_schema = arrow::schema(
+ {arrow::field("ints", arrow::int64()), arrow::field("strs", arrow::utf8())});
+ auto bulk_table =
+ adbc::RecordBatchFromJSON(bulk_schema, R"([[1, "foo"], [2, "bar"]])");
+ ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
+ ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
+
+ AdbcStatement statement;
+ std::memset(&statement, 0, sizeof(statement));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+ "bulk_insert", &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+ // Nothing releases this handle when it goes out of scope, so do it explicitly.
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+ }
+
+ // Query for catalogs
+ // NOTE(review): `statement` is re-created with AdbcStatementNew before every
+ // query below; this presumes ReadStatement releases it - confirm.
+ AdbcStatement statement;
+ std::memset(&statement, 0, sizeof(statement));
+ std::shared_ptr<arrow::Schema> schema;
+ arrow::RecordBatchVector batches;
+
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_CATALOGS, nullptr, nullptr,
+ nullptr, nullptr, nullptr, &statement, &error));
+ ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
+ EXPECT_THAT(batches, BatchesAre(catalog_schema, {R"([[null, null]])"}));
+ batches.clear();
+
+ // A catalog filter that matches nothing yields no rows.
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_CATALOGS, "catalog",
+ nullptr, nullptr, nullptr, nullptr, &statement, &error));
+ ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
+ EXPECT_THAT(batches, BatchesAre(catalog_schema, {R"([])"}));
+ batches.clear();
+
+ // Query for schemas
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_DB_SCHEMAS, nullptr,
+ nullptr, nullptr, nullptr, nullptr, &statement, &error));
+ ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
+ EXPECT_THAT(
+ batches,
+ BatchesAre(catalog_schema,
+ {R"([[null, [{"db_schema_name": null, "db_schema_tables": null}]]])"}));
+ batches.clear();
+
+ // A schema filter that matches nothing leaves the catalog row with an empty
+ // schema list.
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_DB_SCHEMAS, nullptr,
+ "schema", nullptr, nullptr, nullptr, &statement, &error));
+ ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
+ EXPECT_THAT(batches, BatchesAre(catalog_schema, {R"([[null, []]])"}));
+ batches.clear();
+
+ // Query for tables
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_TABLES, nullptr, nullptr,
+ nullptr, nullptr, nullptr, &statement, &error));
+ ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
+ EXPECT_THAT(batches,
+ BatchesAre(catalog_schema,
+ {R"([[null, [{"db_schema_name": null, "db_schema_tables": [
+ {"table_name": "bulk_insert", "table_type": "table", "table_columns": null, "table_constraints": null}
+]}]]])"}));
+ batches.clear();
+
+ // Table-name pattern that matches the ingested table.
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_TABLES, nullptr, nullptr,
+ "bulk_%", nullptr, nullptr, &statement, &error));
+ ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
+ EXPECT_THAT(batches,
+ BatchesAre(catalog_schema,
+ {R"([[null, [{"db_schema_name": null, "db_schema_tables": [
+ {"table_name": "bulk_insert", "table_type": "table", "table_columns": null, "table_constraints": null}
+]}]]])"}));
+ batches.clear();
+
+ // Table-name pattern that matches nothing yields an empty table list.
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_TABLES, nullptr, nullptr,
+ "asdf%", nullptr, nullptr, &statement, &error));
+ ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
+ EXPECT_THAT(
+ batches,
+ BatchesAre(catalog_schema,
+ {R"([[null, [{"db_schema_name": null, "db_schema_tables": []}]]])"}));
+ batches.clear();
+
+ // Query for table types
+ // The filter list passed to GetObjects ends with a nullptr entry.
+ std::vector<const char*> table_types(2);
+ table_types[0] = "table";
+ table_types[1] = nullptr;
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr,
+ nullptr, table_types.data(), nullptr, &statement, &error));
+ ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
+ EXPECT_THAT(batches,
+ BatchesAre(catalog_schema,
+ {R"([[null, [{"db_schema_name": null, "db_schema_tables": [
+ {
+ "table_name": "bulk_insert",
+ "table_type": "table",
+ "table_columns": [
+ ["ints", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
+ ["strs", 2, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
+ ],
+ "table_constraints": []
+ }
+]}]]])"}));
+ batches.clear();
+
+ // Restricting to views leaves no tables.
+ table_types[0] = "view";
+ table_types[1] = nullptr;
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr,
+ nullptr, table_types.data(), nullptr, &statement, &error));
+ ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
+ EXPECT_THAT(
+ batches,
+ BatchesAre(catalog_schema,
+ {R"([[null, [{"db_schema_name": null, "db_schema_tables": []}]]])"}));
+ batches.clear();
+
+ // Query for columns
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr,
+ nullptr, nullptr, nullptr, &statement, &error));
+ ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
+ EXPECT_THAT(batches,
+ BatchesAre(catalog_schema,
+ {R"([[null, [{"db_schema_name": null, "db_schema_tables": [
+ {
+ "table_name": "bulk_insert",
+ "table_type": "table",
+ "table_columns": [
+ ["ints", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
+ ["strs", 2, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
+ ],
+ "table_constraints": []
+ }
+]}]]])"}));
+ batches.clear();
+
+ // Column-name pattern "in%" keeps only the "ints" column.
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr,
+ nullptr, nullptr, "in%", &statement, &error));
+ ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
+ EXPECT_THAT(batches,
+ BatchesAre(catalog_schema,
+ {R"([[null, [{"db_schema_name": null, "db_schema_tables": [
+ {
+ "table_name": "bulk_insert",
+ "table_type": "table",
+ "table_columns": [
+ ["ints", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
+ ],
+ "table_constraints": []
+ }
+]}]]])"}));
+ batches.clear();
+}
+
+// Creates three tables ("parent", "other", "child") with primary- and
+// foreign-key constraints via SQL, then checks the columns and constraints
+// that AdbcConnectionGetObjects reports for them at full depth.
+TEST_F(Sqlite, MetadataGetObjectsColumns) {
+ {
+ // One statement is reused for all three CREATE TABLE commands.
+ AdbcStatement statement;
+ std::memset(&statement, 0, sizeof(statement));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcStatementSetSqlQuery(
+ &statement, "CREATE TABLE parent (a, b, c, PRIMARY KEY(c, b))", &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementSetSqlQuery(&statement, "CREATE TABLE other (a)", &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementSetSqlQuery(
+ &statement,
+ "CREATE TABLE child (a, b, c, PRIMARY KEY(a), FOREIGN KEY (c, b) "
+ "REFERENCES parent (c, b), FOREIGN KEY (a) REFERENCES other(a))",
+ &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+ }
+
+ AdbcStatement statement;
+ std::memset(&statement, 0, sizeof(statement));
+ std::shared_ptr<arrow::Schema> schema;
+ arrow::RecordBatchVector batches;
+
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr,
+ nullptr, nullptr, nullptr, &statement, &error));
+ // Stop at a fatal failure inside the helper instead of comparing whatever
+ // partial result it left behind.
+ ASSERT_NO_FATAL_FAILURE(ReadStatement(&statement, &schema, &batches));
+ EXPECT_THAT(batches,
+ BatchesAre(catalog_schema,
+ {R"([[null, [{"db_schema_name": null, "db_schema_tables": [
+ {
+ "table_name": "child",
+ "table_type": "table",
+ "table_columns": [
+ ["a", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
+ ["b", 2, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
+ ["c", 3, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
+ ],
+ "table_constraints": [
+ [null, "PRIMARY KEY", ["a"], []],
+ [null, "FOREIGN KEY", ["a"], [[null, null, "other", "a"]]],
+ [null, "FOREIGN KEY", ["c", "b"], [[null, null, "parent", "c"], [null, null, "parent", "b"]]]
+ ]
+ },
+ {
+ "table_name": "other",
+ "table_type": "table",
+ "table_columns": [
+ ["a", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
+ ],
+ "table_constraints": []
+ },
+ {
+ "table_name": "parent",
+ "table_type": "table",
+ "table_columns": [
+ ["a", 1, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
+ ["b", 2, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null],
+ ["c", 3, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null]
+ ],
+ "table_constraints": [
+ [null, "PRIMARY KEY", ["c", "b"], []]
+ ]
+ }
+]}]]])"}));
+ batches.clear();
+}
+
+// Ingests a two-column table and checks that AdbcConnectionGetTableSchema
+// returns a schema equal to the one that was ingested.
+TEST_F(Sqlite, MetadataGetTableSchema) {
+ // Create a table via ingestion
+ auto bulk_schema = arrow::schema(
+ {arrow::field("ints", arrow::int64()), arrow::field("strs", arrow::utf8())});
+ {
+ ArrowArray export_table;
+ ArrowSchema export_schema;
+ auto bulk_table =
+ adbc::RecordBatchFromJSON(bulk_schema, R"([[1, "foo"], [2, "bar"]])");
+ ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
+ ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
+
+ AdbcStatement statement;
+ std::memset(&statement, 0, sizeof(statement));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+ "bulk_insert", &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+ // Nothing releases this handle when it goes out of scope, so do it explicitly.
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+ }
+
+ // Fetch the table's schema through the C data interface and import it back
+ // into an arrow::Schema for comparison.
+ ArrowSchema export_schema;
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcConnectionGetTableSchema(&connection, /*catalog=*/nullptr,
+ /*db_schema=*/nullptr, "bulk_insert",
+ &export_schema, &error));
+
+ ASSERT_OK_AND_ASSIGN(auto schema, arrow::ImportSchema(&export_schema));
+ ASSERT_SCHEMA_EQ(*schema, *bulk_schema);
+}
+
} // namespace adbc