You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "lidavidm (via GitHub)" <gi...@apache.org> on 2023/05/03 12:15:35 UTC

[GitHub] [arrow-adbc] lidavidm commented on a diff in pull request #577: feat(c/driver/postgres): Implement GetTableSchema

lidavidm commented on code in PR #577:
URL: https://github.com/apache/arrow-adbc/pull/577#discussion_r1183605071


##########
c/driver/postgresql/connection.cc:
##########
@@ -47,7 +49,80 @@ AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog,
                                                   const char* table_name,
                                                   struct ArrowSchema* schema,
                                                   struct AdbcError* error) {
-  return ADBC_STATUS_NOT_IMPLEMENTED;
+  AdbcStatusCode final_status = ADBC_STATUS_OK;
+  struct StringBuilder query = {0};
+  if (StringBuilderInit(&query, /*initial_size=*/256) != 0) return ADBC_STATUS_INTERNAL;
+
+  if (StringBuilderAppend(
+          &query, "%s",
+          "SELECT attname, atttypid "
+          "FROM pg_catalog.pg_class AS cls "
+          "INNER JOIN pg_catalog.pg_attribute AS attr ON cls.oid = attr.attrelid "
+          "INNER JOIN pg_catalog.pg_type AS typ ON attr.atttypid = typ.oid "
+          "WHERE attr.attnum >= 0 AND cls.oid = '") != 0)
+    return ADBC_STATUS_INTERNAL;
+
+  if (db_schema != nullptr) {
+    char* schema = PQescapeIdentifier(conn_, db_schema, strlen(db_schema));
+    if (schema == NULL) {
+      SetError(error, "%s%s", "Faled to escape schema: ", PQerrorMessage(conn_));
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+
+    int ret = StringBuilderAppend(&query, "%s%s", schema, ".");
+    PQfreemem(schema);
+
+    if (ret != 0) return ADBC_STATUS_INTERNAL;
+  }
+
+  char* table = PQescapeIdentifier(conn_, table_name, strlen(table_name));
+  if (table == NULL) {
+    SetError(error, "%s%s", "Failed to escape table: ", PQerrorMessage(conn_));
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+
+  int ret = StringBuilderAppend(&query, "%s%s", table_name, "'::regclass::oid");
+  PQfreemem(table);
+
+  if (ret != 0) return ADBC_STATUS_INTERNAL;

Review Comment:
   Might be worth factoring out a helper for the repeated escape-identifier-and-append logic (used for both the schema and the table name)?



##########
c/driver/postgresql/connection.cc:
##########
@@ -47,7 +49,80 @@ AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog,
                                                   const char* table_name,
                                                   struct ArrowSchema* schema,
                                                   struct AdbcError* error) {
-  return ADBC_STATUS_NOT_IMPLEMENTED;
+  AdbcStatusCode final_status = ADBC_STATUS_OK;
+  struct StringBuilder query = {0};
+  if (StringBuilderInit(&query, /*initial_size=*/256) != 0) return ADBC_STATUS_INTERNAL;
+
+  if (StringBuilderAppend(
+          &query, "%s",
+          "SELECT attname, atttypid "
+          "FROM pg_catalog.pg_class AS cls "
+          "INNER JOIN pg_catalog.pg_attribute AS attr ON cls.oid = attr.attrelid "
+          "INNER JOIN pg_catalog.pg_type AS typ ON attr.atttypid = typ.oid "
+          "WHERE attr.attnum >= 0 AND cls.oid = '") != 0)
+    return ADBC_STATUS_INTERNAL;
+
+  if (db_schema != nullptr) {
+    char* schema = PQescapeIdentifier(conn_, db_schema, strlen(db_schema));
+    if (schema == NULL) {
+      SetError(error, "%s%s", "Faled to escape schema: ", PQerrorMessage(conn_));
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+
+    int ret = StringBuilderAppend(&query, "%s%s", schema, ".");
+    PQfreemem(schema);
+
+    if (ret != 0) return ADBC_STATUS_INTERNAL;
+  }
+
+  char* table = PQescapeIdentifier(conn_, table_name, strlen(table_name));
+  if (table == NULL) {
+    SetError(error, "%s%s", "Failed to escape table: ", PQerrorMessage(conn_));
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+
+  int ret = StringBuilderAppend(&query, "%s%s", table_name, "'::regclass::oid");
+  PQfreemem(table);
+
+  if (ret != 0) return ADBC_STATUS_INTERNAL;
+
+  pg_result* result = PQexec(conn_, query.buffer);
+  StringBuilderReset(&query);
+
+  ExecStatusType pq_status = PQresultStatus(result);
+  if (pq_status == PGRES_TUPLES_OK) {
+    int num_rows = PQntuples(result);
+    ArrowSchemaInit(schema);
+    CHECK_NA(INTERNAL, ArrowSchemaSetTypeStruct(schema, num_rows), error);
+
+    ArrowError na_error;
+    for (int row = 0; row < num_rows; row++) {
+      const char* colname = PQgetvalue(result, row, 0);
+      const Oid pg_oid = static_cast<uint32_t>(
+          std::strtol(PQgetvalue(result, row, 1), /*str_end=*/nullptr, /*base=*/10));
+
+      PostgresType pg_type;
+      if (type_resolver_->Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {

Review Comment:
   Do we need to clear the result set + disconnect in this branch? Maybe we can use final_status instead of the early return?
   
   (...maybe we want an RAII helper over pg_result)



##########
c/driver/postgresql/connection.cc:
##########
@@ -47,7 +49,80 @@ AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog,
                                                   const char* table_name,
                                                   struct ArrowSchema* schema,
                                                   struct AdbcError* error) {
-  return ADBC_STATUS_NOT_IMPLEMENTED;
+  AdbcStatusCode final_status = ADBC_STATUS_OK;
+  struct StringBuilder query = {0};
+  if (StringBuilderInit(&query, /*initial_size=*/256) != 0) return ADBC_STATUS_INTERNAL;
+
+  if (StringBuilderAppend(
+          &query, "%s",
+          "SELECT attname, atttypid "
+          "FROM pg_catalog.pg_class AS cls "
+          "INNER JOIN pg_catalog.pg_attribute AS attr ON cls.oid = attr.attrelid "
+          "INNER JOIN pg_catalog.pg_type AS typ ON attr.atttypid = typ.oid "
+          "WHERE attr.attnum >= 0 AND cls.oid = '") != 0)
+    return ADBC_STATUS_INTERNAL;
+
+  if (db_schema != nullptr) {
+    char* schema = PQescapeIdentifier(conn_, db_schema, strlen(db_schema));
+    if (schema == NULL) {
+      SetError(error, "%s%s", "Faled to escape schema: ", PQerrorMessage(conn_));
+      return ADBC_STATUS_INVALID_ARGUMENT;
+    }
+
+    int ret = StringBuilderAppend(&query, "%s%s", schema, ".");
+    PQfreemem(schema);
+
+    if (ret != 0) return ADBC_STATUS_INTERNAL;
+  }
+
+  char* table = PQescapeIdentifier(conn_, table_name, strlen(table_name));
+  if (table == NULL) {
+    SetError(error, "%s%s", "Failed to escape table: ", PQerrorMessage(conn_));
+    return ADBC_STATUS_INVALID_ARGUMENT;
+  }
+
+  int ret = StringBuilderAppend(&query, "%s%s", table_name, "'::regclass::oid");
+  PQfreemem(table);
+
+  if (ret != 0) return ADBC_STATUS_INTERNAL;
+
+  pg_result* result = PQexec(conn_, query.buffer);
+  StringBuilderReset(&query);
+
+  ExecStatusType pq_status = PQresultStatus(result);
+  if (pq_status == PGRES_TUPLES_OK) {
+    int num_rows = PQntuples(result);
+    ArrowSchemaInit(schema);
+    CHECK_NA(INTERNAL, ArrowSchemaSetTypeStruct(schema, num_rows), error);
+
+    ArrowError na_error;
+    for (int row = 0; row < num_rows; row++) {
+      const char* colname = PQgetvalue(result, row, 0);
+      const Oid pg_oid = static_cast<uint32_t>(
+          std::strtol(PQgetvalue(result, row, 1), /*str_end=*/nullptr, /*base=*/10));
+
+      PostgresType pg_type;
+      if (type_resolver_->Find(pg_oid, &pg_type, &na_error) != NANOARROW_OK) {
+        SetError(error, "%s%d%s%s%s%" PRIu32, "Column #", row + 1, " (\"", colname,
+                 "\") has unknown type code ", pg_oid);
+        return ADBC_STATUS_NOT_IMPLEMENTED;
+      }
+
+      CHECK_NA(INTERNAL, pg_type.WithFieldName(colname).SetSchema(schema->children[row]),
+               error);
+    }
+  } else {
+    SetError(error, "%s%s", "Failed to get table schema: ", PQerrorMessage(conn_));
+    final_status = ADBC_STATUS_IO;
+  }
+  PQclear(result);
+
+  // Disconnect since PostgreSQL connections can be heavy.
+  {
+    AdbcStatusCode status = database_->Disconnect(&conn_, error);
+    if (status != ADBC_STATUS_OK) final_status = status;
+  }

Review Comment:
   We don't connect in this method, so unless I'm mistaken we shouldn't disconnect here either?



##########
c/driver/postgresql/connection.cc:
##########
@@ -47,7 +49,80 @@ AdbcStatusCode PostgresConnection::GetTableSchema(const char* catalog,
                                                   const char* table_name,
                                                   struct ArrowSchema* schema,
                                                   struct AdbcError* error) {
-  return ADBC_STATUS_NOT_IMPLEMENTED;
+  AdbcStatusCode final_status = ADBC_STATUS_OK;
+  struct StringBuilder query = {0};
+  if (StringBuilderInit(&query, /*initial_size=*/256) != 0) return ADBC_STATUS_INTERNAL;
+
+  if (StringBuilderAppend(
+          &query, "%s",
+          "SELECT attname, atttypid "
+          "FROM pg_catalog.pg_class AS cls "
+          "INNER JOIN pg_catalog.pg_attribute AS attr ON cls.oid = attr.attrelid "
+          "INNER JOIN pg_catalog.pg_type AS typ ON attr.atttypid = typ.oid "
+          "WHERE attr.attnum >= 0 AND cls.oid = '") != 0)
+    return ADBC_STATUS_INTERNAL;
+
+  if (db_schema != nullptr) {
+    char* schema = PQescapeIdentifier(conn_, db_schema, strlen(db_schema));

Review Comment:
   Agreed; do you want to file an issue for this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org