You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/06/24 17:32:37 UTC

[arrow-adbc] branch main updated: Add basic transaction semantics (#24)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new b1b62fd  Add basic transaction semantics (#24)
b1b62fd is described below

commit b1b62fd2d9bd9252426230bf60dfac547d262153
Author: David Li <li...@gmail.com>
AuthorDate: Fri Jun 24 13:32:32 2022 -0400

    Add basic transaction semantics (#24)
    
    Fixes #23.
---
 adbc.h                                          |  97 +++++++++-----
 adbc_driver_manager/adbc_driver_manager.cc      |  54 +++++++-
 adbc_driver_manager/adbc_driver_manager_test.cc |  11 ++
 drivers/sqlite/sqlite.cc                        | 163 ++++++++++++++++++++----
 drivers/sqlite/sqlite_test.cc                   | 159 ++++++++++++++++-------
 5 files changed, 375 insertions(+), 109 deletions(-)

diff --git a/adbc.h b/adbc.h
index 9e1ed97..f21415f 100644
--- a/adbc.h
+++ b/adbc.h
@@ -69,8 +69,6 @@ struct ArrowArray {
 
 #endif  // ARROW_C_DATA_INTERFACE
 
-// EXPERIMENTAL: C stream interface
-
 #ifndef ARROW_C_STREAM_INTERFACE
 #define ARROW_C_STREAM_INTERFACE
 
@@ -252,6 +250,11 @@ struct ADBC_EXPORT AdbcError {
 
 /// }@
 
+/// \brief Canonical option value for enabling an option.
+#define ADBC_OPTION_VALUE_ENABLED "true"
+/// \brief Canonical option value for disabling an option.
+#define ADBC_OPTION_VALUE_DISABLED "false"
+
 /// \defgroup adbc-database Database initialization.
 /// Clients first initialize a database, then connect to the database
 /// (below). For client-server databases, one of these steps may be a
@@ -340,32 +343,6 @@ ADBC_EXPORT
 AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
                                      struct AdbcError* error);
 
-/// \defgroup adbc-connection-partition Partitioned Results
-/// Some databases may internally partition the results. These
-/// partitions are exposed to clients who may wish to integrate them
-/// with a threaded or distributed execution model, where partitions
-/// can be divided among threads or machines for processing.
-///
-/// Drivers are not required to support partitioning.
-///
-/// Partitions are not ordered. If the result set is sorted,
-/// implementations should return a single partition.
-///
-/// @{
-
-/// \brief Construct a statement for a partition of a query. The
-///   statement can then be read independently.
-///
-/// A partition can be retrieved from AdbcStatementGetPartitionDesc.
-ADBC_EXPORT
-AdbcStatusCode AdbcConnectionDeserializePartitionDesc(struct AdbcConnection* connection,
-                                                      const uint8_t* serialized_partition,
-                                                      size_t serialized_length,
-                                                      struct AdbcStatement* statement,
-                                                      struct AdbcError* error);
-
-/// }@
-
 /// \defgroup adbc-connection-metadata Metadata
 /// Functions for retrieving metadata about the database.
 ///
@@ -530,6 +507,64 @@ AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
 
 /// }@
 
+/// \defgroup adbc-connection-partition Partitioned Results
+/// Some databases may internally partition the results. These
+/// partitions are exposed to clients who may wish to integrate them
+/// with a threaded or distributed execution model, where partitions
+/// can be divided among threads or machines for processing.
+///
+/// Drivers are not required to support partitioning.
+///
+/// Partitions are not ordered. If the result set is sorted,
+/// implementations should return a single partition.
+///
+/// @{
+
+/// \brief Construct a statement for a partition of a query. The
+///   statement can then be read independently.
+///
+/// A partition can be retrieved from AdbcStatementGetPartitionDesc.
+ADBC_EXPORT
+AdbcStatusCode AdbcConnectionDeserializePartitionDesc(struct AdbcConnection* connection,
+                                                      const uint8_t* serialized_partition,
+                                                      size_t serialized_length,
+                                                      struct AdbcStatement* statement,
+                                                      struct AdbcError* error);
+
+/// @}
+
+/// \defgroup adbc-connection-transaction Transaction Semantics
+///
+/// Connections start out in autocommit mode by default (if
+/// applicable for the given vendor). Use
+/// ADBC_CONNECTION_OPTION_AUTOCOMMIT to change this.
+///
+/// @{
+
+/// \brief The name of the canonical option for whether autocommit is
+///   enabled.
+#define ADBC_CONNECTION_OPTION_AUTOCOMMIT "adbc.connection.autocommit"
+
+/// \brief Commit any pending transactions. Only used if autocommit is
+///   disabled.
+///
+/// Behavior is undefined if this is mixed with SQL transaction
+/// statements.
+ADBC_EXPORT
+AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
+                                    struct AdbcError* error);
+
+/// \brief Roll back any pending transactions. Only used if autocommit
+///   is disabled.
+///
+/// Behavior is undefined if this is mixed with SQL transaction
+/// statements.
+ADBC_EXPORT
+AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
+                                      struct AdbcError* error);
+
+/// @}
+
 /// }@
 
 /// \defgroup adbc-statement Managing statements.
@@ -784,11 +819,15 @@ struct ADBC_EXPORT AdbcDriver {
                                         struct AdbcError*);
   AdbcStatusCode (*ConnectionInit)(struct AdbcConnection*, struct AdbcError*);
   AdbcStatusCode (*ConnectionRelease)(struct AdbcConnection*, struct AdbcError*);
+
   AdbcStatusCode (*ConnectionDeserializePartitionDesc)(struct AdbcConnection*,
                                                        const uint8_t*, size_t,
                                                        struct AdbcStatement*,
                                                        struct AdbcError*);
 
+  AdbcStatusCode (*ConnectionCommit)(struct AdbcConnection*, struct AdbcError*);
+  AdbcStatusCode (*ConnectionRollback)(struct AdbcConnection*, struct AdbcError*);
+
   AdbcStatusCode (*ConnectionGetObjects)(struct AdbcConnection*, int, const char*,
                                          const char*, const char*, const char**,
                                          const char*, struct AdbcStatement*,
@@ -842,7 +881,7 @@ typedef AdbcStatusCode (*AdbcDriverInitFunc)(size_t count, struct AdbcDriver* dr
 // struct/entrypoint instead?
 
 // For use with count
-#define ADBC_VERSION_0_0_1 28
+#define ADBC_VERSION_0_0_1 26
 
 /// }@
 
diff --git a/adbc_driver_manager/adbc_driver_manager.cc b/adbc_driver_manager/adbc_driver_manager.cc
index e120f59..5fc91f6 100644
--- a/adbc_driver_manager/adbc_driver_manager.cc
+++ b/adbc_driver_manager/adbc_driver_manager.cc
@@ -61,6 +61,11 @@ void SetError(struct AdbcError* error, const std::string& message) {
 }
 
 // Default stubs
+
+AdbcStatusCode ConnectionCommit(struct AdbcConnection*, struct AdbcError* error) {
+  return ADBC_STATUS_NOT_IMPLEMENTED;  // default stub; see FILL_DEFAULT(driver, ConnectionCommit) in AdbcLoadDriver
+}
+
 AdbcStatusCode ConnectionGetTableTypes(struct AdbcConnection*, struct AdbcStatement*,
                                        struct AdbcError* error) {
   return ADBC_STATUS_NOT_IMPLEMENTED;
@@ -78,6 +83,10 @@ AdbcStatusCode ConnectionGetTableSchema(struct AdbcConnection*, const char*, con
   return ADBC_STATUS_NOT_IMPLEMENTED;
 }
 
+AdbcStatusCode ConnectionRollback(struct AdbcConnection*, struct AdbcError* error) {
+  return ADBC_STATUS_NOT_IMPLEMENTED;  // default stub; see FILL_DEFAULT(driver, ConnectionRollback) in AdbcLoadDriver
+}
+
 AdbcStatusCode StatementBind(struct AdbcStatement*, struct ArrowArray*,
                              struct ArrowSchema*, struct AdbcError* error) {
   return ADBC_STATUS_NOT_IMPLEMENTED;
@@ -274,15 +283,12 @@ AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
   return status;
 }
 
-AdbcStatusCode AdbcConnectionNew(struct AdbcDatabase* database,
-                                 struct AdbcConnection* connection,
-                                 struct AdbcError* error) {
-  if (!database->private_driver) {
+AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
+                                    struct AdbcError* error) {
+  if (!connection->private_driver) {
     return ADBC_STATUS_INVALID_STATE;
   }
-  auto status = database->private_driver->ConnectionNew(database, connection, error);
-  connection->private_driver = database->private_driver;
-  return status;
+  return connection->private_driver->ConnectionCommit(connection, error);
 }
 
 AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
@@ -293,6 +299,17 @@ AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
   return connection->private_driver->ConnectionInit(connection, error);
 }
 
+AdbcStatusCode AdbcConnectionNew(struct AdbcDatabase* database,
+                                 struct AdbcConnection* connection,
+                                 struct AdbcError* error) {
+  // A connection uses whichever driver is already attached to its parent database.
+  auto* driver = database->private_driver;
+  if (driver == nullptr) return ADBC_STATUS_INVALID_STATE;
+  const auto status = driver->ConnectionNew(database, connection, error);
+  connection->private_driver = driver;  // recorded regardless of the returned status
+  return status;
+}
+
 AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
                                      struct AdbcError* error) {
   if (!connection->private_driver) {
@@ -303,6 +320,22 @@ AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
   return status;
 }
 
+AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
+                                      struct AdbcError* error) {
+  // Route to the driver recorded on the connection; without one the call is invalid.
+  auto* driver = connection->private_driver;
+  if (driver == nullptr) return ADBC_STATUS_INVALID_STATE;
+  return driver->ConnectionRollback(connection, error);
+}
+
+AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
+                                       const char* value, struct AdbcError* error) {
+  // The manager does not interpret options; it only routes the call to the driver.
+  auto* driver = connection->private_driver;
+  if (driver == nullptr) return ADBC_STATUS_INVALID_STATE;
+  return driver->ConnectionSetOption(connection, key, value, error);
+}
+
 AdbcStatusCode AdbcStatementBind(struct AdbcStatement* statement,
                                  struct ArrowArray* values, struct ArrowSchema* schema,
                                  struct AdbcError* error) {
@@ -543,10 +576,17 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,
   CHECK_REQUIRED(driver, DatabaseInit);
   CHECK_REQUIRED(driver, DatabaseRelease);
 
+  CHECK_REQUIRED(driver, ConnectionNew);
+  CHECK_REQUIRED(driver, ConnectionInit);
+  CHECK_REQUIRED(driver, ConnectionRelease);
+  FILL_DEFAULT(driver, ConnectionCommit);
   FILL_DEFAULT(driver, ConnectionGetObjects);
   FILL_DEFAULT(driver, ConnectionGetTableSchema);
   FILL_DEFAULT(driver, ConnectionGetTableTypes);
+  FILL_DEFAULT(driver, ConnectionRollback);
 
+  CHECK_REQUIRED(driver, StatementNew);
+  CHECK_REQUIRED(driver, StatementRelease);
   FILL_DEFAULT(driver, StatementBind);
   FILL_DEFAULT(driver, StatementExecute);
   FILL_DEFAULT(driver, StatementPrepare);
diff --git a/adbc_driver_manager/adbc_driver_manager_test.cc b/adbc_driver_manager/adbc_driver_manager_test.cc
index cdfc4e2..0c5781f 100644
--- a/adbc_driver_manager/adbc_driver_manager_test.cc
+++ b/adbc_driver_manager/adbc_driver_manager_test.cc
@@ -217,4 +217,15 @@ TEST_F(DriverManager, BulkIngestStream) {
   }
 }
 
+TEST_F(DriverManager, Transactions) {
+  // A value other than the canonical enabled/disabled strings must not be accepted
+  ASSERT_NE(ADBC_STATUS_OK,
+            AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+                                    "invalid", &error));
+
+  // Autocommit is never disabled in this test, so commit and rollback must both be refused
+  ASSERT_EQ(ADBC_STATUS_INVALID_STATE, AdbcConnectionCommit(&connection, &error));
+  ASSERT_EQ(ADBC_STATUS_INVALID_STATE, AdbcConnectionRollback(&connection, &error));
+}
+
 }  // namespace adbc
diff --git a/drivers/sqlite/sqlite.cc b/drivers/sqlite/sqlite.cc
index c11ec23..ca8ca0d 100644
--- a/drivers/sqlite/sqlite.cc
+++ b/drivers/sqlite/sqlite.cc
@@ -156,10 +156,25 @@ class SqliteDatabaseImpl {
  public:
   explicit SqliteDatabaseImpl() : db_(nullptr), connection_count_(0) {}
 
-  sqlite3* Connect() {
+  AdbcStatusCode Connect(sqlite3** db, struct AdbcError* error) {
     std::lock_guard<std::mutex> guard(mutex_);
-    if (db_) ++connection_count_;
-    return db_;
+    if (!db_) {
+      SetError(error, "Database not yet initialized, call AdbcDatabaseInit");
+      return ADBC_STATUS_INVALID_STATE;
+    }
+    // Create a new connection
+    if (database_uri_ == ":memory:") {
+      // unless the special ":memory:" filename is used
+      *db = db_;
+    } else {
+      int rc =
+          sqlite3_open_v2(database_uri_.c_str(), db,
+                          SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
+                          /*zVfs=*/nullptr);
+      ADBC_RETURN_NOT_OK(CheckRc(*db, nullptr, rc, "sqlite3_open_v2", error));
+    }
+    ++connection_count_;
+    return ADBC_STATUS_OK;
   }
 
   AdbcStatusCode Init(struct AdbcError* error) {
@@ -167,11 +182,14 @@ class SqliteDatabaseImpl {
       SetError(error, "Database already initialized");
       return ADBC_STATUS_INVALID_STATE;
     }
-    const char* filename = ":memory:";
+    database_uri_ = "file:adbc_sqlite_driver?mode=memory&cache=shared";
     auto it = options_.find("filename");
-    if (it != options_.end()) filename = it->second.c_str();
+    if (it != options_.end()) {
+      database_uri_ = it->second;
+    }
 
-    int rc = sqlite3_open_v2(filename, &db_, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE,
+    int rc = sqlite3_open_v2(database_uri_.c_str(), &db_,
+                             SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
                              /*zVfs=*/nullptr);
     ADBC_RETURN_NOT_OK(CheckRc(db_, nullptr, rc, "sqlite3_open_v2", error));
     options_.clear();
@@ -187,12 +205,19 @@ class SqliteDatabaseImpl {
     return ADBC_STATUS_OK;
   }
 
-  AdbcStatusCode Disconnect(struct AdbcError* error) {
+  AdbcStatusCode Disconnect(sqlite3* db, struct AdbcError* error) {
     std::lock_guard<std::mutex> guard(mutex_);
     if (--connection_count_ < 0) {
       SetError(error, "Connection count underflow");
       return ADBC_STATUS_INVALID_STATE;
     }
+    // Close the database unless :memory:
+    if (database_uri_ != ":memory:") {
+      if (sqlite3_close(db) != SQLITE_OK) {
+        if (db) SetError(db, "sqlite3_close", error);
+        return ADBC_STATUS_IO;
+      }
+    }
     return ADBC_STATUS_OK;
   }
 
@@ -216,6 +241,7 @@ class SqliteDatabaseImpl {
  private:
   sqlite3* db_;
   int connection_count_;
+  std::string database_uri_;
   std::unordered_map<std::string, std::string> options_;
   std::mutex mutex_;
 };
@@ -223,7 +249,7 @@ class SqliteDatabaseImpl {
 class SqliteConnectionImpl {
  public:
   explicit SqliteConnectionImpl(std::shared_ptr<SqliteDatabaseImpl> database)
-      : database_(std::move(database)), db_(nullptr) {}
+      : database_(std::move(database)), db_(nullptr), autocommit_(true) {}
 
   sqlite3* db() const { return db_; }
 
@@ -246,34 +272,75 @@ class SqliteConnectionImpl {
     }
     query += escaped;
 
-    sqlite3_stmt* stmt;
-    int rc = sqlite3_prepare_v2(db_, query.c_str(), static_cast<int>(query.size()), &stmt,
-                                /*pzTail=*/nullptr);
-    ADBC_RETURN_NOT_OK(CheckRc(db_, stmt, rc, "sqlite3_prepare_v2", error));
-    rc = sqlite3_step(stmt);
-    if (rc == SQLITE_ERROR) {
-      return CheckRc(db_, stmt, rc, "sqlite3_step", error);
-    }
-    auto arrow_schema = StatementToSchema(stmt);
+    std::shared_ptr<arrow::Schema> arrow_schema;
     ADBC_RETURN_NOT_OK(
-        CheckRc(db_, stmt, sqlite3_finalize(stmt), "sqlite3_finalize", error));
+        DoQuery(db_, query.c_str(), error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+          if (sqlite3_step(stmt) == SQLITE_ERROR) {
+            SetError(db_, "sqlite3_step", error);
+            return ADBC_STATUS_IO;
+          }
+          arrow_schema = StatementToSchema(stmt);
+          return ADBC_STATUS_OK;
+        }));
     return FromArrowStatus(arrow::ExportSchema(*arrow_schema, schema), error);
   }
 
-  AdbcStatusCode Init(struct AdbcError* error) {
-    db_ = database_->Connect();
-    if (!db_) {
-      SetError(error, "Database not yet initialized!");
+  AdbcStatusCode Init(struct AdbcError* error) { return database_->Connect(&db_, error); }
+
+  AdbcStatusCode Release(struct AdbcError* error) {
+    return database_->Disconnect(db_, error);  // hands back the handle that Init() obtained via Connect()
+  }
+
+  AdbcStatusCode SetAutocommit(bool autocommit, struct AdbcError* error) {
+    if (autocommit == autocommit_) return ADBC_STATUS_OK;  // already in the requested mode
+    const char* query = autocommit ? "COMMIT" : "BEGIN TRANSACTION";  // enabling commits, disabling opens
+    const AdbcStatusCode status = DoQuery(db_, query, error, [&](sqlite3_stmt* stmt) {
+      return StepStatement(stmt, error);
+    });
+    if (status == ADBC_STATUS_OK) autocommit_ = autocommit;  // keep the flag in step with SQLite on failure
+    return status;
+  }
+
+  AdbcStatusCode Commit(struct AdbcError* error) {
+    if (autocommit_) {
+      SetError(error, "Cannot commit when in autocommit mode");
       return ADBC_STATUS_INVALID_STATE;
     }
-    return ADBC_STATUS_OK;
+    ADBC_RETURN_NOT_OK(
+        DoQuery(db_, "COMMIT", error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+          return StepStatement(stmt, error);
+        }));
+    return DoQuery(db_, "BEGIN TRANSACTION", error,
+                   [&](sqlite3_stmt* stmt) { return StepStatement(stmt, error); });
   }
 
-  AdbcStatusCode Release(struct AdbcError* error) { return database_->Disconnect(error); }
+  AdbcStatusCode Rollback(struct AdbcError* error) {
+    if (autocommit_) {  // refused unless autocommit was turned off through SetAutocommit(false)
+      SetError(error, "Cannot rollback when in autocommit mode");
+      return ADBC_STATUS_INVALID_STATE;
+    }
+    ADBC_RETURN_NOT_OK(  // first undo the currently open transaction
+        DoQuery(db_, "ROLLBACK", error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+          return StepStatement(stmt, error);
+        }));
+    return DoQuery(db_, "BEGIN TRANSACTION", error,  // then immediately open the next one
+                   [&](sqlite3_stmt* stmt) { return StepStatement(stmt, error); });
+  }
 
  private:
+  AdbcStatusCode StepStatement(sqlite3_stmt* stmt, struct AdbcError* error) {
+    // Run the statement to completion; rows are discarded, only the final code matters.
+    int rc = sqlite3_step(stmt);
+    while (rc == SQLITE_ROW) {
+      rc = sqlite3_step(stmt);
+    }
+    if (rc == SQLITE_DONE) return ADBC_STATUS_OK;
+    return CheckRc(db_, rc, "sqlite3_step", error);
+  }
+
   std::shared_ptr<SqliteDatabaseImpl> database_;
   sqlite3* db_;
+  bool autocommit_;
 };
 
 AdbcStatusCode BindParameters(sqlite3_stmt* stmt, const arrow::RecordBatch& data,
@@ -1132,6 +1199,14 @@ AdbcStatusCode SqliteDatabaseRelease(struct AdbcDatabase* database,
   return status;
 }
 
+AdbcStatusCode SqliteConnectionCommit(struct AdbcConnection* connection,
+                                      struct AdbcError* error) {
+  // private_data is read as a shared_ptr<SqliteConnectionImpl>*; null is an invalid state.
+  if (connection->private_data == nullptr) return ADBC_STATUS_INVALID_STATE;
+  using ImplPtr = std::shared_ptr<SqliteConnectionImpl>;
+  return (*reinterpret_cast<ImplPtr*>(connection->private_data))->Commit(error);
+}
+
 AdbcStatusCode SqliteConnectionGetObjects(
     struct AdbcConnection* connection, int depth, const char* catalog,
     const char* db_schema, const char* table_name, const char** table_types,
@@ -1192,10 +1267,36 @@ AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
   return status;
 }
 
+AdbcStatusCode SqliteConnectionRollback(struct AdbcConnection* connection,
+                                        struct AdbcError* error) {
+  // private_data is read as a shared_ptr<SqliteConnectionImpl>*; null is an invalid state.
+  if (connection->private_data == nullptr) return ADBC_STATUS_INVALID_STATE;
+  using ImplPtr = std::shared_ptr<SqliteConnectionImpl>;
+  return (*reinterpret_cast<ImplPtr*>(connection->private_data))->Rollback(error);
+}
+
 AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection,
                                          const char* key, const char* value,
                                          struct AdbcError* error) {
-  return ADBC_STATUS_OK;
+  if (!connection->private_data) return ADBC_STATUS_INVALID_STATE;
+  auto ptr =
+      reinterpret_cast<std::shared_ptr<SqliteConnectionImpl>*>(connection->private_data);
+
+  if (std::strcmp(key, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
+    bool autocommit = false;
+    if (std::strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
+      autocommit = false;
+    } else if (std::strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
+      autocommit = true;
+    } else {
+      SetError(error, "Invalid option value for autocommit: ", value);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+    return (*ptr)->SetAutocommit(autocommit, error);
+  } else {
+    SetError(error, "Unknown option");
+  }
+  return ADBC_STATUS_NOT_IMPLEMENTED;
 }
 
 AdbcStatusCode SqliteStatementBind(struct AdbcStatement* statement,
@@ -1309,6 +1410,11 @@ AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
   return SqliteDatabaseRelease(database, error);
 }
 
+AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
+                                    struct AdbcError* error) {
+  return SqliteConnectionCommit(connection, error);  // same function AdbcSqliteDriverInit registers as ConnectionCommit
+}
+
 AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int depth,
                                         const char* catalog, const char* db_schema,
                                         const char* table_name, const char** table_types,
@@ -1350,6 +1456,11 @@ AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
   return SqliteConnectionRelease(connection, error);
 }
 
+AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
+                                      struct AdbcError* error) {
+  return SqliteConnectionRollback(connection, error);  // same function AdbcSqliteDriverInit registers as ConnectionRollback
+}
+
 AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
                                        const char* value, struct AdbcError* error) {
   return SqliteConnectionSetOption(connection, key, value, error);
@@ -1428,12 +1539,14 @@ AdbcStatusCode AdbcSqliteDriverInit(size_t count, struct AdbcDriver* driver,
   driver->DatabaseRelease = SqliteDatabaseRelease;
   driver->DatabaseSetOption = SqliteDatabaseSetOption;
 
+  driver->ConnectionCommit = SqliteConnectionCommit;
   driver->ConnectionGetObjects = SqliteConnectionGetObjects;
   driver->ConnectionGetTableSchema = SqliteConnectionGetTableSchema;
   driver->ConnectionGetTableTypes = SqliteConnectionGetTableTypes;
   driver->ConnectionInit = SqliteConnectionInit;
   driver->ConnectionNew = SqliteConnectionNew;
   driver->ConnectionRelease = SqliteConnectionRelease;
+  driver->ConnectionRollback = SqliteConnectionRollback;
   driver->ConnectionSetOption = SqliteConnectionSetOption;
 
   driver->StatementBind = SqliteStatementBind;
diff --git a/drivers/sqlite/sqlite_test.cc b/drivers/sqlite/sqlite_test.cc
index dff4a6e..fb7d103 100644
--- a/drivers/sqlite/sqlite_test.cc
+++ b/drivers/sqlite/sqlite_test.cc
@@ -70,10 +70,32 @@ class Sqlite : public ::testing::Test {
   }
 
  protected:
+  void IngestSampleTable(struct AdbcConnection* connection) {  // bulk-ingests two rows into "bulk_insert"
+    ArrowArray export_table;
+    ArrowSchema export_schema;
+    auto bulk_table =
+        adbc::RecordBatchFromJSON(bulk_schema, R"([[1, "foo"], [2, "bar"]])");
+    ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
+    ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
+    AdbcStatement statement;
+    std::memset(&statement, 0, sizeof(statement));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(connection, &statement, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+                                      "bulk_insert", &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));  // do not leak the statement
+  }
+
   AdbcDatabase database;
   AdbcConnection connection;
   AdbcError error = {};
 
+  std::shared_ptr<arrow::Schema> bulk_schema = arrow::schema(
+      {arrow::field("ints", arrow::int64()), arrow::field("strs", arrow::utf8())});
+
   std::shared_ptr<arrow::DataType> column_schema = arrow::struct_({
       arrow::field("column_name", arrow::utf8(), /*nullable=*/false),
       arrow::field("ordinal_position", arrow::int32()),
@@ -299,12 +321,9 @@ TEST_F(Sqlite, BulkIngestStream) {
 
 TEST_F(Sqlite, MultipleConnections) {
   struct AdbcConnection connection2;
-
-  {
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&database, &connection2, &error));
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection2, &error));
-    ASSERT_NE(connection.private_data, nullptr);
-  }
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&database, &connection2, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection2, &error));
+  ASSERT_NE(connection.private_data, nullptr);
 
   {
     std::string query = "CREATE TABLE foo (bar INTEGER)";
@@ -364,27 +383,7 @@ TEST_F(Sqlite, MetadataGetTableTypes) {
 }
 
 TEST_F(Sqlite, MetadataGetObjects) {
-  // Create a table via ingestion
-  {
-    ArrowArray export_table;
-    ArrowSchema export_schema;
-    auto bulk_schema = arrow::schema(
-        {arrow::field("ints", arrow::int64()), arrow::field("strs", arrow::utf8())});
-    auto bulk_table =
-        adbc::RecordBatchFromJSON(bulk_schema, R"([[1, "foo"], [2, "bar"]])");
-    ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
-    ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
-
-    AdbcStatement statement;
-    std::memset(&statement, 0, sizeof(statement));
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
-                                      "bulk_insert", &error));
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
-  }
+  ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection));
 
   // Query for catalogs
   AdbcStatement statement;
@@ -633,27 +632,7 @@ TEST_F(Sqlite, MetadataGetObjectsColumns) {
 }
 
 TEST_F(Sqlite, MetadataGetTableSchema) {
-  // Create a table via ingestion
-  auto bulk_schema = arrow::schema(
-      {arrow::field("ints", arrow::int64()), arrow::field("strs", arrow::utf8())});
-  {
-    ArrowArray export_table;
-    ArrowSchema export_schema;
-    auto bulk_table =
-        adbc::RecordBatchFromJSON(bulk_schema, R"([[1, "foo"], [2, "bar"]])");
-    ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
-    ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
-
-    AdbcStatement statement;
-    std::memset(&statement, 0, sizeof(statement));
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
-                                      "bulk_insert", &error));
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
-  }
+  ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection));
 
   ArrowSchema export_schema;
   ADBC_ASSERT_OK_WITH_ERROR(
@@ -665,4 +644,88 @@ TEST_F(Sqlite, MetadataGetTableSchema) {
   ASSERT_SCHEMA_EQ(*schema, *bulk_schema);
 }
 
+TEST_F(Sqlite, Transactions) {
+  // Swap the fixture's database for a named shared-cache in-memory one used by both connections
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRelease(&connection, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseRelease(&database, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseNew(&database, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error,
+      AdbcDatabaseSetOption(&database, "filename",
+                            "file:Sqlite_Transactions?mode=memory&cache=shared", &error));
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseInit(&database, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&database, &connection, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection, &error));
+
+  struct AdbcConnection connection2;
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&database, &connection2, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection2, &error));
+  ASSERT_NE(connection.private_data, nullptr);  // NOTE(review): probably meant connection2 — confirm
+
+  AdbcStatement statement;
+  std::memset(&statement, 0, sizeof(statement));
+
+  const char* query = "SELECT * FROM bulk_insert";
+
+  // A value other than the canonical enabled/disabled strings must not be accepted
+  ASSERT_NE(ADBC_STATUS_OK,
+            AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+                                    "invalid", &error));
+
+  // Can't call commit/rollback without disabling autocommit
+  ASSERT_EQ(ADBC_STATUS_INVALID_STATE, AdbcConnectionCommit(&connection, &error));
+  ASSERT_EQ(ADBC_STATUS_INVALID_STATE, AdbcConnectionRollback(&connection, &error));
+  error.release(&error);  // clear the error populated by the failed calls above
+
+  // Enabling autocommit while it is already enabled must succeed, even when repeated
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error, AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+                                     ADBC_OPTION_VALUE_ENABLED, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error, AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+                                     ADBC_OPTION_VALUE_ENABLED, &error));
+
+  ADBC_ASSERT_OK_WITH_ERROR(
+      error, AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+                                     ADBC_OPTION_VALUE_DISABLED, &error));
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionCommit(&connection, &error));  // nothing pending yet
+
+  // Uncommitted change made on the first connection
+  ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection));
+
+  // While that change is uncommitted, the second connection cannot even set the query
+  {
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection2, &statement, &error));
+    ASSERT_NE(ADBC_STATUS_OK, AdbcStatementSetSqlQuery(&statement, query, &error));
+    ASSERT_THAT(error.message, ::testing::HasSubstr("database schema is locked"));
+    error.release(&error);
+  }
+
+  // Rollback
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRollback(&connection, &error));
+
+  // Now nothing's visible. NOTE(review): the statement above was never released before reuse — confirm no leak
+  {
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection2, &statement, &error));
+    ASSERT_NE(ADBC_STATUS_OK, AdbcStatementSetSqlQuery(&statement, query, &error));
+    ASSERT_THAT(error.message, ::testing::HasSubstr("no such table"));
+    error.release(&error);
+  }
+
+  // Commit, should now be visible on other connection
+  ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection));
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionCommit(&connection, &error));
+
+  {
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection2, &statement, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementSetSqlQuery(&statement, query, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
+    ArrowArrayStream stream;
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementGetStream(&statement, &stream, &error));  // NOTE(review): stream is never released — confirm
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+  }
+
+  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRelease(&connection2, &error));
+}
+
 }  // namespace adbc