You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this rendering.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/08/10 19:16:36 UTC

[arrow-adbc] branch main updated: [C][Python] Add options to control append vs create for bulk ingest (#58)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 479ab30  [C][Python] Add options to control append vs create for bulk ingest (#58)
479ab30 is described below

commit 479ab30746dc9a6276c4447cd35b241f90fcbffd
Author: David Li <li...@gmail.com>
AuthorDate: Wed Aug 10 15:16:32 2022 -0400

    [C][Python] Add options to control append vs create for bulk ingest (#58)
    
    * [C][Python] Add options to control bulk ingestion
    
    * [Python] Fix issue with PyArrow compat
---
 adbc.h                                             |  8 +++
 c/drivers/sqlite/sqlite.cc                         | 32 ++++++++---
 c/drivers/sqlite/sqlite_test.cc                    | 66 +++++++++++++++++++++-
 .../adbc_driver_manager/_lib.pyx                   |  6 ++
 .../adbc_driver_manager/dbapi.py                   |  4 +-
 5 files changed, 106 insertions(+), 10 deletions(-)

diff --git a/adbc.h b/adbc.h
index 26f66b3..e7d9d51 100644
--- a/adbc.h
+++ b/adbc.h
@@ -782,6 +782,14 @@ AdbcStatusCode AdbcStatementSetOption(struct AdbcStatement* statement, const cha
 /// ADBC_STATUS_ALREADY_EXISTS should be raised.  Else, data should be
 /// appended to the target table.
 #define ADBC_INGEST_OPTION_TARGET_TABLE "adbc.ingest.target_table"
+/// \brief Whether to create the target table (the default) or append to an existing one.
+#define ADBC_INGEST_OPTION_MODE "adbc.ingest.mode"
+/// \brief Create the table and insert data; error if the table exists.
+#define ADBC_INGEST_OPTION_MODE_CREATE "adbc.ingest.mode.create"
+/// \brief Insert data into an existing table without creating it; error
+///   if the table does not exist (ADBC_STATUS_NOT_FOUND) or does not
+///   match the schema of the data to append (ADBC_STATUS_ALREADY_EXISTS).
+#define ADBC_INGEST_OPTION_MODE_APPEND "adbc.ingest.mode.append"
 
 /// }@
 
diff --git a/c/drivers/sqlite/sqlite.cc b/c/drivers/sqlite/sqlite.cc
index 729160e..ddaf94d 100644
--- a/c/drivers/sqlite/sqlite.cc
+++ b/c/drivers/sqlite/sqlite.cc
@@ -570,7 +570,7 @@ class SqliteStatementReader : public arrow::RecordBatchReader {
 class SqliteStatementImpl {
  public:
   explicit SqliteStatementImpl(std::shared_ptr<SqliteConnectionImpl> connection)
-      : connection_(std::move(connection)), stmt_(nullptr) {}
+      : connection_(std::move(connection)), append_(false), stmt_(nullptr) {}
 
   AdbcStatusCode Close(struct AdbcError* error) {
     if (stmt_) {
@@ -1164,6 +1164,16 @@ class SqliteStatementImpl {
       if (std::strlen(value) == 0) return ADBC_STATUS_INVALID_ARGUMENT;
       bulk_table_ = value;
       return ADBC_STATUS_OK;
+    } else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
+      if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_APPEND) == 0) {
+        append_ = true;
+      } else if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) {
+        append_ = false;
+      } else {
+        SetError(error, "Unknown value '", value, "' for option: ", key);
+        return ADBC_STATUS_NOT_IMPLEMENTED;
+      }
+      return ADBC_STATUS_OK;
     }
     SetError(error, "Unknown option: ", key);
     return ADBC_STATUS_NOT_IMPLEMENTED;
@@ -1191,9 +1201,9 @@ class SqliteStatementImpl {
     sqlite3* db = connection_->db();
 
     // Create the table
-    {
+    if (!append_) {
       // XXX: not injection-safe
-      std::string query = "CREATE TABLE IF NOT EXISTS ";
+      std::string query = "CREATE TABLE ";
       query += bulk_table_;
       query += " (";
       const auto& fields = bind_parameters_->schema()->fields();
@@ -1203,12 +1213,13 @@ class SqliteStatementImpl {
       }
       query += ')';
 
-      ADBC_RETURN_NOT_OK(
-          DoQuery(db, query.c_str(), error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+      if (DoQuery(db, query.c_str(), error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
             const int rc = sqlite3_step(stmt);
             if (rc == SQLITE_DONE) return ADBC_STATUS_OK;
             return CheckRc(db, stmt, rc, "sqlite3_step", error);
-          }));
+          }) != ADBC_STATUS_OK) {
+        return ADBC_STATUS_ALREADY_EXISTS;
+      }
     }
 
     // Insert the rows
@@ -1227,8 +1238,14 @@ class SqliteStatementImpl {
       int rc = sqlite3_prepare_v2(db, query.c_str(), static_cast<int>(query.size()),
                                   &stmt, /*pzTail=*/nullptr);
       if (rc != SQLITE_OK) {
+        // XXX: inferring the status code by matching the error message text is fragile
+        AdbcStatusCode code = ADBC_STATUS_ALREADY_EXISTS;
+        if (std::strstr(sqlite3_errmsg(db), "no such table:")) {
+          code = ADBC_STATUS_NOT_FOUND;
+        }
+        // Clean up
         std::ignore = CheckRc(db, stmt, rc, "sqlite3_prepare_v2", error);
-        return ADBC_STATUS_ALREADY_EXISTS;
+        return code;
       }
       ADBC_RETURN_NOT_OK(DoQuery(db, stmt, error, [&]() -> AdbcStatusCode {
         int rc = SQLITE_OK;
@@ -1282,6 +1299,7 @@ class SqliteStatementImpl {
   // Bulk ingestion
   // Target of bulk ingestion (rather janky to store state like this, though…)
   std::string bulk_table_;
+  bool append_;
 
   // Prepared statements
   sqlite3_stmt* stmt_;
diff --git a/c/drivers/sqlite/sqlite_test.cc b/c/drivers/sqlite/sqlite_test.cc
index 7958c64..5af85ac 100644
--- a/c/drivers/sqlite/sqlite_test.cc
+++ b/c/drivers/sqlite/sqlite_test.cc
@@ -278,7 +278,7 @@ TEST_F(Sqlite, BulkIngestTable) {
     EXPECT_THAT(batches, ::testing::UnorderedPointwise(PointeesEqual(), {bulk_table}));
   }
 
-  // Append
+  // Append (already exists)
   {
     ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
     ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
@@ -291,6 +291,26 @@ TEST_F(Sqlite, BulkIngestTable) {
                                       "bulk_insert", &error));
     ADBC_ASSERT_OK_WITH_ERROR(
         error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
+    ASSERT_EQ(ADBC_STATUS_ALREADY_EXISTS, AdbcStatementExecute(&statement, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+  }
+
+  // Append (proper option)
+  {
+    ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
+    ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
+
+    AdbcStatement statement;
+    std::memset(&statement, 0, sizeof(statement));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+                                      "bulk_insert", &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
+                                      ADBC_INGEST_OPTION_MODE_APPEND, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
     ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
     ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
 
@@ -312,7 +332,7 @@ TEST_F(Sqlite, BulkIngestTable) {
                 bulk_schema, R"([[1, "foo"], [2, "bar"], [1, "foo"], [2, "bar"]])")}));
   }
 
-  // Conflict
+  // Conflict (table exists)
   {
     auto bulk_schema = arrow::schema({arrow::field("ints", arrow::int64())});
     auto bulk_table = adbc::RecordBatchFromJSON(bulk_schema, R"([[1], [2]])");
@@ -330,6 +350,48 @@ TEST_F(Sqlite, BulkIngestTable) {
     ASSERT_EQ(ADBC_STATUS_ALREADY_EXISTS, AdbcStatementExecute(&statement, &error));
     ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
   }
+
+  // Append (table does not exist)
+  {
+    ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
+    ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
+
+    AdbcStatement statement;
+    std::memset(&statement, 0, sizeof(statement));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+                                      "this_table_does_not_exist", &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
+                                      ADBC_INGEST_OPTION_MODE_APPEND, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
+    ASSERT_EQ(ADBC_STATUS_NOT_FOUND, AdbcStatementExecute(&statement, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+  }
+
+  // Append (schema conflict)
+  {
+    auto bulk_schema = arrow::schema({arrow::field("ints", arrow::int64())});
+    auto bulk_table = adbc::RecordBatchFromJSON(bulk_schema, R"([[1], [2]])");
+    ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
+    ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
+
+    AdbcStatement statement;
+    std::memset(&statement, 0, sizeof(statement));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+                                      "bulk_insert", &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
+                                      ADBC_INGEST_OPTION_MODE_APPEND, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(
+        error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
+    ASSERT_EQ(ADBC_STATUS_ALREADY_EXISTS, AdbcStatementExecute(&statement, &error));
+    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+  }
 }
 
 TEST_F(Sqlite, BulkIngestStream) {
diff --git a/python/adbc_driver_manager/adbc_driver_manager/_lib.pyx b/python/adbc_driver_manager/adbc_driver_manager/_lib.pyx
index d34be95..b868f8e 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/_lib.pyx
+++ b/python/adbc_driver_manager/adbc_driver_manager/_lib.pyx
@@ -64,6 +64,9 @@ cdef extern from "adbc.h" nogil:
 
     cdef const char* ADBC_CONNECTION_OPTION_AUTOCOMMIT
     cdef const char* ADBC_INGEST_OPTION_TARGET_TABLE
+    cdef const char* ADBC_INGEST_OPTION_MODE
+    cdef const char* ADBC_INGEST_OPTION_MODE_APPEND
+    cdef const char* ADBC_INGEST_OPTION_MODE_CREATE
 
     cdef int ADBC_OBJECT_DEPTH_ALL
     cdef int ADBC_OBJECT_DEPTH_CATALOGS
@@ -306,6 +309,9 @@ class NotSupportedError(DatabaseError):
 
 
 INGEST_OPTION_TARGET_TABLE = ADBC_INGEST_OPTION_TARGET_TABLE.decode("utf-8")
+INGEST_OPTION_MODE = ADBC_INGEST_OPTION_MODE.decode("utf-8")
+INGEST_OPTION_MODE_APPEND = ADBC_INGEST_OPTION_MODE_APPEND.decode("utf-8")
+INGEST_OPTION_MODE_CREATE = ADBC_INGEST_OPTION_MODE_CREATE.decode("utf-8")
 
 
 cdef void check_error(CAdbcStatusCode status, CAdbcError* error) except *:
diff --git a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
index dc63c03..e8d933d 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
+++ b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
@@ -448,7 +448,9 @@ class _RowIterator(_Closeable):
         self.rownumber = 0
 
     def close(self) -> None:
-        self._reader.close()
+        if hasattr(self._reader, "close"):
+            # RecordBatchReader.close() only exists in recent PyArrow versions
+            self._reader.close()
 
     @property
     def description(self) -> List[tuple]: