You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "lidavidm (via GitHub)" <gi...@apache.org> on 2023/09/27 18:50:10 UTC

[GitHub] [arrow-adbc] lidavidm commented on a diff in pull request #1110: feat(c/driver/postgresql): Initial COPY Writer design

lidavidm commented on code in PR #1110:
URL: https://github.com/apache/arrow-adbc/pull/1110#discussion_r1339058351


##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -117,6 +117,53 @@ ArrowErrorCode ReadChecked(ArrowBufferView* data, T* out, ArrowError* error) {
   return NANOARROW_OK;
 }
 
+// Write a value to a buffer without checking the buffer size. Advances

Review Comment:
   We could also factor out the headers into shared/nonshared parts at some point; I don't think it's a big deal either way.



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {

Review Comment:
   I think indices are fine; reading data is different from writing it.



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
+
+  ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
+    ArrowBufferAppend(buffer, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature));
+
+    const uint32_t flag_fields = 0;
+    ArrowBufferAppend(buffer, &flag_fields, sizeof(flag_fields));
+
+    const uint32_t extension_bytes = 0;
+    ArrowBufferAppend(buffer, &extension_bytes, sizeof(extension_bytes));
+
+    const int64_t header_bytes =
+        sizeof(kPgCopyBinarySignature) + sizeof(flag_fields) + sizeof(extension_bytes);
+    buffer->data += header_bytes;
+    array_size_approx_bytes_ += header_bytes;
+
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode WriteRecord(ArrowBuffer* buffer, ArrowError* error) {
+    const uint8_t* start = buffer->data;
+    NANOARROW_RETURN_NOT_OK(root_writer_.Write(buffer, records_written_, error));
+    records_written_++;
+    array_size_approx_bytes_ += buffer->data - start;
+    return NANOARROW_OK;
+  }
+
+  ArrowErrorCode InitFieldWriters(ArrowError* error) {
+    if (schema_->release == nullptr) {
+      return EINVAL;
+    }
+
+    for (int64_t i = 0; i < schema_->n_children; i++) {
+      struct ArrowSchemaView schema_view;
+      if (ArrowSchemaViewInit(&schema_view, schema_->children[i], error) !=
+          NANOARROW_OK) {
+        return ADBC_STATUS_INTERNAL;
+      }
+      const ArrowType arrow_type = schema_view.type;
+      PostgresCopyFieldWriter* child_writer;
+      NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(arrow_type, &child_writer, error));
+      root_writer_.AppendChild(std::unique_ptr<PostgresCopyFieldWriter>(child_writer));
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  PostgresCopyFieldTupleWriter root_writer_;
+  struct ArrowSchema* schema_;
+  std::unique_ptr<struct ArrowArrayView> array_view_{new struct ArrowArrayView};

Review Comment:
   I believe `Handle<ArrowArrayView>` should work. I should really find the time to go write that adbc++ library.



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -117,6 +117,53 @@ ArrowErrorCode ReadChecked(ArrowBufferView* data, T* out, ArrowError* error) {
   return NANOARROW_OK;
 }
 
+// Write a value to a buffer without checking the buffer size. Advances
+// the cursor of buffer and reduces it by sizeof(T)
+template <typename T>
+inline void WriteUnsafe(ArrowBuffer* buffer, T in) {
+  const T value = SwapNetworkToHost(in);
+  memcpy(buffer->data, &value, sizeof(T));
+  buffer->data += sizeof(T);
+  buffer->size_bytes += sizeof(T);
+}
+
+template <>
+inline void WriteUnsafe(ArrowBuffer* buffer, int8_t in) {
+  buffer->data[0] = in;
+  buffer->data += sizeof(int8_t);
+  buffer->size_bytes += sizeof(int8_t);
+}

Review Comment:
   FWIW, I'm not sure this is necessary. Compilers understand memcpy, and I would guess that they optimize the generic above to the same as these specializations.



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }

Review Comment:
   Probably not? That was to let us build partial results fitting roughly within some bound.



##########
c/driver/postgresql/postgres_copy_reader.h:
##########
@@ -1058,4 +1105,143 @@ class PostgresCopyStreamReader {
   int64_t array_size_approx_bytes_;
 };
 
+class PostgresCopyFieldWriter {
+ public:
+  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; };
+
+  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
+    return ENOTSUP;
+  }
+
+ protected:
+  struct ArrowArrayView* array_view_;
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
+ public:
+  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
+    int64_t child_i = static_cast<int64_t>(children_.size());
+    children_.push_back(std::move(child));
+    children_[child_i]->Init(array_view_->children[child_i]);
+  }
+
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    if (index >= array_view_->length) {
+      return ENODATA;
+    }
+
+    const int16_t n_fields = children_.size();
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));
+
+    for (int16_t i = 0; i < n_fields; i++) {
+      children_[i]->Write(buffer, index, error);
+    }
+
+    return NANOARROW_OK;
+  }
+
+ private:
+  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
+};
+
+class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
+ public:
+  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
+    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
+    const int32_t field_size_bytes = is_null ? -1 : 1;
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
+    if (is_null) {
+      return ADBC_STATUS_OK;
+    }
+
+    const int8_t value =
+        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
+    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));
+
+    return ADBC_STATUS_OK;
+  }
+};
+
+static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
+                                                 PostgresCopyFieldWriter** out,
+                                                 ArrowError* error) {
+  switch (arrow_type) {
+    case NANOARROW_TYPE_BOOL:
+      *out = new PostgresCopyBooleanFieldWriter();
+      return NANOARROW_OK;
+    default:
+      return EINVAL;
+  }
+  return NANOARROW_OK;
+}
+
+class PostgresCopyStreamWriter {
+ public:
+  ~PostgresCopyStreamWriter() { ArrowArrayViewReset(array_view_.get()); }
+
+  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
+    schema_ = schema;
+    NANOARROW_RETURN_NOT_OK(
+        ArrowArrayViewInitFromSchema(array_view_.get(), schema, nullptr));
+    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view_.get(), array, nullptr));
+    root_writer_.Init(array_view_.get());
+    return NANOARROW_OK;
+  }
+
+  int64_t array_size_approx_bytes() const { return array_size_approx_bytes_; }
+
+  ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
+    ArrowBufferAppend(buffer, kPgCopyBinarySignature, sizeof(kPgCopyBinarySignature));

Review Comment:
   Ah, so one pass to figure out the buffer size, then another pass to actually copy data?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org