You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/05/05 12:03:58 UTC
[arrow-adbc] branch main updated: feat(c/driver/postgresql): implement GetTableSchema (#577)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new 2b25f49 feat(c/driver/postgresql): implement GetTableSchema (#577)
2b25f49 is described below
commit 2b25f49f51f0a7fa4626cc6537bdfe75b06cc0e9
Author: William Ayd <wi...@icloud.com>
AuthorDate: Fri May 5 05:03:52 2023 -0700
feat(c/driver/postgresql): implement GetTableSchema (#577)
Definitely a few things need to be ironed out and potentially done as
precursors, but I think this is far along enough to review.
---
c/driver/postgresql/connection.cc | 100 ++++++++++++++++++++++++++++++++-
c/driver/postgresql/postgresql_test.cc | 1 -
2 files changed, 99 insertions(+), 2 deletions(-)
diff --git a/c/driver/postgresql/connection.cc b/c/driver/postgresql/connection.cc
index e079b39..227c7cb 100644
--- a/c/driver/postgresql/connection.cc
+++ b/c/driver/postgresql/connection.cc
@@ -17,15 +17,41 @@
#include "connection.h"
+#include <cinttypes>
#include <cstring>
#include <memory>
+#include <string>
#include <adbc.h>
+#include <libpq-fe.h>
#include "database.h"
#include "utils.h"
+namespace {
+// Runs a query on a libpq connection and owns the resulting pg_result,
+// releasing it with PQclear() when the helper is destroyed.
+//
+// The helper keeps its own copy of the query text, so the caller's buffer may
+// be released as soon as the constructor returns. It is move-only: copying
+// would leave two helpers calling PQclear() on the same result.
+class PqResultHelper {
+ public:
+  // conn:  connection the query is executed on; not owned by the helper.
+  // query: NUL-terminated SQL text; copied into the helper.
+  PqResultHelper(PGconn* conn, const char* query) : conn_(conn), query_(query) {}
+
+  PqResultHelper(const PqResultHelper&) = delete;
+  PqResultHelper& operator=(const PqResultHelper&) = delete;
+
+  // Transfers ownership of any pending result; `other` is left owning nothing.
+  PqResultHelper(PqResultHelper&& other) noexcept
+      : result_(other.result_), conn_(other.conn_) {
+    query_.swap(other.query_);
+    other.result_ = nullptr;
+  }
+  PqResultHelper& operator=(PqResultHelper&&) = delete;
+
+  // Executes the stored query and returns the result, which remains owned by
+  // this helper (callers must not PQclear() it). A result left over from a
+  // previous call is released first so that repeated calls do not leak.
+  pg_result* Execute() {
+    if (result_ != nullptr) PQclear(result_);
+    result_ = PQexec(conn_, query_.c_str());
+    return result_;
+  }
+
+  ~PqResultHelper() {
+    if (result_ != nullptr) PQclear(result_);
+  }
+
+ private:
+  pg_result* result_ = nullptr;
+  PGconn* conn_;
+  std::string query_;
+};
+} // namespace
+
namespace adbcpq {
+
AdbcStatusCode PostgresConnection::Commit(struct AdbcError* error) {
if (autocommit_) {
SetError(error, "%s", "[libpq] Cannot commit when autocommit is enabled");
@@ -47,7 +73,79 @@ AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog,
const char* table_name,
struct ArrowSchema* schema,
struct AdbcError* error) {
- return ADBC_STATUS_NOT_IMPLEMENTED;
+  // Looks up the column names and type OIDs of the requested table in the
+  // PostgreSQL catalogs and converts them into an Arrow struct schema with one
+  // child per column.
+  //
+  // db_schema may be NULL, in which case the table name is left unqualified.
+  // Returns ADBC_STATUS_INVALID_ARGUMENT if a name cannot be escaped,
+  // ADBC_STATUS_IO if the catalog query fails, ADBC_STATUS_NOT_IMPLEMENTED if a
+  // column has a type OID the type resolver does not know, and
+  // ADBC_STATUS_INTERNAL if building the query or the schema fails.
+  if (table_name == nullptr) {
+    // strlen(NULL) below would be undefined behaviour.
+    SetError(error, "%s", "[libpq] table_name must not be NULL");
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+
+  AdbcStatusCode final_status = ADBC_STATUS_OK;
+  struct StringBuilder query = {0};
+  if (StringBuilderInit(&query, /*initial_size=*/256) != 0) return ADBC_STATUS_INTERNAL;
+
+  // From here on, every early return releases the builder first.
+  // NOTE(review): assumes StringBuilderReset() frees the buffer, as its use
+  // after the query has been copied suggests -- confirm against utils.h.
+  if (StringBuilderAppend(
+          &query, "%s",
+          "SELECT attname, atttypid "
+          "FROM pg_catalog.pg_class AS cls "
+          "INNER JOIN pg_catalog.pg_attribute AS attr ON cls.oid = attr.attrelid "
+          "INNER JOIN pg_catalog.pg_type AS typ ON attr.atttypid = typ.oid "
+          "WHERE attr.attnum >= 0 AND cls.oid = '") != 0) {
+    StringBuilderReset(&query);
+    return ADBC_STATUS_INTERNAL;
+  }
+
+  // NOTE(review): the escaped identifiers are embedded in a single-quoted SQL
+  // literal that is then cast to regclass. PQescapeIdentifier() only doubles
+  // double quotes, so a name containing a single quote still terminates the
+  // literal early. Passing the whole qualified name through PQescapeLiteral()
+  // (or using a bound parameter) would close that gap.
+  if (db_schema != nullptr) {
+    char* escaped_schema = PQescapeIdentifier(conn_, db_schema, strlen(db_schema));
+    if (escaped_schema == NULL) {
+      SetError(error, "%s%s", "Failed to escape schema: ", PQerrorMessage(conn_));
+      StringBuilderReset(&query);
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+
+    int ret = StringBuilderAppend(&query, "%s%s", escaped_schema, ".");
+    PQfreemem(escaped_schema);
+
+    if (ret != 0) {
+      StringBuilderReset(&query);
+      return ADBC_STATUS_INTERNAL;
+    }
+  }
+
+  char* table = PQescapeIdentifier(conn_, table_name, strlen(table_name));
+  if (table == NULL) {
+    SetError(error, "%s%s", "Failed to escape table: ", PQerrorMessage(conn_));
+    StringBuilderReset(&query);
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+
+  // Append the escaped identifier, not the raw table_name.
+  int ret = StringBuilderAppend(&query, "%s%s", table, "'::regclass::oid");
+  PQfreemem(table);
+
+  if (ret != 0) {
+    StringBuilderReset(&query);
+    return ADBC_STATUS_INTERNAL;
+  }
+
+  // The helper copies the query text, so the builder can be released before
+  // the query is executed.
+  PqResultHelper result_helper = PqResultHelper{conn_, query.buffer};
+  StringBuilderReset(&query);
+  pg_result* result = result_helper.Execute();
+
+  ExecStatusType pq_status = PQresultStatus(result);
+  auto uschema = nanoarrow::UniqueSchema();
+
+  if (pq_status == PGRES_TUPLES_OK) {
+    int num_rows = PQntuples(result);
+    ArrowSchemaInit(uschema.get());
+    CHECK_NA(INTERNAL, ArrowSchemaSetTypeStruct(uschema.get(), num_rows), error);
+
+    ArrowError na_error;
+    for (int row = 0; row < num_rows; row++) {
+      const char* colname = PQgetvalue(result, row, 0);
+      // OIDs are unsigned 32-bit values; strtoul avoids the clamping to
+      // LONG_MAX that strtol performs where long is only 32 bits wide.
+      const Oid pg_oid = static_cast<uint32_t>(
+          std::strtoul(PQgetvalue(result, row, 1), /*str_end=*/nullptr, /*base=*/10));
+
+      PostgresType pg_type;
+      if (type_resolver_->Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {
+        SetError(error, "%s%d%s%s%s%" PRIu32, "Column #", row + 1, " (\"", colname,
+                 "\") has unknown type code ", pg_oid);
+        final_status = ADBC_STATUS_NOT_IMPLEMENTED;
+        break;
+      }
+
+      CHECK_NA(INTERNAL, pg_type.WithFieldName(colname).SetSchema(uschema->children[row]),
+               error);
+    }
+  } else {
+    SetError(error, "%s%s", "Failed to get table schema: ", PQerrorMessage(conn_));
+    final_status = ADBC_STATUS_IO;
+  }
+
+  uschema.move(schema);
+  return final_status;
}
AdbcStatusCode PostgresConnection::Init(struct AdbcDatabase* database,
diff --git a/c/driver/postgresql/postgresql_test.cc b/c/driver/postgresql/postgresql_test.cc
index a395582..0765a9c 100644
--- a/c/driver/postgresql/postgresql_test.cc
+++ b/c/driver/postgresql/postgresql_test.cc
@@ -86,7 +86,6 @@ class PostgresConnectionTest : public ::testing::Test,
void TearDown() override { ASSERT_NO_FATAL_FAILURE(TearDownTest()); }
void TestMetadataGetInfo() { GTEST_SKIP() << "Not yet implemented"; }
- void TestMetadataGetTableSchema() { GTEST_SKIP() << "Not yet implemented"; }
void TestMetadataGetTableTypes() { GTEST_SKIP() << "Not yet implemented"; }
void TestMetadataGetObjectsCatalogs() { GTEST_SKIP() << "Not yet implemented"; }