You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/06/30 16:32:46 UTC
[arrow-adbc] branch main updated: feat(c/driver/postgresql): Timestamp write support (#861)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new fd240825 feat(c/driver/postgresql): Timestamp write support (#861)
fd240825 is described below
commit fd240825a346d5ad75afee5cbcdb183497dd91cf
Author: William Ayd <wi...@icloud.com>
AuthorDate: Fri Jun 30 09:32:42 2023 -0700
feat(c/driver/postgresql): Timestamp write support (#861)
---
c/driver/postgresql/statement.cc | 66 ++++++++++++++++++++++++
c/driver/sqlite/sqlite_test.cc | 3 ++
c/driver_manager/adbc_driver_manager_test.cc | 3 ++
c/validation/adbc_validation.cc | 77 ++++++++++++++++++++++++++++
c/validation/adbc_validation.h | 7 +++
5 files changed, 156 insertions(+)
diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index 30926b4f..8bd80a3c 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -216,6 +216,16 @@ struct BindStream {
type_id = PostgresTypeId::kBytea;
param_lengths[i] = 0;
break;
+ case ArrowType::NANOARROW_TYPE_TIMESTAMP:  // Arrow timestamp parameters map to PostgresTypeId::kTimestamp
+ if (strcmp("", bind_schema_fields[i].timezone)) {  // strcmp != 0: the field carries a non-empty timezone, rejected below
+ SetError(error, "[libpq] Field #%" PRIi64 "%s%s%s",
+ static_cast<int64_t>(i + 1), " (\"", bind_schema->children[i]->name,
+ "\") has unsupported type code timestamp with timezone");
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+ }
+ type_id = PostgresTypeId::kTimestamp;
+ param_lengths[i] = 8;  // fixed length of 8; presumably one int64 per value -- confirm against the bind step
+ break;
default:
SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
static_cast<uint64_t>(i + 1), " ('", bind_schema->children[i]->name,
@@ -337,6 +347,53 @@ struct BindStream {
param_values[col] = const_cast<char*>(view.data.as_char);
break;
}
+ case ArrowType::NANOARROW_TYPE_TIMESTAMP: {
+   // Serializes one timestamp cell: the raw int64 is scaled to microseconds,
+   // rebased onto the PostgreSQL epoch (2000-01-01) and copied into
+   // param_values[col] in network byte order.
+   int64_t val = array_view->children[col]->buffer_views[1].data.as_int64[row];
+   if (strcmp("", bind_schema_fields[col].timezone)) {
+     // Report the bound field's own name; PQfname() describes result-set
+     // columns and can return NULL, which must not be passed to "%s".
+     SetError(error, "[libpq] Column #%" PRIi64 "%s%s%s", col + 1, " (\"",
+              bind_schema->children[col]->name,
+              "\") has unsupported type code timestamp with timezone");
+     return ADBC_STATUS_NOT_IMPLEMENTED;
+   }
+
+   // 2000-01-01 00:00:00.000000 in microseconds
+   constexpr int64_t kPostgresTimestampEpoch = 946684800000000;
+   // Largest magnitudes that can still be scaled to microseconds in int64
+   constexpr int64_t kSecOverflowLimit = 9223372036854;
+   constexpr int64_t kmSecOverflowLimit = 9223372036854775;
+   // Smallest microsecond value from which the epoch can be subtracted
+   // without wrapping below INT64_MIN
+   constexpr int64_t kMinRebasableMicros = INT64_MIN + kPostgresTimestampEpoch;
+
+   // Range checks use plain comparisons instead of abs(): abs(INT64_MIN) is
+   // undefined and an int-only abs overload would truncate the value.
+   bool out_of_range = false;
+   switch (bind_schema_fields[col].time_unit) {
+     case NANOARROW_TIME_UNIT_SECOND:
+       out_of_range = val < -kSecOverflowLimit || val > kSecOverflowLimit;
+       if (!out_of_range) val *= 1000000;
+       break;
+     case NANOARROW_TIME_UNIT_MILLI:
+       out_of_range = val < -kmSecOverflowLimit || val > kmSecOverflowLimit;
+       if (!out_of_range) val *= 1000;
+       break;
+     case NANOARROW_TIME_UNIT_MICRO:
+       break;
+     case NANOARROW_TIME_UNIT_NANO:
+       // integer division: sub-microsecond digits are truncated toward zero
+       val /= 1000;
+       break;
+   }
+
+   // Also reject values for which the epoch subtraction itself would overflow.
+   if (out_of_range || val < kMinRebasableMicros) {
+     SetError(error, "[libpq] Field #%" PRId64 "%s%s%s%" PRId64 "%s",
+              col + 1, " ('", bind_schema->children[col]->name, "') Row #",
+              row + 1,
+              " has value which exceeds postgres timestamp limits");
+     return ADBC_STATUS_INVALID_ARGUMENT;
+   }
+
+   const uint64_t value = ToNetworkInt64(val - kPostgresTimestampEpoch);
+   std::memcpy(param_values[col], &value, sizeof(int64_t));
+   break;
+ }
default:
SetError(error, "%s%" PRId64 "%s%s%s%s", "[libpq] Field #", col + 1, " ('",
bind_schema->children[col]->name,
@@ -605,6 +662,15 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
case ArrowType::NANOARROW_TYPE_BINARY:
create += " BYTEA";
break;
+ case ArrowType::NANOARROW_TYPE_TIMESTAMP:  // timestamp fields are declared with the TIMESTAMP type keyword
+ if (strcmp("", source_schema_fields[i].timezone)) {  // strcmp != 0: non-empty timezone, ingestion is refused below
+ SetError(error, "[libpq] Field #%" PRIi64 "%s%s%s", static_cast<int64_t>(i + 1),
+ " (\"", source_schema.children[i]->name,
+ "\") has unsupported type for ingestion timestamp with timezone");
+ return ADBC_STATUS_NOT_IMPLEMENTED;
+ }
+ create += " TIMESTAMP";  // appended to the statement text accumulated in `create`
+ break;
default:
SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
static_cast<uint64_t>(i + 1), " ('", source_schema.children[i]->name,
diff --git a/c/driver/sqlite/sqlite_test.cc b/c/driver/sqlite/sqlite_test.cc
index 8a580cdb..d245c1da 100644
--- a/c/driver/sqlite/sqlite_test.cc
+++ b/c/driver/sqlite/sqlite_test.cc
@@ -171,6 +171,9 @@ class SqliteStatementTest : public ::testing::Test,
void TestSqlIngestUInt64() { GTEST_SKIP() << "Cannot ingest UINT64 (out of range)"; }
void TestSqlIngestBinary() { GTEST_SKIP() << "Cannot ingest BINARY (not implemented)"; }
+ void TestSqlIngestTimestamp() {  // this fixture skips the timestamp ingest test instead of running it
+ GTEST_SKIP() << "Cannot ingest TIMESTAMP (not implemented)";
+ }
protected:
SqliteQuirks quirks_;
diff --git a/c/driver_manager/adbc_driver_manager_test.cc b/c/driver_manager/adbc_driver_manager_test.cc
index 99fa477b..d33114ee 100644
--- a/c/driver_manager/adbc_driver_manager_test.cc
+++ b/c/driver_manager/adbc_driver_manager_test.cc
@@ -226,6 +226,9 @@ class SqliteStatementTest : public ::testing::Test,
void TestSqlIngestUInt64() { GTEST_SKIP() << "Cannot ingest UINT64 (out of range)"; }
void TestSqlIngestBinary() { GTEST_SKIP() << "Cannot ingest BINARY (not implemented)"; }
+ void TestSqlIngestTimestamp() {  // this fixture skips the timestamp ingest test instead of running it
+ GTEST_SKIP() << "Cannot ingest TIMESTAMP (not implemented)";
+ }
protected:
SqliteQuirks quirks_;
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index d73b5564..61f23fff 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -1094,6 +1094,83 @@ void StatementTest::TestSqlIngestBinary() {
NANOARROW_TYPE_BINARY, {std::nullopt, "", "\x00\x01\x02\x04", "\xFE\xFF"}));
}
+// Bulk-ingests a single nullable timestamp column named "col" whose time unit
+// is TU (values: null, 0, 42), then reads the table back and checks the row
+// count, the column type and -- for microsecond input only -- the values.
+// Skipped when the driver under test does not support bulk ingestion.
+template <enum ArrowTimeUnit TU>
+void StatementTest::TestSqlIngestTemporalType() {
+  if (!quirks()->supports_bulk_ingest()) {
+    GTEST_SKIP();
+  }
+
+  ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
+              IsOkStatus(&error));
+
+  Handle<struct ArrowSchema> schema;
+  Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  const std::vector<std::optional<int64_t>> values = {std::nullopt, 0, 42};
+
+  // much of this code is shared with TestSqlIngestType with minor
+  // changes to allow for various time units to be tested
+  //
+  // Each schema-building step is asserted: if the struct could not be set up,
+  // schema->children[0] below would not be safe to dereference.
+  ArrowSchemaInit(&schema.value);
+  ASSERT_THAT(ArrowSchemaSetTypeStruct(&schema.value, 1), IsOkErrno());
+  ASSERT_THAT(ArrowSchemaSetTypeDateTime(schema->children[0], NANOARROW_TYPE_TIMESTAMP,
+                                         TU, /*timezone=*/nullptr),
+              IsOkErrno());
+  ASSERT_THAT(ArrowSchemaSetName(schema->children[0], "col"), IsOkErrno());
+  ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error, values),
+              IsOkErrno());
+
+  // Ingest the batch into the "bulk_ingest" table.
+  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+                                     "bulk_ingest", &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
+              IsOkStatus(&error));
+
+  // -1 is accepted alongside the exact count.
+  int64_t rows_affected = 0;
+  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(rows_affected,
+              ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1)));
+
+  // Read the rows back, nulls first, so the order matches `values`.
+  ASSERT_THAT(AdbcStatementSetSqlQuery(
+                  &statement,
+                  "SELECT * FROM bulk_ingest ORDER BY \"col\" ASC NULLS FIRST", &error),
+              IsOkStatus(&error));
+  {
+    StreamReader reader;
+    ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+                                          &reader.rows_affected, &error),
+                IsOkStatus(&error));
+    ASSERT_THAT(reader.rows_affected,
+                ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1)));
+
+    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+    ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
+                                          {{"col", NANOARROW_TYPE_TIMESTAMP, NULLABLE}}));
+
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_NE(nullptr, reader.array->release);
+    ASSERT_EQ(values.size(), reader.array->length);
+    ASSERT_EQ(1, reader.array->n_children);
+
+    // TU is a template argument, so the branch is resolved at compile time.
+    if constexpr (TU == NANOARROW_TIME_UNIT_MICRO) {
+      // Similar to the TestSqlIngestType implementation we are only now
+      // testing values if the unit round trips
+      ASSERT_NO_FATAL_FAILURE(
+          CompareArray<int64_t>(reader.array_view->children[0], values));
+    }
+
+    // The stream must be exhausted after the first batch.
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_EQ(nullptr, reader.array->release);
+  }
+}
+
+void StatementTest::TestSqlIngestTimestamp() {  // runs the temporal ingest round trip once per Arrow time unit
+ ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_SECOND>());  // a fatal failure in one unit stops the remaining ones
+ ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_MICRO>());
+ ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_MILLI>());
+ ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_NANO>());
+}
+
void StatementTest::TestSqlIngestAppend() {
if (!quirks()->supports_bulk_ingest()) {
GTEST_SKIP();
diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h
index 4e4251bf..1650d064 100644
--- a/c/validation/adbc_validation.h
+++ b/c/validation/adbc_validation.h
@@ -229,6 +229,9 @@ class StatementTest {
void TestSqlIngestString();
void TestSqlIngestBinary();
+ // Temporal
+ void TestSqlIngestTimestamp();
+
// ---- End Type-specific tests ----------------
void TestSqlIngestAppend();
@@ -269,6 +272,9 @@ class StatementTest {
template <typename CType>
void TestSqlIngestNumericType(ArrowType type);
+
+ template <enum ArrowTimeUnit TU>
+ void TestSqlIngestTemporalType();
};
#define ADBCV_TEST_STATEMENT(FIXTURE) \
@@ -288,6 +294,7 @@ class StatementTest {
TEST_F(FIXTURE, SqlIngestFloat64) { TestSqlIngestFloat64(); } \
TEST_F(FIXTURE, SqlIngestString) { TestSqlIngestString(); } \
TEST_F(FIXTURE, SqlIngestBinary) { TestSqlIngestBinary(); } \
+ TEST_F(FIXTURE, SqlIngestTimestamp) { TestSqlIngestTimestamp(); } \
TEST_F(FIXTURE, SqlIngestAppend) { TestSqlIngestAppend(); } \
TEST_F(FIXTURE, SqlIngestErrors) { TestSqlIngestErrors(); } \
TEST_F(FIXTURE, SqlIngestMultipleConnections) { TestSqlIngestMultipleConnections(); } \