You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "lidavidm (via GitHub)" <gi...@apache.org> on 2023/05/04 11:58:29 UTC

[GitHub] [arrow-adbc] lidavidm commented on a diff in pull request #636: refactor(c/driver/postgresql): Factor out COPY reader and test it independently

lidavidm commented on code in PR #636:
URL: https://github.com/apache/arrow-adbc/pull/636#discussion_r1184899648


##########
c/driver/postgresql/statement.cc:
##########
@@ -797,9 +664,33 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct ArrowArrayStream* stream,
       PQclear(result);
       return ADBC_STATUS_IO;
     }
-    AdbcStatusCode status = InferSchema(*type_resolver_, result, &reader_.schema_, error);
+
+    // Resolve the information from the PGresult into a PostgresType
+    PostgresType root_type;
+    AdbcStatusCode status =
+        ResolvePostgresType(*type_resolver_, result, &root_type, error);
     PQclear(result);
     if (status != ADBC_STATUS_OK) return status;
+
+    // Initialize the copy reader and infer the output schema (i.e., error for
+    // unsupported types before issuing the COPY query)
+    reader_.copy_reader_.reset(new PostgresCopyStreamReader());

Review Comment:
   Use make_unique?



##########
c/driver/postgresql/postgres_util.h:
##########
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <string.h>

Review Comment:
   string.h -> cstring?



##########
c/driver/postgresql/statement.h:
##########
@@ -35,19 +37,24 @@ class PostgresStatement;
 /// \brief An ArrowArrayStream that reads tuples from a PGresult.
 class TupleReader final {
  public:
-  TupleReader(PGconn* conn) : conn_(conn), result_(nullptr), pgbuf_(nullptr) {
-    std::memset(&schema_, 0, sizeof(schema_));
+  TupleReader(PGconn* conn)
+      : conn_(conn),
+        result_(nullptr),
+        pgbuf_(nullptr),
+        copy_reader_(new PostgresCopyStreamReader()) {

Review Comment:
   Use make_unique over new?



##########
c/driver/postgresql/statement.h:
##########
@@ -35,19 +37,24 @@ class PostgresStatement;
 /// \brief An ArrowArrayStream that reads tuples from a PGresult.
 class TupleReader final {
  public:
-  TupleReader(PGconn* conn) : conn_(conn), result_(nullptr), pgbuf_(nullptr) {
-    std::memset(&schema_, 0, sizeof(schema_));
+  TupleReader(PGconn* conn)
+      : conn_(conn),
+        result_(nullptr),
+        pgbuf_(nullptr),
+        copy_reader_(new PostgresCopyStreamReader()) {

Review Comment:
   Or just leave it as nullptr if it's re-initialized again later.



##########
c/driver/postgresql/postgres_copy_reader_test.cc:
##########
@@ -0,0 +1,520 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <nanoarrow/nanoarrow.h>
+
+#include "postgres_copy_reader.h"
+
+using adbcpq::PostgresCopyStreamReader;

Review Comment:
   You could just put the tests in the adbcpq namespace.



##########
c/driver/postgresql/postgres_copy_reader_test.cc:
##########
@@ -0,0 +1,520 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <nanoarrow/nanoarrow.h>
+
+#include "postgres_copy_reader.h"
+
+using adbcpq::PostgresCopyStreamReader;
+using adbcpq::PostgresType;
+using adbcpq::PostgresTypeId;
+
+class PostgresCopyStreamTester {
+ public:
+  ArrowErrorCode Init(const PostgresType& root_type, ArrowError* error = nullptr) {
+    NANOARROW_RETURN_NOT_OK(reader_.Init(root_type));
+    NANOARROW_RETURN_NOT_OK(reader_.InferOutputSchema(error));
+    NANOARROW_RETURN_NOT_OK(reader_.InitFieldReaders(error));
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode ReadAll(ArrowBufferView* data, ArrowError* error = nullptr) {
+    NANOARROW_RETURN_NOT_OK(reader_.ReadHeader(data, error));
+
+    int result;
+    do {
+      result = reader_.ReadRecord(data, error);
+    } while (result == NANOARROW_OK);
+
+    return result;
+  }
+
+  void GetSchema(ArrowSchema* out) { reader_.GetSchema(out); }
+
+  ArrowErrorCode GetArray(ArrowArray* out, ArrowError* error = nullptr) {
+    return reader_.GetArray(out, error);
+  }
+
+ private:
+  PostgresCopyStreamReader reader_;
+};
+
+// COPY (SELECT CAST("col" AS BOOLEAN) AS "col" FROM (  VALUES (TRUE), (FALSE), (NULL)) AS
+// drvd("col")) TO STDOUT;
+static uint8_t kTestPgCopyBoolean[] = {
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x01,
+    0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
+
+TEST(PostgresCopyUtilsTest, PostgresCopyReadBoolean) {
+  ArrowBufferView data;
+  data.data.as_uint8 = kTestPgCopyBoolean;
+  data.size_bytes = sizeof(kTestPgCopyBoolean);
+
+  auto col_type = PostgresType(PostgresTypeId::kBool);
+  PostgresType input_type(PostgresTypeId::kRecord);
+  input_type.AppendChild("col", col_type);
+
+  PostgresCopyStreamTester tester;
+  ASSERT_EQ(tester.Init(input_type), NANOARROW_OK);
+  ASSERT_EQ(tester.ReadAll(&data), ENODATA);
+
+  // Apparently the output above contains an extra 0xff 0xff at the end

Review Comment:
   Gonna guess it's this:
   
   > The file trailer consists of a 16-bit integer word containing -1. This is easily distinguished from a tuple's field-count word.
   
   https://www.postgresql.org/docs/current/sql-copy.html



##########
c/driver/postgresql/postgres_copy_reader_test.cc:
##########
@@ -0,0 +1,520 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <nanoarrow/nanoarrow.h>
+
+#include "postgres_copy_reader.h"
+
+using adbcpq::PostgresCopyStreamReader;
+using adbcpq::PostgresType;
+using adbcpq::PostgresTypeId;
+
+class PostgresCopyStreamTester {
+ public:
+  ArrowErrorCode Init(const PostgresType& root_type, ArrowError* error = nullptr) {
+    NANOARROW_RETURN_NOT_OK(reader_.Init(root_type));
+    NANOARROW_RETURN_NOT_OK(reader_.InferOutputSchema(error));
+    NANOARROW_RETURN_NOT_OK(reader_.InitFieldReaders(error));
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode ReadAll(ArrowBufferView* data, ArrowError* error = nullptr) {
+    NANOARROW_RETURN_NOT_OK(reader_.ReadHeader(data, error));
+
+    int result;
+    do {
+      result = reader_.ReadRecord(data, error);
+    } while (result == NANOARROW_OK);
+
+    return result;
+  }
+
+  void GetSchema(ArrowSchema* out) { reader_.GetSchema(out); }
+
+  ArrowErrorCode GetArray(ArrowArray* out, ArrowError* error = nullptr) {
+    return reader_.GetArray(out, error);
+  }
+
+ private:
+  PostgresCopyStreamReader reader_;
+};
+
+// COPY (SELECT CAST("col" AS BOOLEAN) AS "col" FROM (  VALUES (TRUE), (FALSE), (NULL)) AS
+// drvd("col")) TO STDOUT;
+static uint8_t kTestPgCopyBoolean[] = {
+    0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, 0x00,
+    0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x01,
+    0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff};
+
+TEST(PostgresCopyUtilsTest, PostgresCopyReadBoolean) {
+  ArrowBufferView data;
+  data.data.as_uint8 = kTestPgCopyBoolean;
+  data.size_bytes = sizeof(kTestPgCopyBoolean);
+
+  auto col_type = PostgresType(PostgresTypeId::kBool);
+  PostgresType input_type(PostgresTypeId::kRecord);
+  input_type.AppendChild("col", col_type);
+
+  PostgresCopyStreamTester tester;
+  ASSERT_EQ(tester.Init(input_type), NANOARROW_OK);
+  ASSERT_EQ(tester.ReadAll(&data), ENODATA);
+
+  // Apparently the output above contains an extra 0xff 0xff at the end
+  ASSERT_EQ(data.data.as_uint8 - kTestPgCopyBoolean, sizeof(kTestPgCopyBoolean));
+  ASSERT_EQ(data.size_bytes, 0);
+
+  struct ArrowArray array;
+  ASSERT_EQ(tester.GetArray(&array), NANOARROW_OK);
+  ASSERT_EQ(array.length, 3);
+  ASSERT_EQ(array.n_children, 1);
+
+  const uint8_t* validity =
+      reinterpret_cast<const uint8_t*>(array.children[0]->buffers[0]);
+  const uint8_t* data_buffer =
+      reinterpret_cast<const uint8_t*>(array.children[0]->buffers[1]);
+  ASSERT_NE(validity, nullptr);
+  ASSERT_NE(data_buffer, nullptr);
+
+  ASSERT_TRUE(ArrowBitGet(validity, 0));
+  ASSERT_TRUE(ArrowBitGet(validity, 1));
+  ASSERT_FALSE(ArrowBitGet(validity, 2));
+
+  ASSERT_TRUE(ArrowBitGet(data_buffer, 0));
+  ASSERT_FALSE(ArrowBitGet(data_buffer, 1));
+  ASSERT_FALSE(ArrowBitGet(data_buffer, 2));
+
+  array.release(&array);

Review Comment:
   nit, but it might be nice to use one of the RAII helpers to avoid leaking this if a test fails



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org