You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/05/05 12:03:58 UTC

[arrow-adbc] branch main updated: feat(c/driver/postgresql): implement GetTableSchema (#577)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 2b25f49  feat(c/driver/postgresql): implement GetTableSchema (#577)
2b25f49 is described below

commit 2b25f49f51f0a7fa4626cc6537bdfe75b06cc0e9
Author: William Ayd <wi...@icloud.com>
AuthorDate: Fri May 5 05:03:52 2023 -0700

    feat(c/driver/postgresql): implement GetTableSchema (#577)
    
    Definitely a few things need to be ironed out and potentially done as
    precursors, but I think this is far along enough to review.
---
 c/driver/postgresql/connection.cc      | 100 ++++++++++++++++++++++++++++++++-
 c/driver/postgresql/postgresql_test.cc |   1 -
 2 files changed, 99 insertions(+), 2 deletions(-)

diff --git a/c/driver/postgresql/connection.cc b/c/driver/postgresql/connection.cc
index e079b39..227c7cb 100644
--- a/c/driver/postgresql/connection.cc
+++ b/c/driver/postgresql/connection.cc
@@ -17,15 +17,41 @@
 
 #include "connection.h"
 
+#include <cinttypes>
 #include <cstring>
 #include <memory>
+#include <string>
 
 #include <adbc.h>
+#include <libpq-fe.h>
 
 #include "database.h"
 #include "utils.h"
 
+namespace {
+/// \brief Runs one SQL query over a libpq connection and owns the pg_result
+/// it produces, clearing that result when the helper is destroyed.
+class PqResultHelper {
+ public:
+  /// \param conn connection the query is executed on; stored, not owned.
+  /// \param query SQL text, copied into the helper. A null pointer is treated
+  ///   as an empty query instead of being handed to std::string.
+  PqResultHelper(PGconn* conn, const char* query)
+      : conn_(conn), query_(query != nullptr ? query : "") {}
+
+  // Copying would leave two helpers calling PQclear() on the same pg_result.
+  PqResultHelper(const PqResultHelper&) = delete;
+  PqResultHelper& operator=(const PqResultHelper&) = delete;
+  PqResultHelper& operator=(PqResultHelper&&) = delete;
+
+  // Moving transfers the result; the moved-from helper no longer owns anything.
+  PqResultHelper(PqResultHelper&& other) noexcept
+      : result_(other.result_), conn_(other.conn_) {
+    query_.swap(other.query_);
+    other.result_ = nullptr;
+  }
+
+  ~PqResultHelper() {
+    if (result_ != nullptr) PQclear(result_);
+  }
+
+  /// \brief Execute the stored query with PQexec().
+  /// \return the result of this execution (may be null if PQexec() returns
+  ///   null). It remains owned by the helper and is cleared by the next call
+  ///   to Execute() or by the destructor.
+  pg_result* Execute() {
+    // Release the result of an earlier call so repeated execution does not leak.
+    if (result_ != nullptr) PQclear(result_);
+    result_ = PQexec(conn_, query_.c_str());
+    return result_;
+  }
+
+ private:
+  pg_result* result_ = nullptr;
+  PGconn* conn_;
+  std::string query_;
+};
+}  // namespace
+
 namespace adbcpq {
+
 AdbcStatusCode PostgresConnection::Commit(struct AdbcError* error) {
   if (autocommit_) {
     SetError(error, "%s", "[libpq] Cannot commit when autocommit is enabled");
@@ -47,7 +73,79 @@ AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog,
                                                   const char* table_name,
                                                   struct ArrowSchema* schema,
                                                   struct AdbcError* error) {
-  return ADBC_STATUS_NOT_IMPLEMENTED;
+  AdbcStatusCode final_status = ADBC_STATUS_OK;
+  struct StringBuilder query = {0};
+  if (StringBuilderInit(&query, /*initial_size=*/256) != 0) return ADBC_STATUS_INTERNAL;
+
+  if (StringBuilderAppend(
+          &query, "%s",
+          "SELECT attname, atttypid "
+          "FROM pg_catalog.pg_class AS cls "
+          "INNER JOIN pg_catalog.pg_attribute AS attr ON cls.oid = attr.attrelid "
+          "INNER JOIN pg_catalog.pg_type AS typ ON attr.atttypid = typ.oid "
+          "WHERE attr.attnum >= 0 AND cls.oid = '") != 0)
+    return ADBC_STATUS_INTERNAL;
+
+  if (db_schema != nullptr) {
+    char* schema = PQescapeIdentifier(conn_, db_schema, strlen(db_schema));
+    if (schema == NULL) {
+      SetError(error, "%s%s", "Faled to escape schema: ", PQerrorMessage(conn_));
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+
+    int ret = StringBuilderAppend(&query, "%s%s", schema, ".");
+    PQfreemem(schema);
+
+    if (ret != 0) return ADBC_STATUS_INTERNAL;
+  }
+
+  char* table = PQescapeIdentifier(conn_, table_name, strlen(table_name));
+  if (table == NULL) {
+    SetError(error, "%s%s", "Failed to escape table: ", PQerrorMessage(conn_));
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+
+  int ret = StringBuilderAppend(&query, "%s%s", table_name, "'::regclass::oid");
+  PQfreemem(table);
+
+  if (ret != 0) return ADBC_STATUS_INTERNAL;
+
+  PqResultHelper result_helper = PqResultHelper{conn_, query.buffer};
+  StringBuilderReset(&query);
+  pg_result* result = result_helper.Execute();
+
+  ExecStatusType pq_status = PQresultStatus(result);
+  auto uschema = nanoarrow::UniqueSchema();
+
+  if (pq_status == PGRES_TUPLES_OK) {
+    int num_rows = PQntuples(result);
+    ArrowSchemaInit(uschema.get());
+    CHECK_NA(INTERNAL, ArrowSchemaSetTypeStruct(uschema.get(), num_rows), error);
+
+    ArrowError na_error;
+    for (int row = 0; row < num_rows; row++) {
+      const char* colname = PQgetvalue(result, row, 0);
+      const Oid pg_oid = static_cast<uint32_t>(
+          std::strtol(PQgetvalue(result, row, 1), /*str_end=*/nullptr, /*base=*/10));
+
+      PostgresType pg_type;
+      if (type_resolver_->Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {
+        SetError(error, "%s%d%s%s%s%" PRIu32, "Column #", row + 1, " (\"", colname,
+                 "\") has unknown type code ", pg_oid);
+        final_status = ADBC_STATUS_NOT_IMPLEMENTED;
+        break;
+      }
+
+      CHECK_NA(INTERNAL, pg_type.WithFieldName(colname).SetSchema(uschema->children[row]),
+               error);
+    }
+  } else {
+    SetError(error, "%s%s", "Failed to get table schema: ", PQerrorMessage(conn_));
+    final_status = ADBC_STATUS_IO;
+  }
+
+  uschema.move(schema);
+  return final_status;
 }
 
 AdbcStatusCode PostgresConnection::Init(struct AdbcDatabase* database,
diff --git a/c/driver/postgresql/postgresql_test.cc b/c/driver/postgresql/postgresql_test.cc
index a395582..0765a9c 100644
--- a/c/driver/postgresql/postgresql_test.cc
+++ b/c/driver/postgresql/postgresql_test.cc
@@ -86,7 +86,6 @@ class PostgresConnectionTest : public ::testing::Test,
   void TearDown() override { ASSERT_NO_FATAL_FAILURE(TearDownTest()); }
 
   void TestMetadataGetInfo() { GTEST_SKIP() << "Not yet implemented"; }
-  void TestMetadataGetTableSchema() { GTEST_SKIP() << "Not yet implemented"; }
   void TestMetadataGetTableTypes() { GTEST_SKIP() << "Not yet implemented"; }
 
   void TestMetadataGetObjectsCatalogs() { GTEST_SKIP() << "Not yet implemented"; }