You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/06/24 17:32:37 UTC
[arrow-adbc] branch main updated: Add basic transaction semantics (#24)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new b1b62fd Add basic transaction semantics (#24)
b1b62fd is described below
commit b1b62fd2d9bd9252426230bf60dfac547d262153
Author: David Li <li...@gmail.com>
AuthorDate: Fri Jun 24 13:32:32 2022 -0400
Add basic transaction semantics (#24)
Fixes #23.
---
adbc.h | 97 +++++++++-----
adbc_driver_manager/adbc_driver_manager.cc | 54 +++++++-
adbc_driver_manager/adbc_driver_manager_test.cc | 11 ++
drivers/sqlite/sqlite.cc | 163 ++++++++++++++++++++----
drivers/sqlite/sqlite_test.cc | 159 ++++++++++++++++-------
5 files changed, 375 insertions(+), 109 deletions(-)
diff --git a/adbc.h b/adbc.h
index 9e1ed97..f21415f 100644
--- a/adbc.h
+++ b/adbc.h
@@ -69,8 +69,6 @@ struct ArrowArray {
#endif // ARROW_C_DATA_INTERFACE
-// EXPERIMENTAL: C stream interface
-
#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE
@@ -252,6 +250,11 @@ struct ADBC_EXPORT AdbcError {
/// @}
+/// \brief Canonical option value for enabling an option.
+#define ADBC_OPTION_VALUE_ENABLED "true"
+/// \brief Canonical option value for disabling an option.
+#define ADBC_OPTION_VALUE_DISABLED "false"
+
/// \defgroup adbc-database Database initialization.
/// Clients first initialize a database, then connect to the database
/// (below). For client-server databases, one of these steps may be a
@@ -340,32 +343,6 @@ ADBC_EXPORT
AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
struct AdbcError* error);
-/// \defgroup adbc-connection-partition Partitioned Results
-/// Some databases may internally partition the results. These
-/// partitions are exposed to clients who may wish to integrate them
-/// with a threaded or distributed execution model, where partitions
-/// can be divided among threads or machines for processing.
-///
-/// Drivers are not required to support partitioning.
-///
-/// Partitions are not ordered. If the result set is sorted,
-/// implementations should return a single partition.
-///
-/// @{
-
-/// \brief Construct a statement for a partition of a query. The
-/// statement can then be read independently.
-///
-/// A partition can be retrieved from AdbcStatementGetPartitionDesc.
-ADBC_EXPORT
-AdbcStatusCode AdbcConnectionDeserializePartitionDesc(struct AdbcConnection* connection,
- const uint8_t* serialized_partition,
- size_t serialized_length,
- struct AdbcStatement* statement,
- struct AdbcError* error);
-
-/// }@
-
/// \defgroup adbc-connection-metadata Metadata
/// Functions for retrieving metadata about the database.
///
@@ -530,6 +507,64 @@ AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
/// @}
+/// \defgroup adbc-connection-partition Partitioned Results
+/// Some databases may internally partition the results. These
+/// partitions are exposed to clients who may wish to integrate them
+/// with a threaded or distributed execution model, where partitions
+/// can be divided among threads or machines for processing.
+///
+/// Drivers are not required to support partitioning.
+///
+/// Partitions are not ordered. If the result set is sorted,
+/// implementations should return a single partition.
+///
+/// @{
+
+/// \brief Construct a statement for a partition of a query. The
+/// statement can then be read independently.
+///
+/// A partition can be retrieved from AdbcStatementGetPartitionDesc.
+ADBC_EXPORT
+AdbcStatusCode AdbcConnectionDeserializePartitionDesc(struct AdbcConnection* connection,
+ const uint8_t* serialized_partition,
+ size_t serialized_length,
+ struct AdbcStatement* statement,
+ struct AdbcError* error);
+
+/// @}
+
+/// \defgroup adbc-connection-transaction Transaction Semantics
+///
+/// Connections start out in autocommit mode by default (if
+/// applicable for the given vendor). Use
+/// ADBC_CONNECTION_OPTION_AUTOCOMMIT to change this.
+///
+/// @{
+
+/// \brief The name of the canonical option for whether autocommit is
+/// enabled.
+#define ADBC_CONNECTION_OPTION_AUTOCOMMIT "adbc.connection.autocommit"
+
+/// \brief Commit any pending transactions. Only used if autocommit is
+/// disabled.
+///
+/// Behavior is undefined if this is mixed with SQL transaction
+/// statements.
+ADBC_EXPORT
+AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
+ struct AdbcError* error);
+
+/// \brief Roll back any pending transactions. Only used if autocommit
+/// is disabled.
+///
+/// Behavior is undefined if this is mixed with SQL transaction
+/// statements.
+ADBC_EXPORT
+AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
+ struct AdbcError* error);
+
+/// @}
+
/// @}
/// \defgroup adbc-statement Managing statements.
@@ -784,11 +819,15 @@ struct ADBC_EXPORT AdbcDriver {
struct AdbcError*);
AdbcStatusCode (*ConnectionInit)(struct AdbcConnection*, struct AdbcError*);
AdbcStatusCode (*ConnectionRelease)(struct AdbcConnection*, struct AdbcError*);
+
AdbcStatusCode (*ConnectionDeserializePartitionDesc)(struct AdbcConnection*,
const uint8_t*, size_t,
struct AdbcStatement*,
struct AdbcError*);
+ AdbcStatusCode (*ConnectionCommit)(struct AdbcConnection*, struct AdbcError*);
+ AdbcStatusCode (*ConnectionRollback)(struct AdbcConnection*, struct AdbcError*);
+
AdbcStatusCode (*ConnectionGetObjects)(struct AdbcConnection*, int, const char*,
const char*, const char*, const char**,
const char*, struct AdbcStatement*,
@@ -842,7 +881,7 @@ typedef AdbcStatusCode (*AdbcDriverInitFunc)(size_t count, struct AdbcDriver* dr
// struct/entrypoint instead?
// For use with count
-#define ADBC_VERSION_0_0_1 28
+#define ADBC_VERSION_0_0_1 26
/// @}
diff --git a/adbc_driver_manager/adbc_driver_manager.cc b/adbc_driver_manager/adbc_driver_manager.cc
index e120f59..5fc91f6 100644
--- a/adbc_driver_manager/adbc_driver_manager.cc
+++ b/adbc_driver_manager/adbc_driver_manager.cc
@@ -61,6 +61,11 @@ void SetError(struct AdbcError* error, const std::string& message) {
}
// Default stubs
+
+// Default stub for ConnectionCommit (registered through FILL_DEFAULT in
+// AdbcLoadDriver): always reports that the operation is not implemented.
+AdbcStatusCode ConnectionCommit(struct AdbcConnection* /*connection*/,
+                                struct AdbcError* /*error*/) {
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
AdbcStatusCode ConnectionGetTableTypes(struct AdbcConnection*, struct AdbcStatement*,
struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
@@ -78,6 +83,10 @@ AdbcStatusCode ConnectionGetTableSchema(struct AdbcConnection*, const char*, con
return ADBC_STATUS_NOT_IMPLEMENTED;
}
+// Default stub for ConnectionRollback (registered through FILL_DEFAULT in
+// AdbcLoadDriver): always reports that the operation is not implemented.
+AdbcStatusCode ConnectionRollback(struct AdbcConnection* /*connection*/,
+                                  struct AdbcError* /*error*/) {
+  return ADBC_STATUS_NOT_IMPLEMENTED;
+}
+
AdbcStatusCode StatementBind(struct AdbcStatement*, struct ArrowArray*,
struct ArrowSchema*, struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
@@ -274,15 +283,12 @@ AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
return status;
}
-AdbcStatusCode AdbcConnectionNew(struct AdbcDatabase* database,
- struct AdbcConnection* connection,
- struct AdbcError* error) {
- if (!database->private_driver) {
+// Driver-manager entry point for committing: forwards to the ConnectionCommit
+// function of the driver stored in connection->private_driver, or returns
+// ADBC_STATUS_INVALID_STATE when no driver is stored on the connection.
+AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ if (!connection->private_driver) {
return ADBC_STATUS_INVALID_STATE;
}
- auto status = database->private_driver->ConnectionNew(database, connection, error);
- connection->private_driver = database->private_driver;
- return status;
+ return connection->private_driver->ConnectionCommit(connection, error);
}
AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
@@ -293,6 +299,17 @@ AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
return connection->private_driver->ConnectionInit(connection, error);
}
+// Driver-manager entry point for creating a connection: asks the database's
+// driver to allocate it, then records that driver on the connection so later
+// connection calls can be dispatched to it.
+AdbcStatusCode AdbcConnectionNew(struct AdbcDatabase* database,
+                                 struct AdbcConnection* connection,
+                                 struct AdbcError* error) {
+  // A database without a driver cannot create connections.
+  if (database->private_driver == nullptr) return ADBC_STATUS_INVALID_STATE;
+
+  const AdbcStatusCode result =
+      database->private_driver->ConnectionNew(database, connection, error);
+  // The driver is recorded regardless of the result of ConnectionNew.
+  connection->private_driver = database->private_driver;
+  return result;
+}
+
AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
struct AdbcError* error) {
if (!connection->private_driver) {
@@ -303,6 +320,22 @@ AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
return status;
}
+// Driver-manager entry point for rolling back: forwards to the driver stored
+// on the connection, or reports ADBC_STATUS_INVALID_STATE when there is none.
+AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
+                                      struct AdbcError* error) {
+  auto* driver = connection->private_driver;
+  if (driver == nullptr) return ADBC_STATUS_INVALID_STATE;
+  return driver->ConnectionRollback(connection, error);
+}
+
+// Driver-manager entry point for setting a connection option: forwards the
+// key/value pair to the driver stored on the connection, or reports
+// ADBC_STATUS_INVALID_STATE when there is none.
+AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
+                                       const char* value, struct AdbcError* error) {
+  auto* driver = connection->private_driver;
+  if (driver == nullptr) return ADBC_STATUS_INVALID_STATE;
+  return driver->ConnectionSetOption(connection, key, value, error);
+}
+
AdbcStatusCode AdbcStatementBind(struct AdbcStatement* statement,
struct ArrowArray* values, struct ArrowSchema* schema,
struct AdbcError* error) {
@@ -543,10 +576,17 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,
CHECK_REQUIRED(driver, DatabaseInit);
CHECK_REQUIRED(driver, DatabaseRelease);
+ CHECK_REQUIRED(driver, ConnectionNew);
+ CHECK_REQUIRED(driver, ConnectionInit);
+ CHECK_REQUIRED(driver, ConnectionRelease);
+ FILL_DEFAULT(driver, ConnectionCommit);
FILL_DEFAULT(driver, ConnectionGetObjects);
FILL_DEFAULT(driver, ConnectionGetTableSchema);
FILL_DEFAULT(driver, ConnectionGetTableTypes);
+ FILL_DEFAULT(driver, ConnectionRollback);
+ CHECK_REQUIRED(driver, StatementNew);
+ CHECK_REQUIRED(driver, StatementRelease);
FILL_DEFAULT(driver, StatementBind);
FILL_DEFAULT(driver, StatementExecute);
FILL_DEFAULT(driver, StatementPrepare);
diff --git a/adbc_driver_manager/adbc_driver_manager_test.cc b/adbc_driver_manager/adbc_driver_manager_test.cc
index cdfc4e2..0c5781f 100644
--- a/adbc_driver_manager/adbc_driver_manager_test.cc
+++ b/adbc_driver_manager/adbc_driver_manager_test.cc
@@ -217,4 +217,15 @@ TEST_F(DriverManager, BulkIngestStream) {
}
}
+TEST_F(DriverManager, Transactions) {
+  // An unrecognized value for the autocommit option must be rejected.
+  const AdbcStatusCode bad_value_status = AdbcConnectionSetOption(
+      &connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, "invalid", &error);
+  ASSERT_NE(ADBC_STATUS_OK, bad_value_status);
+
+  // Autocommit has not been disabled, so explicit commit/rollback are invalid.
+  const AdbcStatusCode commit_status = AdbcConnectionCommit(&connection, &error);
+  ASSERT_EQ(ADBC_STATUS_INVALID_STATE, commit_status);
+  const AdbcStatusCode rollback_status = AdbcConnectionRollback(&connection, &error);
+  ASSERT_EQ(ADBC_STATUS_INVALID_STATE, rollback_status);
+}
+
} // namespace adbc
diff --git a/drivers/sqlite/sqlite.cc b/drivers/sqlite/sqlite.cc
index c11ec23..ca8ca0d 100644
--- a/drivers/sqlite/sqlite.cc
+++ b/drivers/sqlite/sqlite.cc
@@ -156,10 +156,25 @@ class SqliteDatabaseImpl {
public:
explicit SqliteDatabaseImpl() : db_(nullptr), connection_count_(0) {}
- sqlite3* Connect() {
+  /// \brief Provide a sqlite3 handle for a new connection.
+  ///
+  /// \param[out] db Receives the shared db_ handle when the database URI is
+  ///   exactly ":memory:", otherwise a handle freshly opened on the URI.
+  ///   Reset to nullptr when opening fails.
+  /// \param[out] error Populated on failure.
+  /// \return ADBC_STATUS_INVALID_STATE if db_ has not been opened yet, the
+  ///   status reported by CheckRc if sqlite3_open_v2 fails, otherwise
+  ///   ADBC_STATUS_OK (and the connection count is incremented).
+  AdbcStatusCode Connect(sqlite3** db, struct AdbcError* error) {
std::lock_guard<std::mutex> guard(mutex_);
- if (db_) ++connection_count_;
- return db_;
+    if (!db_) {
+      SetError(error, "Database not yet initialized, call AdbcDatabaseInit");
+      return ADBC_STATUS_INVALID_STATE;
+    }
+    // Create a new connection
+    if (database_uri_ == ":memory:") {
+      // unless the special ":memory:" filename is used
+      *db = db_;
+    } else {
+      int rc =
+          sqlite3_open_v2(database_uri_.c_str(), db,
+                          SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
+                          /*zVfs=*/nullptr);
+      // Build the error first: it is derived from the handle that is closed below.
+      const AdbcStatusCode status =
+          CheckRc(*db, nullptr, rc, "sqlite3_open_v2", error);
+      if (status != ADBC_STATUS_OK) {
+        // sqlite3_open_v2 usually hands back a handle even when it fails, and
+        // that handle must still be closed. sqlite3_close(nullptr) is a no-op.
+        sqlite3_close(*db);
+        *db = nullptr;
+        return status;
+      }
+    }
+    ++connection_count_;
+    return ADBC_STATUS_OK;
}
AdbcStatusCode Init(struct AdbcError* error) {
@@ -167,11 +182,14 @@ class SqliteDatabaseImpl {
SetError(error, "Database already initialized");
return ADBC_STATUS_INVALID_STATE;
}
- const char* filename = ":memory:";
+ database_uri_ = "file:adbc_sqlite_driver?mode=memory&cache=shared";
auto it = options_.find("filename");
- if (it != options_.end()) filename = it->second.c_str();
+ if (it != options_.end()) {
+ database_uri_ = it->second;
+ }
- int rc = sqlite3_open_v2(filename, &db_, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE,
+ int rc = sqlite3_open_v2(database_uri_.c_str(), &db_,
+ SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
/*zVfs=*/nullptr);
ADBC_RETURN_NOT_OK(CheckRc(db_, nullptr, rc, "sqlite3_open_v2", error));
options_.clear();
@@ -187,12 +205,19 @@ class SqliteDatabaseImpl {
return ADBC_STATUS_OK;
}
- AdbcStatusCode Disconnect(struct AdbcError* error) {
+  AdbcStatusCode Disconnect(sqlite3* db, struct AdbcError* error) {
std::lock_guard<std::mutex> guard(mutex_);
if (--connection_count_ < 0) {
SetError(error, "Connection count underflow");
return ADBC_STATUS_INVALID_STATE;
}
+    // With the ":memory:" URI the connection was handed the shared db_ handle
+    // (see Connect), so only handles opened for any other URI are closed here.
+    const bool owns_handle = database_uri_ != ":memory:";
+    if (owns_handle && sqlite3_close(db) != SQLITE_OK) {
+      if (db) SetError(db, "sqlite3_close", error);
+      return ADBC_STATUS_IO;
+    }
return ADBC_STATUS_OK;
}
@@ -216,6 +241,7 @@ class SqliteDatabaseImpl {
private:
sqlite3* db_;
int connection_count_;
+ std::string database_uri_;
std::unordered_map<std::string, std::string> options_;
std::mutex mutex_;
};
@@ -223,7 +249,7 @@ class SqliteDatabaseImpl {
class SqliteConnectionImpl {
public:
explicit SqliteConnectionImpl(std::shared_ptr<SqliteDatabaseImpl> database)
- : database_(std::move(database)), db_(nullptr) {}
+ : database_(std::move(database)), db_(nullptr), autocommit_(true) {}
sqlite3* db() const { return db_; }
@@ -246,34 +272,75 @@ class SqliteConnectionImpl {
}
query += escaped;
- sqlite3_stmt* stmt;
- int rc = sqlite3_prepare_v2(db_, query.c_str(), static_cast<int>(query.size()), &stmt,
- /*pzTail=*/nullptr);
- ADBC_RETURN_NOT_OK(CheckRc(db_, stmt, rc, "sqlite3_prepare_v2", error));
- rc = sqlite3_step(stmt);
- if (rc == SQLITE_ERROR) {
- return CheckRc(db_, stmt, rc, "sqlite3_step", error);
- }
- auto arrow_schema = StatementToSchema(stmt);
+ std::shared_ptr<arrow::Schema> arrow_schema;
ADBC_RETURN_NOT_OK(
- CheckRc(db_, stmt, sqlite3_finalize(stmt), "sqlite3_finalize", error));
+ DoQuery(db_, query.c_str(), error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+ if (sqlite3_step(stmt) == SQLITE_ERROR) {
+ SetError(db_, "sqlite3_step", error);
+ return ADBC_STATUS_IO;
+ }
+ arrow_schema = StatementToSchema(stmt);
+ return ADBC_STATUS_OK;
+ }));
return FromArrowStatus(arrow::ExportSchema(*arrow_schema, schema), error);
}
- AdbcStatusCode Init(struct AdbcError* error) {
- db_ = database_->Connect();
- if (!db_) {
- SetError(error, "Database not yet initialized!");
+ AdbcStatusCode Init(struct AdbcError* error) { return database_->Connect(&db_, error); }
+
+ AdbcStatusCode Release(struct AdbcError* error) {
+ return database_->Disconnect(db_, error);
+ }
+
+  /// \brief Switch the connection between autocommit and manual transactions.
+  ///
+  /// Enabling autocommit issues "COMMIT"; disabling it issues
+  /// "BEGIN TRANSACTION". Requesting the mode that is already active is a
+  /// no-op.
+  ///
+  /// \param[in] autocommit The desired mode.
+  /// \param[out] error Populated if the SQL statement fails.
+  /// \return ADBC_STATUS_OK on success, otherwise the status from DoQuery.
+  AdbcStatusCode SetAutocommit(bool autocommit, struct AdbcError* error) {
+    if (autocommit == autocommit_) return ADBC_STATUS_OK;
+
+    const char* query = autocommit ? "COMMIT" : "BEGIN TRANSACTION";
+    ADBC_RETURN_NOT_OK(
+        DoQuery(db_, query, error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+          return StepStatement(stmt, error);
+        }));
+    // Record the new mode only after the statement has succeeded, so a failed
+    // COMMIT/BEGIN does not leave the flag out of sync with the database.
+    autocommit_ = autocommit;
+    return ADBC_STATUS_OK;
+  }
+
+  /// \brief Commit the current transaction and immediately open a new one.
+  ///
+  /// \return ADBC_STATUS_INVALID_STATE when autocommit is enabled; otherwise
+  ///   the status of the "COMMIT" / "BEGIN TRANSACTION" statements.
+ AdbcStatusCode Commit(struct AdbcError* error) {
+ if (autocommit_) {
+ SetError(error, "Cannot commit when in autocommit mode");
return ADBC_STATUS_INVALID_STATE;
}
- return ADBC_STATUS_OK;
+ ADBC_RETURN_NOT_OK(
+ DoQuery(db_, "COMMIT", error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+ return StepStatement(stmt, error);
+ }));
+    // Still in manual mode: start the next transaction right away.
+    // NOTE(review): if this BEGIN fails after the COMMIT succeeded,
+    // autocommit_ stays false with no transaction open -- confirm intended.
+ return DoQuery(db_, "BEGIN TRANSACTION", error,
+ [&](sqlite3_stmt* stmt) { return StepStatement(stmt, error); });
}
- AdbcStatusCode Release(struct AdbcError* error) { return database_->Disconnect(error); }
+  /// \brief Roll back the current transaction and immediately open a new one.
+  ///
+  /// \return ADBC_STATUS_INVALID_STATE when autocommit is enabled; otherwise
+  ///   the status of the "ROLLBACK" / "BEGIN TRANSACTION" statements.
+  AdbcStatusCode Rollback(struct AdbcError* error) {
+    if (autocommit_) {
+      SetError(error, "Cannot rollback when in autocommit mode");
+      return ADBC_STATUS_INVALID_STATE;
+    }
+
+    // Both statements are simply stepped until they finish.
+    auto run_to_completion = [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+      return StepStatement(stmt, error);
+    };
+    ADBC_RETURN_NOT_OK(DoQuery(db_, "ROLLBACK", error, run_to_completion));
+    return DoQuery(db_, "BEGIN TRANSACTION", error, run_to_completion);
+  }
private:
+  /// Step a prepared statement until it stops producing rows, discarding the
+  /// rows. Any final result other than SQLITE_DONE is converted by CheckRc.
+  AdbcStatusCode StepStatement(sqlite3_stmt* stmt, struct AdbcError* error) {
+    int rc = SQLITE_OK;
+    do {
+      rc = sqlite3_step(stmt);
+    } while (rc == SQLITE_ROW);
+
+    if (rc == SQLITE_DONE) return ADBC_STATUS_OK;
+    return CheckRc(db_, rc, "sqlite3_step", error);
+  }
+
std::shared_ptr<SqliteDatabaseImpl> database_;
sqlite3* db_;
+ bool autocommit_;
};
AdbcStatusCode BindParameters(sqlite3_stmt* stmt, const arrow::RecordBatch& data,
@@ -1132,6 +1199,14 @@ AdbcStatusCode SqliteDatabaseRelease(struct AdbcDatabase* database,
return status;
}
+// Driver callback for commit: unwraps the SqliteConnectionImpl stored in
+// connection->private_data and delegates to its Commit method.
+AdbcStatusCode SqliteConnectionCommit(struct AdbcConnection* connection,
+                                      struct AdbcError* error) {
+  if (connection->private_data == nullptr) return ADBC_STATUS_INVALID_STATE;
+  // private_data is treated as a pointer to a shared_ptr<SqliteConnectionImpl>.
+  const auto& impl = *reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(
+      connection->private_data);
+  return impl->Commit(error);
+}
+
AdbcStatusCode SqliteConnectionGetObjects(
struct AdbcConnection* connection, int depth, const char* catalog,
const char* db_schema, const char* table_name, const char** table_types,
@@ -1192,10 +1267,36 @@ AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
return status;
}
+// Driver callback for rollback: unwraps the SqliteConnectionImpl stored in
+// connection->private_data and delegates to its Rollback method.
+AdbcStatusCode SqliteConnectionRollback(struct AdbcConnection* connection,
+                                        struct AdbcError* error) {
+  if (connection->private_data == nullptr) return ADBC_STATUS_INVALID_STATE;
+  // private_data is treated as a pointer to a shared_ptr<SqliteConnectionImpl>.
+  const auto& impl = *reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(
+      connection->private_data);
+  return impl->Rollback(error);
+}
+
AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection,
const char* key, const char* value,
struct AdbcError* error) {
- return ADBC_STATUS_OK;
+  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+  auto* impl =
+      reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
+
+  // The autocommit option is the only one handled; reject everything else.
+  if (std::strcmp(key, ADBC_CONNECTION_OPTION_AUTOCOMMIT) != 0) {
+    SetError(error, "Unknown option");
+    return ADBC_STATUS_NOT_IMPLEMENTED;
+  }
+
+  // Only the two canonical option values are accepted.
+  const bool enable = std::strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0;
+  const bool disable = std::strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0;
+  if (!enable && !disable) {
+    SetError(error, "Invalid option value for autocommit: ", value);
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+  return (*impl)->SetAutocommit(enable, error);
}
AdbcStatusCode SqliteStatementBind(struct AdbcStatement* statement,
@@ -1309,6 +1410,11 @@ AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
return SqliteDatabaseRelease(database, error);
}
+// ADBC-named entry point for this driver; forwards to SqliteConnectionCommit.
+AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ return SqliteConnectionCommit(connection, error);
+}
+
AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int depth,
const char* catalog, const char* db_schema,
const char* table_name, const char** table_types,
@@ -1350,6 +1456,11 @@ AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
return SqliteConnectionRelease(connection, error);
}
+// ADBC-named entry point for this driver; forwards to SqliteConnectionRollback.
+AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
+ struct AdbcError* error) {
+ return SqliteConnectionRollback(connection, error);
+}
+
AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
const char* value, struct AdbcError* error) {
return SqliteConnectionSetOption(connection, key, value, error);
@@ -1428,12 +1539,14 @@ AdbcStatusCode AdbcSqliteDriverInit(size_t count, struct AdbcDriver* driver,
driver->DatabaseRelease = SqliteDatabaseRelease;
driver->DatabaseSetOption = SqliteDatabaseSetOption;
+ driver->ConnectionCommit = SqliteConnectionCommit;
driver->ConnectionGetObjects = SqliteConnectionGetObjects;
driver->ConnectionGetTableSchema = SqliteConnectionGetTableSchema;
driver->ConnectionGetTableTypes = SqliteConnectionGetTableTypes;
driver->ConnectionInit = SqliteConnectionInit;
driver->ConnectionNew = SqliteConnectionNew;
driver->ConnectionRelease = SqliteConnectionRelease;
+ driver->ConnectionRollback = SqliteConnectionRollback;
driver->ConnectionSetOption = SqliteConnectionSetOption;
driver->StatementBind = SqliteStatementBind;
diff --git a/drivers/sqlite/sqlite_test.cc b/drivers/sqlite/sqlite_test.cc
index dff4a6e..fb7d103 100644
--- a/drivers/sqlite/sqlite_test.cc
+++ b/drivers/sqlite/sqlite_test.cc
@@ -70,10 +70,32 @@ class Sqlite : public ::testing::Test {
}
protected:
+  /// Ingest a two-row sample batch (schema: bulk_schema) into the table
+  /// "bulk_insert" through the given connection.
+  ///
+  /// Failures are reported through assertion macros, so call this inside
+  /// ASSERT_NO_FATAL_FAILURE.
+  void IngestSampleTable(struct AdbcConnection* connection) {
+    ArrowArray export_table;
+    ArrowSchema export_schema;
+    auto bulk_table =
+        adbc::RecordBatchFromJSON(bulk_schema, R"([[1, "foo"], [2, "bar"]])");
+    ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
+    ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
+
+    AdbcStatement statement;
+    std::memset(&statement, 0, sizeof(statement));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(connection, &statement, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+                                      "bulk_insert", &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+    // The statement is local to this helper; release it so repeated calls do
+    // not leak one statement each.
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+  }
+
AdbcDatabase database;
AdbcConnection connection;
AdbcError error = {};
+ std::shared_ptr<arrow::Schema> bulk_schema = arrow::schema(
+ {arrow::field("ints", arrow::int64()), arrow::field("strs", arrow::utf8())});
+
std::shared_ptr<arrow::DataType> column_schema = arrow::struct_({
arrow::field("column_name", arrow::utf8(), /*nullable=*/false),
arrow::field("ordinal_position", arrow::int32()),
@@ -299,12 +321,9 @@ TEST_F(Sqlite, BulkIngestStream) {
TEST_F(Sqlite, MultipleConnections) {
struct AdbcConnection connection2;
-
- {
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&database, &connection2, &error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection2, &error));
- ASSERT_NE(connection.private_data, nullptr);
- }
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&database, &connection2, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection2, &error));
+ ASSERT_NE(connection.private_data, nullptr);
{
std::string query = "CREATE TABLE foo (bar INTEGER)";
@@ -364,27 +383,7 @@ TEST_F(Sqlite, MetadataGetTableTypes) {
}
TEST_F(Sqlite, MetadataGetObjects) {
- // Create a table via ingestion
- {
- ArrowArray export_table;
- ArrowSchema export_schema;
- auto bulk_schema = arrow::schema(
- {arrow::field("ints", arrow::int64()), arrow::field("strs", arrow::utf8())});
- auto bulk_table =
- adbc::RecordBatchFromJSON(bulk_schema, R"([[1, "foo"], [2, "bar"]])");
- ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
- ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
-
- AdbcStatement statement;
- std::memset(&statement, 0, sizeof(statement));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
- "bulk_insert", &error));
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
- }
+ ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection));
// Query for catalogs
AdbcStatement statement;
@@ -633,27 +632,7 @@ TEST_F(Sqlite, MetadataGetObjectsColumns) {
}
TEST_F(Sqlite, MetadataGetTableSchema) {
- // Create a table via ingestion
- auto bulk_schema = arrow::schema(
- {arrow::field("ints", arrow::int64()), arrow::field("strs", arrow::utf8())});
- {
- ArrowArray export_table;
- ArrowSchema export_schema;
- auto bulk_table =
- adbc::RecordBatchFromJSON(bulk_schema, R"([[1, "foo"], [2, "bar"]])");
- ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
- ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
-
- AdbcStatement statement;
- std::memset(&statement, 0, sizeof(statement));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
- "bulk_insert", &error));
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
- }
+ ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection));
ArrowSchema export_schema;
ADBC_ASSERT_OK_WITH_ERROR(
@@ -665,4 +644,88 @@ TEST_F(Sqlite, MetadataGetTableSchema) {
ASSERT_SCHEMA_EQ(*schema, *bulk_schema);
}
+// Exercises autocommit handling, commit and rollback using two connections to
+// the same shared in-memory database: changes made on `connection` are checked
+// for visibility from `connection2`.
+TEST_F(Sqlite, Transactions) {
+ // For this test, we explicitly want a shared DB
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRelease(&connection, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseRelease(&database, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseNew(&database, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error,
+ AdbcDatabaseSetOption(&database, "filename",
+ "file:Sqlite_Transactions?mode=memory&cache=shared", &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseInit(&database, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&database, &connection, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection, &error));
+
+ struct AdbcConnection connection2;
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&database, &connection2, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection2, &error));
+ // NOTE(review): this checks `connection`; `connection2` may have been
+ // intended -- confirm.
+ ASSERT_NE(connection.private_data, nullptr);
+
+ AdbcStatement statement;
+ std::memset(&statement, 0, sizeof(statement));
+
+ const char* query = "SELECT * FROM bulk_insert";
+
+ // Invalid option value
+ ASSERT_NE(ADBC_STATUS_OK,
+ AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+ "invalid", &error));
+
+ // Can't call commit/rollback without disabling autocommit
+ ASSERT_EQ(ADBC_STATUS_INVALID_STATE, AdbcConnectionCommit(&connection, &error));
+ ASSERT_EQ(ADBC_STATUS_INVALID_STATE, AdbcConnectionRollback(&connection, &error));
+ // NOTE(review): three failing calls populate `error` before this single
+ // release -- confirm earlier messages are not leaked.
+ error.release(&error);
+
+ // Ensure it's idempotent
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+ ADBC_OPTION_VALUE_ENABLED, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+ ADBC_OPTION_VALUE_ENABLED, &error));
+
+ // Switch to manual transactions; an immediate commit must succeed.
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+ ADBC_OPTION_VALUE_DISABLED, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionCommit(&connection, &error));
+
+ // Uncommitted change
+ ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection));
+
+ // SQLite prevents us from executing the query
+ {
+ // NOTE(review): `statement` is re-created by AdbcStatementNew in the blocks
+ // below without a release in between -- confirm this does not leak.
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection2, &statement, &error));
+ ASSERT_NE(ADBC_STATUS_OK, AdbcStatementSetSqlQuery(&statement, query, &error));
+ ASSERT_THAT(error.message, ::testing::HasSubstr("database schema is locked"));
+ error.release(&error);
+ }
+
+ // Rollback
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRollback(&connection, &error));
+
+ // Now nothing's visible
+ {
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection2, &statement, &error));
+ ASSERT_NE(ADBC_STATUS_OK, AdbcStatementSetSqlQuery(&statement, query, &error));
+ ASSERT_THAT(error.message, ::testing::HasSubstr("no such table"));
+ error.release(&error);
+ }
+
+ // Commit, should now be visible on other connection
+ ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionCommit(&connection, &error));
+
+ {
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection2, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementSetSqlQuery(&statement, query, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+ ArrowArrayStream stream;
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementGetStream(&statement, &stream, &error));
+ // NOTE(review): `stream` is never read or released in this test -- confirm
+ // whether stream.release(&stream) is needed.
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+ }
+
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRelease(&connection2, &error));
+}
+
} // namespace adbc