You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@arrow.apache.org by li...@apache.org on 2022/08/10 19:16:36 UTC
[arrow-adbc] branch main updated: [C][Python] Add options to control append vs create for bulk ingest (#58)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 479ab30 [C][Python] Add options to control append vs create for bulk ingest (#58)
479ab30 is described below
commit 479ab30746dc9a6276c4447cd35b241f90fcbffd
Author: David Li <li...@gmail.com>
AuthorDate: Wed Aug 10 15:16:32 2022 -0400
[C][Python] Add options to control append vs create for bulk ingest (#58)
* [C][Python] Add options to control bulk ingestion
* [Python] Fix issue with PyArrow compat
---
adbc.h | 8 +++
c/drivers/sqlite/sqlite.cc | 32 ++++++++---
c/drivers/sqlite/sqlite_test.cc | 66 +++++++++++++++++++++-
.../adbc_driver_manager/_lib.pyx | 6 ++
.../adbc_driver_manager/dbapi.py | 4 +-
5 files changed, 106 insertions(+), 10 deletions(-)
diff --git a/adbc.h b/adbc.h
index 26f66b3..e7d9d51 100644
--- a/adbc.h
+++ b/adbc.h
@@ -782,6 +782,14 @@ AdbcStatusCode AdbcStatementSetOption(struct AdbcStatement* statement, const cha
/// ADBC_STATUS_ALREADY_EXISTS should be raised. Else, data should be
/// appended to the target table.
#define ADBC_INGEST_OPTION_TARGET_TABLE "adbc.ingest.target_table"
+/// \brief Whether to create (the default) or append.
+#define ADBC_INGEST_OPTION_MODE "adbc.ingest.mode"
+/// \brief Create the table and insert data; error if the table exists.
+#define ADBC_INGEST_OPTION_MODE_CREATE "adbc.ingest.mode.create"
+/// \brief Do not create the table, and insert data; error if the
+/// table does not exist (ADBC_STATUS_NOT_FOUND) or does not match
+/// the schema of the data to append (ADBC_STATUS_ALREADY_EXISTS).
+#define ADBC_INGEST_OPTION_MODE_APPEND "adbc.ingest.mode.append"
/// }@
diff --git a/c/drivers/sqlite/sqlite.cc b/c/drivers/sqlite/sqlite.cc
index 729160e..ddaf94d 100644
--- a/c/drivers/sqlite/sqlite.cc
+++ b/c/drivers/sqlite/sqlite.cc
@@ -570,7 +570,7 @@ class SqliteStatementReader : public arrow::RecordBatchReader {
class SqliteStatementImpl {
public:
explicit SqliteStatementImpl(std::shared_ptr<SqliteConnectionImpl> connection)
- : connection_(std::move(connection)), stmt_(nullptr) {}
+ : connection_(std::move(connection)), append_(false), stmt_(nullptr) {}
AdbcStatusCode Close(struct AdbcError* error) {
if (stmt_) {
@@ -1164,6 +1164,16 @@ class SqliteStatementImpl {
if (std::strlen(value) == 0) return ADBC_STATUS_INVALID_ARGUMENT;
bulk_table_ = value;
return ADBC_STATUS_OK;
+ } else if (std::strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
+ if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_APPEND) == 0) {
+ append_ = true;
+ } else if (std::strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) {
+ append_ = false;
+ } else {
+ SetError(error, "Unknown value '", value, "' for option: ", key);
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+ }
+ return ADBC_STATUS_OK;
}
SetError(error, "Unknown option: ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
@@ -1191,9 +1201,9 @@ class SqliteStatementImpl {
sqlite3* db = connection_->db();
// Create the table
- {
+ if (!append_) {
// XXX: not injection-safe
- std::string query = "CREATE TABLE IF NOT EXISTS ";
+ std::string query = "CREATE TABLE ";
query += bulk_table_;
query += " (";
const auto& fields = bind_parameters_->schema()->fields();
@@ -1203,12 +1213,13 @@ class SqliteStatementImpl {
}
query += ')';
- ADBC_RETURN_NOT_OK(
- DoQuery(db, query.c_str(), error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
+ if (DoQuery(db, query.c_str(), error, [&](sqlite3_stmt* stmt) -> AdbcStatusCode {
const int rc = sqlite3_step(stmt);
if (rc == SQLITE_DONE) return ADBC_STATUS_OK;
return CheckRc(db, stmt, rc, "sqlite3_step", error);
- }));
+ }) != ADBC_STATUS_OK) {
+ return ADBC_STATUS_ALREADY_EXISTS;
+ }
}
// Insert the rows
@@ -1227,8 +1238,14 @@ class SqliteStatementImpl {
int rc = sqlite3_prepare_v2(db, query.c_str(), static_cast<int>(query.size()),
&stmt, /*pzTail=*/nullptr);
if (rc != SQLITE_OK) {
+ // XXX: not a great way to try to figure out the right error
+ AdbcStatusCode code = ADBC_STATUS_ALREADY_EXISTS;
+ if (std::strstr(sqlite3_errmsg(db), "no such table:")) {
+ code = ADBC_STATUS_NOT_FOUND;
+ }
+ // Clean up
std::ignore = CheckRc(db, stmt, rc, "sqlite3_prepare_v2", error);
- return ADBC_STATUS_ALREADY_EXISTS;
+ return code;
}
ADBC_RETURN_NOT_OK(DoQuery(db, stmt, error, [&]() -> AdbcStatusCode {
int rc = SQLITE_OK;
@@ -1282,6 +1299,7 @@ class SqliteStatementImpl {
// Bulk ingestion
// Target of bulk ingestion (rather janky to store state like this, though…)
std::string bulk_table_;
+ bool append_;
// Prepared statements
sqlite3_stmt* stmt_;
diff --git a/c/drivers/sqlite/sqlite_test.cc b/c/drivers/sqlite/sqlite_test.cc
index 7958c64..5af85ac 100644
--- a/c/drivers/sqlite/sqlite_test.cc
+++ b/c/drivers/sqlite/sqlite_test.cc
@@ -278,7 +278,7 @@ TEST_F(Sqlite, BulkIngestTable) {
EXPECT_THAT(batches, ::testing::UnorderedPointwise(PointeesEqual(), {bulk_table}));
}
- // Append
+ // Append (already exists)
{
ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
@@ -291,6 +291,26 @@ TEST_F(Sqlite, BulkIngestTable) {
"bulk_insert", &error));
ADBC_ASSERT_OK_WITH_ERROR(
error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
+ ASSERT_EQ(ADBC_STATUS_ALREADY_EXISTS, AdbcStatementExecute(&statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+ }
+
+ // Append (proper option)
+ {
+ ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
+ ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
+
+ AdbcStatement statement;
+ std::memset(&statement, 0, sizeof(statement));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+ "bulk_insert", &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_APPEND, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementExecute(&statement, &error));
ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
@@ -312,7 +332,7 @@ TEST_F(Sqlite, BulkIngestTable) {
bulk_schema, R"([[1, "foo"], [2, "bar"], [1, "foo"], [2, "bar"]])")}));
}
- // Conflict
+ // Conflict (table exists)
{
auto bulk_schema = arrow::schema({arrow::field("ints", arrow::int64())});
auto bulk_table = adbc::RecordBatchFromJSON(bulk_schema, R"([[1], [2]])");
@@ -330,6 +350,48 @@ TEST_F(Sqlite, BulkIngestTable) {
ASSERT_EQ(ADBC_STATUS_ALREADY_EXISTS, AdbcStatementExecute(&statement, &error));
ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
}
+
+ // Append (table does not exist)
+ {
+ ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
+ ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
+
+ AdbcStatement statement;
+ std::memset(&statement, 0, sizeof(statement));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+ "this_table_does_not_exist", &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_APPEND, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
+ ASSERT_EQ(ADBC_STATUS_NOT_FOUND, AdbcStatementExecute(&statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+ }
+
+ // Append (schema conflict)
+ {
+ auto bulk_schema = arrow::schema({arrow::field("ints", arrow::int64())});
+ auto bulk_table = adbc::RecordBatchFromJSON(bulk_schema, R"([[1], [2]])");
+ ASSERT_OK(ExportRecordBatch(*bulk_table, &export_table));
+ ASSERT_OK(ExportSchema(*bulk_schema, &export_schema));
+
+ AdbcStatement statement;
+ std::memset(&statement, 0, sizeof(statement));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection, &statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+ "bulk_insert", &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
+ ADBC_INGEST_OPTION_MODE_APPEND, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(
+ error, AdbcStatementBind(&statement, &export_table, &export_schema, &error));
+ ASSERT_EQ(ADBC_STATUS_ALREADY_EXISTS, AdbcStatementExecute(&statement, &error));
+ ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
+ }
}
TEST_F(Sqlite, BulkIngestStream) {
diff --git a/python/adbc_driver_manager/adbc_driver_manager/_lib.pyx b/python/adbc_driver_manager/adbc_driver_manager/_lib.pyx
index d34be95..b868f8e 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/_lib.pyx
+++ b/python/adbc_driver_manager/adbc_driver_manager/_lib.pyx
@@ -64,6 +64,9 @@ cdef extern from "adbc.h" nogil:
cdef const char* ADBC_CONNECTION_OPTION_AUTOCOMMIT
cdef const char* ADBC_INGEST_OPTION_TARGET_TABLE
+ cdef const char* ADBC_INGEST_OPTION_MODE
+ cdef const char* ADBC_INGEST_OPTION_MODE_APPEND
+ cdef const char* ADBC_INGEST_OPTION_MODE_CREATE
cdef int ADBC_OBJECT_DEPTH_ALL
cdef int ADBC_OBJECT_DEPTH_CATALOGS
@@ -306,6 +309,9 @@ class NotSupportedError(DatabaseError):
INGEST_OPTION_TARGET_TABLE = ADBC_INGEST_OPTION_TARGET_TABLE.decode("utf-8")
+INGEST_OPTION_MODE = ADBC_INGEST_OPTION_MODE.decode("utf-8")
+INGEST_OPTION_MODE_APPEND = ADBC_INGEST_OPTION_MODE_APPEND.decode("utf-8")
+INGEST_OPTION_MODE_CREATE = ADBC_INGEST_OPTION_MODE_CREATE.decode("utf-8")
cdef void check_error(CAdbcStatusCode status, CAdbcError* error) except *:
diff --git a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
index dc63c03..e8d933d 100644
--- a/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
+++ b/python/adbc_driver_manager/adbc_driver_manager/dbapi.py
@@ -448,7 +448,9 @@ class _RowIterator(_Closeable):
self.rownumber = 0
def close(self) -> None:
- self._reader.close()
+ if hasattr(self._reader, "close"):
+ # Only in recent PyArrow
+ self._reader.close()
@property
def description(self) -> List[tuple]: