You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/06/30 16:32:46 UTC

[arrow-adbc] branch main updated: feat(c/driver/postgresql): Timestamp write support (#861)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new fd240825 feat(c/driver/postgresql): Timestamp write support (#861)
fd240825 is described below

commit fd240825a346d5ad75afee5cbcdb183497dd91cf
Author: William Ayd <wi...@icloud.com>
AuthorDate: Fri Jun 30 09:32:42 2023 -0700

    feat(c/driver/postgresql): Timestamp write support (#861)
---
 c/driver/postgresql/statement.cc             | 66 ++++++++++++++++++++++++
 c/driver/sqlite/sqlite_test.cc               |  3 ++
 c/driver_manager/adbc_driver_manager_test.cc |  3 ++
 c/validation/adbc_validation.cc              | 77 ++++++++++++++++++++++++++++
 c/validation/adbc_validation.h               |  7 +++
 5 files changed, 156 insertions(+)

diff --git a/c/driver/postgresql/statement.cc b/c/driver/postgresql/statement.cc
index 30926b4f..8bd80a3c 100644
--- a/c/driver/postgresql/statement.cc
+++ b/c/driver/postgresql/statement.cc
@@ -216,6 +216,16 @@ struct BindStream {
           type_id = PostgresTypeId::kBytea;
           param_lengths[i] = 0;
           break;
+        case ArrowType::NANOARROW_TYPE_TIMESTAMP:
+          if (strcmp("", bind_schema_fields[i].timezone)) {
+            SetError(error, "[libpq] Field #%" PRIi64 "%s%s%s",
+                     static_cast<int64_t>(i + 1), " (\"", bind_schema->children[i]->name,
+                     "\") has unsupported type code timestamp with timezone");
+            return ADBC_STATUS_NOT_IMPLEMENTED;
+          }
+          type_id = PostgresTypeId::kTimestamp;
+          param_lengths[i] = 8;
+          break;
         default:
           SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
                    static_cast<uint64_t>(i + 1), " ('", bind_schema->children[i]->name,
@@ -337,6 +347,53 @@ struct BindStream {
               param_values[col] = const_cast<char*>(view.data.as_char);
               break;
             }
+            case ArrowType::NANOARROW_TYPE_TIMESTAMP: {
+              // Binds an Arrow timestamp as a binary int64 in network byte order:
+              // microseconds, shifted by kPostgresTimestampEpoch (2000-01-01).
+              int64_t val = array_view->children[col]->buffer_views[1].data.as_int64[row];
+              if (strcmp("", bind_schema_fields[col].timezone)) {
+                // Use the bind field's name, as the other errors in this switch do.
+                SetError(error, "[libpq] Column #%" PRIi64 "%s%s%s", col + 1, " (\"",
+                         bind_schema->children[col]->name,
+                         "\") has unsupported type code timestamp with timezone");
+                return ADBC_STATUS_NOT_IMPLEMENTED;
+              }
+
+              // 2000-01-01 00:00:00.000000 in microseconds
+              constexpr int64_t kPostgresTimestampEpoch = 946684800000000;
+
+              // Factor that converts the Arrow value to microseconds.
+              int64_t scale = 1;
+              switch (bind_schema_fields[col].time_unit) {
+                case NANOARROW_TIME_UNIT_SECOND:
+                  scale = 1000000;
+                  break;
+                case NANOARROW_TIME_UNIT_MILLI:
+                  scale = 1000;
+                  break;
+                case NANOARROW_TIME_UNIT_MICRO:
+                  break;
+                case NANOARROW_TIME_UNIT_NANO:
+                  val /= 1000;  // sub-microsecond precision is dropped
+                  break;
+              }
+
+              // Reject values for which val * scale - epoch leaves the int64 range;
+              // signed overflow would otherwise be undefined behaviour.
+              if (val > INT64_MAX / scale ||
+                  val < (INT64_MIN + kPostgresTimestampEpoch) / scale) {
+                SetError(error, "[libpq] Field #%" PRId64 "%s%s%s%" PRId64 "%s",
+                         col + 1, " ('", bind_schema->children[col]->name, "') Row #",
+                         row + 1,
+                         " has value which exceeds postgres timestamp limits");
+                return ADBC_STATUS_INVALID_ARGUMENT;
+              }
+
+              const uint64_t value =
+                  ToNetworkInt64(val * scale - kPostgresTimestampEpoch);
+              std::memcpy(param_values[col], &value, sizeof(int64_t));
+              break;
+            }
             default:
               SetError(error, "%s%" PRId64 "%s%s%s%s", "[libpq] Field #", col + 1, " ('",
                        bind_schema->children[col]->name,
@@ -605,6 +662,15 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
       case ArrowType::NANOARROW_TYPE_BINARY:
         create += " BYTEA";
         break;
+      case ArrowType::NANOARROW_TYPE_TIMESTAMP:
+        if (strcmp("", source_schema_fields[i].timezone)) {
+          SetError(error, "[libpq] Field #%" PRIi64 "%s%s%s", static_cast<int64_t>(i + 1),
+                   " (\"", source_schema.children[i]->name,
+                   "\") has unsupported type for ingestion timestamp with timezone");
+          return ADBC_STATUS_NOT_IMPLEMENTED;
+        }
+        create += " TIMESTAMP";
+        break;
       default:
         SetError(error, "%s%" PRIu64 "%s%s%s%s", "[libpq] Field #",
                  static_cast<uint64_t>(i + 1), " ('", source_schema.children[i]->name,
diff --git a/c/driver/sqlite/sqlite_test.cc b/c/driver/sqlite/sqlite_test.cc
index 8a580cdb..d245c1da 100644
--- a/c/driver/sqlite/sqlite_test.cc
+++ b/c/driver/sqlite/sqlite_test.cc
@@ -171,6 +171,9 @@ class SqliteStatementTest : public ::testing::Test,
 
   void TestSqlIngestUInt64() { GTEST_SKIP() << "Cannot ingest UINT64 (out of range)"; }
   void TestSqlIngestBinary() { GTEST_SKIP() << "Cannot ingest BINARY (not implemented)"; }
+  void TestSqlIngestTimestamp() {  // always skipped; the reason is given below
+    GTEST_SKIP() << "Cannot ingest TIMESTAMP (not implemented)";
+  }
 
  protected:
   SqliteQuirks quirks_;
diff --git a/c/driver_manager/adbc_driver_manager_test.cc b/c/driver_manager/adbc_driver_manager_test.cc
index 99fa477b..d33114ee 100644
--- a/c/driver_manager/adbc_driver_manager_test.cc
+++ b/c/driver_manager/adbc_driver_manager_test.cc
@@ -226,6 +226,9 @@ class SqliteStatementTest : public ::testing::Test,
 
   void TestSqlIngestUInt64() { GTEST_SKIP() << "Cannot ingest UINT64 (out of range)"; }
   void TestSqlIngestBinary() { GTEST_SKIP() << "Cannot ingest BINARY (not implemented)"; }
+  void TestSqlIngestTimestamp() {  // always skipped; the reason is given below
+    GTEST_SKIP() << "Cannot ingest TIMESTAMP (not implemented)";
+  }
 
  protected:
   SqliteQuirks quirks_;
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index d73b5564..61f23fff 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -1094,6 +1094,83 @@ void StatementTest::TestSqlIngestBinary() {
       NANOARROW_TYPE_BINARY, {std::nullopt, "", "\x00\x01\x02\x04", "\xFE\xFF"}));
 }
 
+// Bulk-ingests one timestamp column (null, 0, 42) with time unit TU into the
+// "bulk_ingest" table, then reads it back and checks row count and column type.
+template <enum ArrowTimeUnit TU>
+void StatementTest::TestSqlIngestTemporalType() {
+  if (!quirks()->supports_bulk_ingest()) {
+    GTEST_SKIP();
+  }
+
+  ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
+              IsOkStatus(&error));
+
+  Handle<struct ArrowSchema> schema;
+  Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  const std::vector<std::optional<int64_t>> values = {std::nullopt, 0, 42};
+
+  // Mirrors TestSqlIngestType, but builds the schema by hand to set the time unit
+  ArrowSchemaInit(&schema.value);
+  ArrowSchemaSetTypeStruct(&schema.value, 1);
+  ArrowSchemaSetTypeDateTime(schema->children[0], NANOARROW_TYPE_TIMESTAMP, TU,
+                             /*timezone=*/nullptr);
+  ArrowSchemaSetName(schema->children[0], "col");
+  ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error, values),
+              IsOkErrno());
+
+  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
+                                     "bulk_ingest", &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
+              IsOkStatus(&error));
+
+  int64_t rows_affected = 0;
+  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
+              IsOkStatus(&error));
+  ASSERT_THAT(rows_affected,
+              ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1)));
+
+  ASSERT_THAT(AdbcStatementSetSqlQuery(
+                  &statement,
+                  "SELECT * FROM bulk_ingest ORDER BY \"col\" ASC NULLS FIRST", &error),
+              IsOkStatus(&error));
+  {
+    StreamReader reader;
+    ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
+                                          &reader.rows_affected, &error),
+                IsOkStatus(&error));
+    ASSERT_THAT(reader.rows_affected,
+                ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1)));
+
+    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+    ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
+                                          {{"col", NANOARROW_TYPE_TIMESTAMP, NULLABLE}}));
+
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_NE(nullptr, reader.array->release);
+    ASSERT_EQ(values.size(), reader.array->length);
+    ASSERT_EQ(1, reader.array->n_children);
+
+    if (TU == NANOARROW_TIME_UNIT_MICRO) {
+      // As in TestSqlIngestType, values are compared only when the unit round trips
+      ASSERT_NO_FATAL_FAILURE(
+          CompareArray<int64_t>(reader.array_view->children[0], values));
+    }
+
+    ASSERT_NO_FATAL_FAILURE(reader.Next());
+    ASSERT_EQ(nullptr, reader.array->release);
+  }
+}
+
+void StatementTest::TestSqlIngestTimestamp() {  // one run per Arrow time unit
+  ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_SECOND>());
+  ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_MICRO>());
+  ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_MILLI>());
+  ASSERT_NO_FATAL_FAILURE(TestSqlIngestTemporalType<NANOARROW_TIME_UNIT_NANO>());
+}
+
 void StatementTest::TestSqlIngestAppend() {
   if (!quirks()->supports_bulk_ingest()) {
     GTEST_SKIP();
diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h
index 4e4251bf..1650d064 100644
--- a/c/validation/adbc_validation.h
+++ b/c/validation/adbc_validation.h
@@ -229,6 +229,9 @@ class StatementTest {
   void TestSqlIngestString();
   void TestSqlIngestBinary();
 
+  // Temporal
+  void TestSqlIngestTimestamp();
+
   // ---- End Type-specific tests ----------------
 
   void TestSqlIngestAppend();
@@ -269,6 +272,9 @@ class StatementTest {
 
   template <typename CType>
   void TestSqlIngestNumericType(ArrowType type);
+
+  template <enum ArrowTimeUnit TU>
+  void TestSqlIngestTemporalType();
 };
 
 #define ADBCV_TEST_STATEMENT(FIXTURE)                                                   \
@@ -288,6 +294,7 @@ class StatementTest {
   TEST_F(FIXTURE, SqlIngestFloat64) { TestSqlIngestFloat64(); }                         \
   TEST_F(FIXTURE, SqlIngestString) { TestSqlIngestString(); }                           \
   TEST_F(FIXTURE, SqlIngestBinary) { TestSqlIngestBinary(); }                           \
+  TEST_F(FIXTURE, SqlIngestTimestamp) { TestSqlIngestTimestamp(); }                     \
   TEST_F(FIXTURE, SqlIngestAppend) { TestSqlIngestAppend(); }                           \
   TEST_F(FIXTURE, SqlIngestErrors) { TestSqlIngestErrors(); }                           \
   TEST_F(FIXTURE, SqlIngestMultipleConnections) { TestSqlIngestMultipleConnections(); } \