You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "WillAyd (via GitHub)" <gi...@apache.org> on 2023/09/26 20:51:26 UTC

[GitHub] [arrow-adbc] WillAyd opened a new pull request, #1110: feat(c/driver/postgresql): Inital COPY Writer design

WillAyd opened a new pull request, #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110

   This is a precursor to #1093; I figured it would be easier to work piecewise rather than all at once.
   
   This does not try to actually connect the statement.cc code to use this; it just gets the test case and general structure set up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-adbc] WillAyd commented on a diff in pull request #1110: feat(c/driver/postgresql): Inital COPY Writer design

Posted by "WillAyd (via GitHub)" <gi...@apache.org>.
WillAyd commented on code in PR #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110#discussion_r1339075802


##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
+
+  ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
+    ArrowBufferAppend(buffer, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature));

Review Comment:
   I have to think more about this. I don't think you need two passes, though? I think this could be calculated up front.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-adbc] WillAyd commented on a diff in pull request #1110: feat(c/driver/postgresql): Inital COPY Writer design

Posted by "WillAyd (via GitHub)" <gi...@apache.org>.
WillAyd commented on code in PR #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110#discussion_r1339085863


##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -117,6 +117,53 @@ ArrowErrorCode ReadChecked(ArrowBufferView* data, T* out, ArrowError* error) {
   return NANOARROW_OK;
 }
 
+// Write a value to a buffer without checking the buffer size. Advances
+// the cursor of buffer and reduces it by sizeof(T)
+template <typename T>
+inline void WriteUnsafe(ArrowBuffer* buffer, T in) {
+  const T value = SwapNetworkToHost(in);
+  memcpy(buffer->data, &value, sizeof(T));
+  buffer->data += sizeof(T);
+  buffer->size_bytes += sizeof(T);
+}
+
+template <>
+inline void WriteUnsafe(ArrowBuffer* buffer, int8_t in) {
+  buffer->data[0] = in;
+  buffer->data += sizeof(int8_t);
+  buffer->size_bytes += sizeof(int8_t);
+}

Review Comment:
   The specializations exist because of the unsigned argument requirement for `SwapNetworkToHost`, although that makes me realize these are incorrect as is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-adbc] WillAyd commented on a diff in pull request #1110: feat(c/driver/postgresql): Inital COPY Writer design

Posted by "WillAyd (via GitHub)" <gi...@apache.org>.
WillAyd commented on code in PR #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110#discussion_r1339087537


##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {

Review Comment:
   Sounds good. I think it's just kind of confusing that we increment the buffer in the `WriteUnchecked` calls but also mix in index access here. That may be the best we can do to start.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-adbc] lidavidm commented on pull request #1110: feat(c/driver/postgresql): Inital COPY Writer design

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on PR #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110#issuecomment-1739695620

   Yeah, the CI failures are https://github.com/apache/arrow-adbc/issues/1088


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-adbc] WillAyd commented on pull request #1110: feat(c/driver/postgresql): Inital COPY Writer design

Posted by "WillAyd (via GitHub)" <gi...@apache.org>.
WillAyd commented on PR #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110#issuecomment-1738281088

   I don't think the CI failures are related. Happy to have this merged now and work on more writers in follow ups if you'd like


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-adbc] lidavidm merged pull request #1110: feat(c/driver/postgresql): Inital COPY Writer design

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm merged PR #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-adbc] WillAyd commented on a diff in pull request #1110: feat(c/driver/postgresql): Inital COPY Writer design

Posted by "WillAyd (via GitHub)" <gi...@apache.org>.
WillAyd commented on code in PR #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110#discussion_r1337776251


##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {

Review Comment:
   The `Write` method calls accept an `index` argument, which is a little different from the Reader setup. Instead of accessing by index, the Readers always call `ArrowBufferAppend` on the array they are building. 
   
   I think we could still do that here; it's just a little more complicated because there is no generator like `ArrowBufferGetNext`, so I figured index access was easier to start with. There could be something larger I am overlooking.



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }

Review Comment:
   Not sure if this is necessary; I just copied it from the reader design.



##########
c/driver/postgresql/postgres_copy_reader_test.cc:
##########
@@ -96,6 +127,32 @@ TEST(PostgresCopyUtilsTest, PostgresCopyReadBoolean) {
   ASSERT_FALSE(ArrowBitGet(data_buffer, 2));
 }
 
+TEST(PostgresCopyUtilsTest, PostgresCopyWriteBoolean) {
+  adbc_validation::Handle<struct ArrowSchema> schema;
+  adbc_validation::Handle<struct ArrowArray> array;
+  struct ArrowError na_error;
+  ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_BOOL}}),
+            ADBC_STATUS_OK);
+  ASSERT_EQ(adbc_validation::MakeBatch<bool>(&schema.value, &array.value, &na_error,
+                                             {true, false, std::nullopt}),
+            ADBC_STATUS_OK);
+
+  PostgresCopyStreamWriteTester tester;
+  ASSERT_EQ(tester.Init(&schema.value, &array.value), NANOARROW_OK);
+
+  struct ArrowBuffer buffer;
+  ArrowBufferInit(&buffer);
+  ArrowBufferReserve(&buffer, sizeof(kTestPgCopyBoolean));
+  ASSERT_EQ(tester.WriteAll(&buffer, nullptr), ENODATA);
+
+  // The last 4 bytes of a message can be transmitted via PQputCopyData
+  // so no need to test those bytes from the Writer
+  for (size_t i = 0; i < sizeof(kTestPgCopyBoolean) - 4; i++) {

Review Comment:
   Ultimately, when we implement this in statement.cc, I imagine we will build the buffer (maybe even in chunks) and send that via `PQputCopyData`. When all is said and done, we would then do a `PQputCopyEnd` to send the last 4 bytes. Maybe we should make the end message a constant in the tests so it is clear what is part of the "data" versus the sentinel signaling the end of the buffer.



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -117,6 +117,53 @@ ArrowErrorCode ReadChecked(ArrowBufferView* data, T* out, ArrowError* error) {
   return NANOARROW_OK;
 }
 
+// Write a value to a buffer without checking the buffer size. Advances

Review Comment:
   I put all of this code into `postgres_copy_reader.h` because it re-uses a lot of the same patterns and constants. Maybe we should rename this `postgres_copy_io.h`?



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
+
+  ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
+    ArrowBufferAppend(buffer, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature));
+
+    const uint32_t flag_fields = 0;
+    ArrowBufferAppend(buffer, &flag_fields, sizeof(flag_fields));
+
+    const uint32_t extension_bytes = 0;
+    ArrowBufferAppend(buffer, &extension_bytes, sizeof(extension_bytes));
+
+    const int64_t header_bytes =
+        sizeof(kPgCopyBinarySignature) + sizeof(flag_fields) + sizeof(extension_bytes);
+    buffer->data += header_bytes;
+    array_size_approx_bytes_ += header_bytes;
+
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode WriteRecord(ArrowBuffer* buffer, ArrowError* error) {
+    const uint8_t* start = buffer->data;
+    NANOARROW_RETURN_NOT_OK(root_writer_.Write(buffer, records_written_, error));
+    records_written_++;
+    array_size_approx_bytes_ += buffer->data - start;
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode InitFieldWriters(ArrowError* error) {
+    if (schema_->release == nullptr) {
+      return EINVAL;
+    }
+
+    for (int64_t i = 0; i < schema_->n_children; i++) {
+      struct ArrowSchemaView schema_view;
+      if (ArrowSchemaViewInit(&schema_view, schema_->children[i], error) !=
+          NANOARROW_OK) {
+        return ADBC_STATUS_INTERNAL;
+      }
+      const ArrowType arrow_type = schema_view.type;
+      PostgresCopyFieldWriter* child_writer;
+      NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(arrow_type, &child_writer, error));
+      root_writer_.AppendChild(std::unique_ptr<PostgresCopyFieldWriter>(child_writer));
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  PostgresCopyFieldTupleWriter root_writer_;
+  struct ArrowSchema* schema_;
+  std::unique_ptr<struct ArrowArrayView> array_view_{new struct ArrowArrayView};

Review Comment:
   I'm a bit iffy on C++ constructs, but I think this is the easiest way to declare a pointer that owns data as a class member with C++11 compatibility. Apologies if I'm missing something easier.



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
+
+  ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
+    ArrowBufferAppend(buffer, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature));

Review Comment:
   I think we can move towards `ArrowBufferAppendUnsafe` if we ensure a proper buffer size up front. I think in the current protocol you need 19 bytes for the header, 2 bytes for the number of columns in each row, 4 bytes for each record to indicate the record length, n bytes for every non-null record to contain its actual data, and finally 4 bytes for the end message.
   
   Something to investigate further.



##########
c/driver/postgresql/postgres_copy_reader_test.cc:
##########
@@ -52,6 +55,34 @@ class PostgresCopyStreamTester {
   PostgresCopyStreamReader reader_;
 };
 
+class PostgresCopyStreamWriteTester {
+ public:
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array,
+                      ArrowError* error = nullptr) {
+    NANOARROW_RETURN_NOT_OK(writer_.Init(schema, array));
+    NANOARROW_RETURN_NOT_OK(writer_.InitFieldWriters(error));
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode WriteAll(struct ArrowBuffer* buffer, ArrowError* error = nullptr) {
+    NANOARROW_RETURN_NOT_OK(writer_.WriteHeader(buffer, error));
+
+    int result;
+    do {
+      result = writer_.WriteRecord(buffer, error);
+    } while (result == NANOARROW_OK);
+
+    // TODO: don't think we should do this here; the reader equivalent does

Review Comment:
   AFAICT the reader implementation just moves through the message buffer. I mirrored that for the writer as well, but that means that reading the buffer after the fact requires knowing how many bytes were traversed and moving back there. There is probably a better way to do this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-adbc] lidavidm commented on a diff in pull request #1110: feat(c/driver/postgresql): Inital COPY Writer design

Posted by "lidavidm (via GitHub)" <gi...@apache.org>.
lidavidm commented on code in PR #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110#discussion_r1339058351


##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -117,6 +117,53 @@ ArrowErrorCode ReadChecked(ArrowBufferView* data, T* out, ArrowError* error) {
   return NANOARROW_OK;
 }
 
+// Write a value to a buffer without checking the buffer size. Advances

Review Comment:
   We could also factor out the headers into shared/nonshared parts at some point, I don't think it's a big deal either way.



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {

Review Comment:
   I think indices are fine, reading data is different than writing it



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
+
+  ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
+    ArrowBufferAppend(buffer, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature));
+
+    const uint32_t flag_fields = 0;
+    ArrowBufferAppend(buffer, &flag_fields, sizeof(flag_fields));
+
+    const uint32_t extension_bytes = 0;
+    ArrowBufferAppend(buffer, &extension_bytes, sizeof(extension_bytes));
+
+    const int64_t header_bytes =
+        sizeof(kPgCopyBinarySignature) + sizeof(flag_fields) + sizeof(extension_bytes);
+    buffer->data += header_bytes;
+    array_size_approx_bytes_ += header_bytes;
+
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode WriteRecord(ArrowBuffer* buffer, ArrowError* error) {
+    const uint8_t* start = buffer->data;
+    NANOARROW_RETURN_NOT_OK(root_writer_.Write(buffer, records_written_, error));
+    records_written_++;
+    array_size_approx_bytes_ += buffer->data - start;
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode InitFieldWriters(ArrowError* error) {
+    if (schema_->release == nullptr) {
+      return EINVAL;
+    }
+
+    for (int64_t i = 0; i < schema_->n_children; i++) {
+      struct ArrowSchemaView schema_view;
+      if (ArrowSchemaViewInit(&schema_view, schema_->children[i], error) !=
+          NANOARROW_OK) {
+        return ADBC_STATUS_INTERNAL;
+      }
+      const ArrowType arrow_type = schema_view.type;
+      PostgresCopyFieldWriter* child_writer;
+      NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(arrow_type, &child_writer, error));
+      root_writer_.AppendChild(std::unique_ptr<PostgresCopyFieldWriter>(child_writer));
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  PostgresCopyFieldTupleWriter root_writer_;
+  struct ArrowSchema* schema_;
+  std::unique_ptr<struct ArrowArrayView> array_view_{new struct ArrowArrayView};

Review Comment:
   I believe `Handle<ArrowArrayView>` should work. I should really find the time to go write that adbc++ library.



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -117,6 +117,53 @@ ArrowErrorCode ReadChecked(ArrowBufferView* data, T* out, ArrowError* error) {
   return NANOARROW_OK;
 }
 
+// Write a value to a buffer without checking the buffer size. Advances
+// the cursor of buffer and reduces it by sizeof(T)
+template <typename T>
+inline void WriteUnsafe(ArrowBuffer* buffer, T in) {
+  const T value = SwapNetworkToHost(in);
+  memcpy(buffer->data, &value, sizeof(T));
+  buffer->data += sizeof(T);
+  buffer->size_bytes += sizeof(T);
+}
+
+template <>
+inline void WriteUnsafe(ArrowBuffer* buffer, int8_t in) {
+  buffer->data[0] = in;
+  buffer->data += sizeof(int8_t);
+  buffer->size_bytes += sizeof(int8_t);
+}

Review Comment:
   FWIW, I'm not sure this is necessary. Compilers understand memcpy, and I would guess that they optimize the generic above to the same as these specializations.



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }

Review Comment:
   Probably not? That was to let us build partial results fitting roughly within some bound.



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
+
+  ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
+    ArrowBufferAppend(buffer, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature));

Review Comment:
   Ah, so one pass to figure out the buffer size, then another pass to actually copy the data?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-adbc] WillAyd commented on a diff in pull request #1110: feat(c/driver/postgresql): Inital COPY Writer design

Posted by "WillAyd (via GitHub)" <gi...@apache.org>.
WillAyd commented on code in PR #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110#discussion_r1339089572


##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
+
+  ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
+    ArrowBufferAppend(buffer, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature));
+
+    const uint32_t flag_fields = 0;
+    ArrowBufferAppend(buffer, &flag_fields, sizeof(flag_fields));
+
+    const uint32_t extension_bytes = 0;
+    ArrowBufferAppend(buffer, &extension_bytes, sizeof(extension_bytes));
+
+    const int64_t header_bytes =
+        sizeof(kPgCopyBinarySignature) + sizeof(flag_fields) + sizeof(extension_bytes);
+    buffer->data += header_bytes;
+    array_size_approx_bytes_ += header_bytes;
+
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode WriteRecord(ArrowBuffer* buffer, ArrowError* error) {
+    const uint8_t* start = buffer->data;
+    NANOARROW_RETURN_NOT_OK(root_writer_.Write(buffer, records_written_, error));
+    records_written_++;
+    array_size_approx_bytes_ += buffer->data - start;
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode InitFieldWriters(ArrowError* error) {
+    if (schema_->release == nullptr) {
+      return EINVAL;
+    }
+
+    for (int64_t i = 0; i < schema_->n_children; i++) {
+      struct ArrowSchemaView schema_view;
+      if (ArrowSchemaViewInit(&schema_view, schema_->children[i], error) !=
+          NANOARROW_OK) {
+        return ADBC_STATUS_INTERNAL;
+      }
+      const ArrowType arrow_type = schema_view.type;
+      PostgresCopyFieldWriter* child_writer;
+      NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(arrow_type, &child_writer, error));
+      root_writer_.AppendChild(std::unique_ptr<PostgresCopyFieldWriter>(child_writer));
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  PostgresCopyFieldTupleWriter root_writer_;
+  struct ArrowSchema* schema_;
+  std::unique_ptr<struct ArrowArrayView> array_view_{new struct ArrowArrayView};

Review Comment:
   Cool, that is a great idea. I think we would have to refactor the Handle to move it from statement.cc to postgres_util.h, so I will tackle that in another PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org