Posted to github@arrow.apache.org by "kou (via GitHub)" <gi...@apache.org> on 2023/12/10 15:42:02 UTC

[PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

kou opened a new pull request, #39164:
URL: https://github.com/apache/arrow/pull/39164

   ### Rationale for this change
   
   We need to copy the data for the metadata message because it may be used in subsequent `Consume(data)` calls: we can't assume that the given `data` is still valid in a subsequent `Consume(data)` call.
   
   We also need to copy buffered `data` because it's used in subsequent `Consume(data)` calls.
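   
   To illustrate the lifetime problem, here is a minimal sketch using only the C++ standard library. `ToyDecoder` and its 4-byte length-prefixed framing are hypothetical and are not Arrow's `MessageDecoder` or IPC format; the point is only that bytes a later `Consume()` call still needs are copied into storage the decoder owns, instead of being referenced in the caller's buffer.
   
   ```cpp
   #include <cstddef>
   #include <cstdint>
   #include <vector>
   
   // Hypothetical toy decoder (not Arrow's MessageDecoder). Framing: a 4-byte
   // little-endian length followed by that many payload bytes.
   class ToyDecoder {
    public:
     // `data` is only guaranteed to be valid during this call.
     void Consume(const uint8_t* data, size_t size) {
       // Copy instead of keeping `data`: a partial message may be completed by a
       // later Consume() call, after the caller has reused or freed its buffer.
       pending_.insert(pending_.end(), data, data + size);
       while (pending_.size() >= 4) {
         uint32_t length = static_cast<uint32_t>(pending_[0]) |
                           (static_cast<uint32_t>(pending_[1]) << 8) |
                           (static_cast<uint32_t>(pending_[2]) << 16) |
                           (static_cast<uint32_t>(pending_[3]) << 24);
         if (pending_.size() - 4 < length) return;  // wait for more input
         auto begin = pending_.begin() + 4;
         messages_.emplace_back(begin, begin + length);
         pending_.erase(pending_.begin(), begin + length);
       }
     }
   
     const std::vector<std::vector<uint8_t>>& messages() const { return messages_; }
   
    private:
     std::vector<uint8_t> pending_;                // owned copy of unconsumed input
     std::vector<std::vector<uint8_t>> messages_;  // decoded payloads
   };
   ```
   
   Feeding a stream one byte at a time through a single reused scratch byte exercises exactly the case described above: no message is ever complete within one call, so every byte must survive in the decoder's own storage.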
   
   ### What changes are included in this PR?
   
   * Add missing copies.
   * Clean up existing buffer copy code.
   * Change tests to use ephemeral `data` to detect this case.
   * Add `copy_record_batch` option to `CollectListener` to copy decoded record batches.
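   
   The "ephemeral `data`" idea for the tests can be sketched generically: feed the input through one scratch buffer and overwrite it after every call, so a consumer that kept a pointer instead of copying observes garbage. `FeedEphemerally()` is a hypothetical helper written for this illustration, not the helper used in Arrow's test suite.
   
   ```cpp
   #include <algorithm>
   #include <cstddef>
   #include <cstdint>
   #include <functional>
   #include <vector>
   
   // Hypothetical test helper. Feeds `input` to `consume` in chunks of
   // `chunk_size` bytes through one reused scratch buffer, and overwrites the
   // scratch buffer with 0xFF after every call. `finish` runs while the scratch
   // buffer is still alive, so a consumer that kept pointers reads 0xFF bytes
   // (well-defined) instead of the original input.
   void FeedEphemerally(const std::vector<uint8_t>& input, size_t chunk_size,
                        const std::function<void(const uint8_t*, size_t)>& consume,
                        const std::function<void()>& finish) {
     std::vector<uint8_t> scratch(chunk_size);
     for (size_t offset = 0; offset < input.size(); offset += chunk_size) {
       size_t n = std::min(chunk_size, input.size() - offset);
       std::copy(input.begin() + offset, input.begin() + offset + n, scratch.begin());
       consume(scratch.data(), n);
       // Simulate the caller reusing its buffer before the next call.
       std::fill(scratch.begin(), scratch.end(), static_cast<uint8_t>(0xFF));
     }
     finish();
   }
   ```
   
   With this helper, a consumer that appends the bytes it receives reconstructs the input exactly, while one that only stores `(pointer, size)` pairs and reads them in `finish` sees nothing but `0xFF` bytes.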
   
   ### Are these changes tested?
   
   Yes.
   
   ### Are there any user-facing changes?
   
   Yes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1428357654


##########
cpp/src/arrow/ipc/message.cc:
##########
@@ -626,10 +626,24 @@ class MessageDecoder::MessageDecoderImpl {
             RETURN_NOT_OK(ConsumeMetadataLengthData(data, next_required_size_));
             break;
           case State::METADATA: {
-            auto buffer = std::make_shared<Buffer>(data, next_required_size_);
+            // We need to copy metadata because it's used in
+            // ConsumeBody(). ConsumeBody() may be called from another
+            // ConsumeData(). We can't assume that the given data for
+            // the current ConsumeData() call is still valid in the
+            // next ConsumeData() call. So we need to copy metadata
+            // here.
+            ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
+                                  AllocateBuffer(next_required_size_, pool_));
+            memcpy(buffer->mutable_data(), data, next_required_size_);

Review Comment:
   Wouldn't this be solved more elegantly by changing `ConsumeMetadataBuffer`?
   
   ```diff
      Status ConsumeMetadataBuffer(const std::shared_ptr<Buffer>& buffer) {
   -    if (buffer->is_cpu()) {
   -      metadata_ = buffer;
   -    } else {
          ARROW_ASSIGN_OR_RAISE(metadata_,
                                Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
   -    }
        return ConsumeMetadata();
      }
   ```
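   
   For readers unfamiliar with the "view or copy" idea, here is a hedged toy model of the semantics as discussed in this thread: if the source buffer already lives on the device the caller needs, the same buffer is returned (a view); otherwise its bytes are copied. `Device`, `ToyBuffer` and this `ViewOrCopy` are hypothetical stand-ins, not Arrow's `Buffer::ViewOrCopy()` or `MemoryManager`.
   
   ```cpp
   #include <cstdint>
   #include <memory>
   #include <vector>
   
   // Hypothetical toy types; not Arrow's Buffer/Device/MemoryManager.
   enum class Device { kCpu, kGpu };
   
   struct ToyBuffer {
     Device device;
     std::vector<uint8_t> bytes;
   };
   
   // Return the same buffer when it is already on `target` (zero-copy view);
   // otherwise allocate a new buffer on `target` holding a copy of the bytes.
   std::shared_ptr<ToyBuffer> ViewOrCopy(const std::shared_ptr<ToyBuffer>& source,
                                         Device target) {
     if (source->device == target) {
       return source;  // view: shared ownership, no bytes copied
     }
     return std::make_shared<ToyBuffer>(ToyBuffer{target, source->bytes});  // copy
   }
   ```
   
   Under this model, a CPU buffer that merely wraps caller-owned memory comes back as a view, so routing it through a view-or-copy step would not, by itself, make it safe to keep after the call returns.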





Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "conbench-apache-arrow[bot] (via GitHub)" <gi...@apache.org>.
conbench-apache-arrow[bot] commented on PR #39164:
URL: https://github.com/apache/arrow/pull/39164#issuecomment-1880049708

   After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit 6ab7a18fdc4cc3a48c1f40da3b2fedd58f5bfc23.
   
   There was 1 benchmark result with an error:
   
   - Commit Run on `ursa-i9-9960x` at [2024-01-06 23:18:17Z](https://conbench.ursa.dev/compare/runs/eb5e0eae45bc4207a33964b3e0134906...2343914d1b3c464bb5807b6ccd940be8/)
     - [`tpch` (R) with engine=arrow, format=parquet, language=R, memory_map=False, query_id=TPCH-19, scale_factor=10](https://conbench.ursa.dev/benchmark-results/06599f9755ee7cd48000f60ef81577d9)
   
   There was 1 benchmark result indicating a performance regression:
   
   - Commit Run on `ursa-i9-9960x` at [2024-01-06 23:18:17Z](https://conbench.ursa.dev/compare/runs/eb5e0eae45bc4207a33964b3e0134906...2343914d1b3c464bb5807b6ccd940be8/)
     - [`tpch` (R) with engine=arrow, format=native, language=R, memory_map=False, query_id=TPCH-01, scale_factor=1](https://conbench.ursa.dev/compare/benchmarks/06599caada12741b80005f81cc1d5555...06599f27ed2671f380005db7aeb3e94f)
   
   The [full Conbench report](https://github.com/apache/arrow/runs/20236896576) has more details. It also includes information about 10 possible false positives for unstable benchmarks that are known to sometimes produce them.




Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1430791451


##########
cpp/src/arrow/ipc/message.cc:
##########
@@ -626,10 +626,24 @@ class MessageDecoder::MessageDecoderImpl {
             RETURN_NOT_OK(ConsumeMetadataLengthData(data, next_required_size_));
             break;
           case State::METADATA: {
-            auto buffer = std::make_shared<Buffer>(data, next_required_size_);
+            // We need to copy metadata because it's used in
+            // ConsumeBody(). ConsumeBody() may be called from another
+            // ConsumeData(). We can't assume that the given data for
+            // the current ConsumeData() call is still valid in the
+            // next ConsumeData() call. So we need to copy metadata
+            // here.
+            ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
+                                  AllocateBuffer(next_required_size_, pool_));
+            memcpy(buffer->mutable_data(), data, next_required_size_);

Review Comment:
   If we use the change, all buffers are copied because we're using `CPUDevice::memory_manager(pool_)`, not `arrow::default_cpu_memory_manager()`.
   If we don't use the change, no buffers are copied.
   
   If we use the change together with #39270, no buffers are copied.





Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "ianmcook (via GitHub)" <gi...@apache.org>.
ianmcook commented on PR #39164:
URL: https://github.com/apache/arrow/pull/39164#issuecomment-1874518012

   @kou It would be very nice to have this merged in time for the 15.0.0 release if possible (code freeze planned 8 January)




Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1421940563


##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -2052,6 +2052,39 @@ Status Listener::OnRecordBatchWithMetadataDecoded(
   return OnRecordBatchDecoded(std::move(record_batch_with_metadata.batch));
 }
 
+namespace {
+Status CopyArrayData(std::shared_ptr<ArrayData> data) {

Review Comment:
   Good catch!
   It will reduce reference-count-related overhead.
   I'll do it after we reach consensus that this copy approach is reasonable. (I hope we can find a better approach...)





Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1442633352


##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -317,7 +317,12 @@ class ARROW_EXPORT Listener {
 /// \since 0.17.0
 class ARROW_EXPORT CollectListener : public Listener {
  public:
-  CollectListener() : schema_(), filtered_schema_(), record_batches_(), metadatas_() {}
+  explicit CollectListener(bool copy_record_batch = false)

Review Comment:
   I reverted this change and created a new `CopyCollectListener` only for tests, because I'm not sure whether this API is good or not.
   This lets us defer the API decision, because it doesn't change the public API. We can reconsider the public API later if needed.





Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1430669809


##########
cpp/src/arrow/ipc/message.cc:
##########
@@ -626,10 +626,24 @@ class MessageDecoder::MessageDecoderImpl {
             RETURN_NOT_OK(ConsumeMetadataLengthData(data, next_required_size_));
             break;
           case State::METADATA: {
-            auto buffer = std::make_shared<Buffer>(data, next_required_size_);
+            // We need to copy metadata because it's used in
+            // ConsumeBody(). ConsumeBody() may be called from another
+            // ConsumeData(). We can't assume that the given data for
+            // the current ConsumeData() call is still valid in the
+            // next ConsumeData() call. So we need to copy metadata
+            // here.
+            ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
+                                  AllocateBuffer(next_required_size_, pool_));
+            memcpy(buffer->mutable_data(), data, next_required_size_);

Review Comment:
   > It works but it causes needless copy with Consume(buffer) API.
   
   Wouldn't it be the same number of copies?





Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou merged PR #39164:
URL: https://github.com/apache/arrow/pull/39164




Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1428844231


##########
cpp/src/arrow/ipc/message.cc:
##########
@@ -626,10 +626,24 @@ class MessageDecoder::MessageDecoderImpl {
             RETURN_NOT_OK(ConsumeMetadataLengthData(data, next_required_size_));
             break;
           case State::METADATA: {
-            auto buffer = std::make_shared<Buffer>(data, next_required_size_);
+            // We need to copy metadata because it's used in
+            // ConsumeBody(). ConsumeBody() may be called from another
+            // ConsumeData(). We can't assume that the given data for
+            // the current ConsumeData() call is still valid in the
+            // next ConsumeData() call. So we need to copy metadata
+            // here.
+            ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
+                                  AllocateBuffer(next_required_size_, pool_));
+            memcpy(buffer->mutable_data(), data, next_required_size_);

Review Comment:
   It works, but it causes a needless copy with the `Consume(buffer)` API.
   
   If we use `Buffer::ViewOrCopy()` for copying a buffer, we should do this in `Consume(data, size)`:
   
   ```diff
   diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
   index fbcd6f139b..351fa6c6db 100644
   --- a/cpp/src/arrow/ipc/message.cc
   +++ b/cpp/src/arrow/ipc/message.cc
   @@ -632,9 +632,10 @@ class MessageDecoder::MessageDecoderImpl {
                // the current ConsumeData() call is still valid in the
                // next ConsumeData() call. So we need to copy metadata
                // here.
   -            ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
   -                                  AllocateBuffer(next_required_size_, pool_));
   -            memcpy(buffer->mutable_data(), data, next_required_size_);
   +            ARROW_ASSIGN_OR_RAISE(
   +                auto buffer,
   +                Buffer::ViewOrCopy(std::make_shared<Buffer>(data, next_required_size_),
   +                                   CPUDevice::memory_manager(pool_)));
                RETURN_NOT_OK(ConsumeMetadataBuffer(buffer));
              } break;
              case State::BODY: {
   ```
   
   BTW, we should use `arrow::default_cpu_memory_manager()` for `arrow::default_memory_pool()`... I'll open an issue for it later.
   
   ```diff
   diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
   index fbcd6f139b..a0005a0e59 100644
   --- a/cpp/src/arrow/ipc/message.cc
   +++ b/cpp/src/arrow/ipc/message.cc
   @@ -607,6 +607,7 @@ class MessageDecoder::MessageDecoderImpl {
                                  MemoryPool* pool, bool skip_body)
          : listener_(std::move(listener)),
            pool_(pool),
   +        memory_manager_(pool_ == default_memory_pool() ? default_cpu_memory_manager() : CPUDevice::memory_manager(pool_)),
            state_(initial_state),
            next_required_size_(initial_next_required_size),
            chunks_(),
   @@ -823,7 +824,7 @@ class MessageDecoder::MessageDecoderImpl {
          metadata_ = buffer;
        } else {
          ARROW_ASSIGN_OR_RAISE(metadata_,
   -                            Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
   +                            Buffer::ViewOrCopy(buffer, memory_manager_));
        }
        return ConsumeMetadata();
      }
   @@ -836,14 +837,14 @@ class MessageDecoder::MessageDecoderImpl {
            } else {
              ARROW_ASSIGN_OR_RAISE(
                  metadata_,
   -              Buffer::ViewOrCopy(chunks_[0], CPUDevice::memory_manager(pool_)));
   +              Buffer::ViewOrCopy(chunks_[0], memory_manager_));
            }
            chunks_.erase(chunks_.begin());
          } else {
            metadata_ = SliceBuffer(chunks_[0], 0, next_required_size_);
            if (!chunks_[0]->is_cpu()) {
              ARROW_ASSIGN_OR_RAISE(
   -              metadata_, Buffer::ViewOrCopy(metadata_, CPUDevice::memory_manager(pool_)));
   +              metadata_, Buffer::ViewOrCopy(metadata_, memory_manager_));
            }
            chunks_[0] = SliceBuffer(chunks_[0], next_required_size_);
          }
   @@ -912,7 +913,7 @@ class MessageDecoder::MessageDecoderImpl {
          return util::SafeLoadAs<int32_t>(buffer->data());
        } else {
          ARROW_ASSIGN_OR_RAISE(auto cpu_buffer,
   -                            Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
   +                            Buffer::ViewOrCopy(buffer, memory_manager_));
          return util::SafeLoadAs<int32_t>(cpu_buffer->data());
        }
      }
   @@ -925,7 +926,7 @@ class MessageDecoder::MessageDecoderImpl {
        for (auto& chunk : chunks_) {
          if (!chunk->is_cpu()) {
            ARROW_ASSIGN_OR_RAISE(
   -            chunk, Buffer::ViewOrCopy(chunk, CPUDevice::memory_manager(pool_)));
   +            chunk, Buffer::ViewOrCopy(chunk, memory_manager_));
          }
          auto data = chunk->data();
          auto data_size = chunk->size();
   @@ -951,6 +952,7 @@ class MessageDecoder::MessageDecoderImpl {
    
      std::shared_ptr<MessageDecoderListener> listener_;
      MemoryPool* pool_;
   +  std::shared_ptr<MemoryManager> memory_manager_;
      State state_;
      int64_t next_required_size_;
      std::vector<std::shared_ptr<Buffer>> chunks_;
   ```
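   
   The idea in this diff (use the shared default CPU memory manager when the pool is the default pool, build a dedicated manager only otherwise, and keep the result in a member) can be modeled as follows. All names here (`ToyPool`, `ToyMemoryManager`, `DefaultPool()`, `DefaultMemoryManager()`, `ManagerFor()`) are hypothetical; they are not Arrow's `MemoryPool`, `MemoryManager`, `default_memory_pool()` or `default_cpu_memory_manager()`.
   
   ```cpp
   #include <memory>
   
   // Hypothetical stand-ins for a memory pool and a memory manager bound to it.
   struct ToyPool {};
   struct ToyMemoryManager {
     ToyPool* pool;
   };
   
   ToyPool* DefaultPool() {
     static ToyPool pool;
     return &pool;
   }
   
   // One shared manager for the default pool, created on first use.
   std::shared_ptr<ToyMemoryManager> DefaultMemoryManager() {
     static const std::shared_ptr<ToyMemoryManager> manager =
         std::make_shared<ToyMemoryManager>(ToyMemoryManager{DefaultPool()});
     return manager;
   }
   
   // Reuse the shared manager for the default pool; build a dedicated one for a
   // custom pool. Each call for a custom pool creates a new manager, which is
   // why the diff computes it once and stores it in `memory_manager_`.
   std::shared_ptr<ToyMemoryManager> ManagerFor(ToyPool* pool) {
     if (pool == DefaultPool()) return DefaultMemoryManager();
     return std::make_shared<ToyMemoryManager>(ToyMemoryManager{pool});
   }
   ```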





Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1421772936


##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -317,7 +317,12 @@ class ARROW_EXPORT Listener {
 /// \since 0.17.0
 class ARROW_EXPORT CollectListener : public Listener {
  public:
-  CollectListener() : schema_(), filtered_schema_(), record_batches_(), metadatas_() {}
+  CollectListener(bool copy_record_batch = false)

Review Comment:
   I'm not sure this is a good API...
   Should we create a new `CopyCollectListener` or something instead?
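   
   The alternative being asked about, a separate subclass instead of a constructor flag, might look like this in miniature. `Batch`, `ToyCollectListener` and `ToyCopyCollectListener` are hypothetical; Arrow's real `Listener`/`CollectListener` interfaces differ. The design point is that the copying behavior lives in its own type, so the existing constructor stays unchanged.
   
   ```cpp
   #include <memory>
   #include <utility>
   #include <vector>
   
   // Hypothetical "batch": a shared handle to data that may be backed by
   // memory the caller does not own.
   using Batch = std::shared_ptr<const std::vector<int>>;
   
   class ToyCollectListener {
    public:
     virtual ~ToyCollectListener() = default;
     // Default behavior: keep the handle as-is (shares storage with the input).
     virtual void OnBatchDecoded(Batch batch) { batches_.push_back(std::move(batch)); }
     const std::vector<Batch>& batches() const { return batches_; }
   
    protected:
     std::vector<Batch> batches_;
   };
   
   // Test-oriented subclass: deep-copies each batch so the collected data no
   // longer shares storage with whatever produced it.
   class ToyCopyCollectListener : public ToyCollectListener {
    public:
     void OnBatchDecoded(Batch batch) override {
       batches_.push_back(Batch(std::make_shared<std::vector<int>>(*batch)));
     }
   };
   ```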



##########
cpp/src/arrow/ipc/message.cc:
##########
@@ -872,10 +889,10 @@ class MessageDecoder::MessageDecoderImpl {
       buffered_size_ -= used_size;
       return Status::OK();
     } else {
-      ARROW_ASSIGN_OR_RAISE(auto body, AllocateBuffer(next_required_size_, pool_));
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> body,
+                            AllocateBuffer(next_required_size_, pool_));
       RETURN_NOT_OK(ConsumeDataChunks(next_required_size_, body->mutable_data()));
-      std::shared_ptr<Buffer> shared_body(body.release());
-      return ConsumeBody(&shared_body);
+      return ConsumeBody(&body);

Review Comment:
   This is just a cleanup, unrelated to this fix.
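   
   The cleanup relies on a standard C++ conversion: `std::shared_ptr<T>` can be initialized directly from an rvalue `std::unique_ptr<T>`, so the intermediate `release()` is unnecessary. A minimal sketch, where `Bytes` and `AllocateBytes()` are hypothetical stand-ins for a buffer type and an allocator that returns `std::unique_ptr` (as `AllocateBuffer()` does in the diff, wrapped in a `Result`):
   
   ```cpp
   #include <cstddef>
   #include <cstdint>
   #include <memory>
   #include <vector>
   
   using Bytes = std::vector<uint8_t>;
   
   // Hypothetical allocator that hands back exclusive ownership.
   std::unique_ptr<Bytes> AllocateBytes(size_t size) {
     return std::unique_ptr<Bytes>(new Bytes(size));
   }
   
   // Before: release() the unique_ptr and re-wrap the raw pointer.
   std::shared_ptr<Bytes> OldStyle(size_t size) {
     auto owned = AllocateBytes(size);
     std::shared_ptr<Bytes> shared(owned.release());
     return shared;
   }
   
   // After: initialize the shared_ptr straight from the unique_ptr rvalue;
   // ownership transfers without handling a raw pointer.
   std::shared_ptr<Bytes> NewStyle(size_t size) {
     std::shared_ptr<Bytes> shared = AllocateBytes(size);
     return shared;
   }
   ```
   
   Both functions produce the same result; the second simply expresses the ownership transfer in one step.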



##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -2052,6 +2052,39 @@ Status Listener::OnRecordBatchWithMetadataDecoded(
   return OnRecordBatchDecoded(std::move(record_batch_with_metadata.batch));
 }
 
+namespace {
+Status CopyArrayData(std::shared_ptr<ArrayData> data) {

Review Comment:
   Maybe we should not do this...



##########
cpp/src/arrow/ipc/message.cc:
##########
@@ -830,8 +849,7 @@ class MessageDecoder::MessageDecoderImpl {
       }
       buffered_size_ -= next_required_size_;
     } else {
-      ARROW_ASSIGN_OR_RAISE(auto metadata, AllocateBuffer(next_required_size_, pool_));
-      metadata_ = std::shared_ptr<Buffer>(metadata.release());
+      ARROW_ASSIGN_OR_RAISE(metadata_, AllocateBuffer(next_required_size_, pool_));

Review Comment:
   This is just a cleanup, unrelated to this fix.



##########
cpp/src/arrow/ipc/message.cc:
##########
@@ -846,9 +864,8 @@ class MessageDecoder::MessageDecoderImpl {
     next_required_size_ = skip_body_ ? 0 : body_length;
     RETURN_NOT_OK(listener_->OnBody());
     if (next_required_size_ == 0) {
-      ARROW_ASSIGN_OR_RAISE(auto body, AllocateBuffer(0, pool_));
-      std::shared_ptr<Buffer> shared_body(body.release());
-      return ConsumeBody(&shared_body);
+      auto body = std::make_shared<Buffer>(nullptr, 0);
+      return ConsumeBody(&body);

Review Comment:
   This is just a cleanup, unrelated to this fix.





Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #39164:
URL: https://github.com/apache/arrow/pull/39164#issuecomment-1878334908

   I've replied:
   * https://github.com/apache/arrow/pull/39164#discussion_r1442630986
   * https://github.com/apache/arrow/pull/39164#discussion_r1442633352
   
   I've reverted the `CollectListener` API change, so we don't need to think about a public API change in this PR.
   If `std::make_shared<Buffer>()`+`Buffer::Copy()` is better, please change it directly and merge this for 15.0.0. I don't have a strong opinion on the approach.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1426718017


##########
cpp/src/arrow/ipc/read_write_test.cc:
##########
@@ -1334,7 +1334,7 @@ struct StreamDecoderWriterHelper : public StreamWriterHelper {
   Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
                      ReadStats* out_stats = nullptr,
                      MetadataVector* out_metadata_list = nullptr) override {
-    auto listener = std::make_shared<CollectListener>();
+    auto listener = std::make_shared<CollectListener>(true);

Review Comment:
   Also, why do we set it to true here? Can you add a comment?



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -317,7 +317,12 @@ class ARROW_EXPORT Listener {
 /// \since 0.17.0
 class ARROW_EXPORT CollectListener : public Listener {
  public:
-  CollectListener() : schema_(), filtered_schema_(), record_batches_(), metadatas_() {}
+  explicit CollectListener(bool copy_record_batch = false)

Review Comment:
   I don't understand why this is needed. Can you elaborate?



##########
cpp/src/arrow/ipc/read_write_test.cc:
##########
@@ -1334,7 +1334,7 @@ struct StreamDecoderWriterHelper : public StreamWriterHelper {
   Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* out_batches,
                      ReadStats* out_stats = nullptr,
                      MetadataVector* out_metadata_list = nullptr) override {
-    auto listener = std::make_shared<CollectListener>();
+    auto listener = std::make_shared<CollectListener>(true);

Review Comment:
   Please mention the parameter name
   ```suggestion
       auto listener = std::make_shared<CollectListener>(/*xxx=*/ true);
   ```
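   
   A small sketch of what the two review points amount to in C++: an `explicit` single-argument constructor, and an argument comment naming the boolean at the call site. `ToyListener` and `MakeCopyingListener()` are hypothetical; `copy_record_batch` is the parameter name used in the diff.
   
   ```cpp
   #include <type_traits>
   
   // Hypothetical class, not Arrow's CollectListener.
   class ToyListener {
    public:
     // `explicit` stops a stray bool (or anything convertible to bool) from
     // silently converting into a ToyListener.
     explicit ToyListener(bool copy_record_batch = false)
         : copy_record_batch_(copy_record_batch) {}
     bool copy_record_batch() const { return copy_record_batch_; }
   
    private:
     bool copy_record_batch_;
   };
   
   static_assert(!std::is_convertible<bool, ToyListener>::value,
                 "explicit constructor blocks implicit bool conversion");
   
   // At the call site, the argument comment names the otherwise opaque literal.
   inline ToyListener MakeCopyingListener() {
     return ToyListener(/*copy_record_batch=*/true);
   }
   ```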





Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1433391890


##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -317,7 +317,12 @@ class ARROW_EXPORT Listener {
 /// \since 0.17.0
 class ARROW_EXPORT CollectListener : public Listener {
  public:
-  CollectListener() : schema_(), filtered_schema_(), record_batches_(), metadatas_() {}
+  explicit CollectListener(bool copy_record_batch = false)

Review Comment:
   @pitrou Is this explanation enough?





Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #39164:
URL: https://github.com/apache/arrow/pull/39164#issuecomment-1877706289

   Sorry, I forgot to reply to this. I'll do it today.




Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "mapleFU (via GitHub)" <gi...@apache.org>.
mapleFU commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1421896253


##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -2052,6 +2052,39 @@ Status Listener::OnRecordBatchWithMetadataDecoded(
   return OnRecordBatchDecoded(std::move(record_batch_with_metadata.batch));
 }
 
+namespace {
+Status CopyArrayData(std::shared_ptr<ArrayData> data) {

Review Comment:
   Would `ArrayData*` be better in this place?
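   
   The cost in question is reference counting: passing `std::shared_ptr` by value copies it, which increments the reference count on entry and decrements it on exit (atomically, in typical implementations), while a `const&` or a raw pointer does neither. A minimal sketch, where `Node` is a hypothetical stand-in for `ArrayData`:
   
   ```cpp
   #include <memory>
   
   struct Node {
     int value = 0;
   };  // hypothetical stand-in for ArrayData
   
   // By value: the parameter is a second owner while the call runs.
   long UseCountSeenByValue(std::shared_ptr<Node> node) { return node.use_count(); }
   
   // By const reference: no copy, so the count is unchanged.
   long UseCountSeenByConstRef(const std::shared_ptr<Node>& node) {
     return node.use_count();
   }
   
   // Raw pointer: no ownership involved at all; fine when the callee only needs
   // access for the duration of the call.
   void MutateViaRawPointer(Node* node) { node->value += 1; }
   ```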





Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1434058036


##########
cpp/src/arrow/ipc/message.cc:
##########
@@ -626,10 +626,24 @@ class MessageDecoder::MessageDecoderImpl {
             RETURN_NOT_OK(ConsumeMetadataLengthData(data, next_required_size_));
             break;
           case State::METADATA: {
-            auto buffer = std::make_shared<Buffer>(data, next_required_size_);
+            // We need to copy metadata because it's used in
+            // ConsumeBody(). ConsumeBody() may be called from another
+            // ConsumeData(). We can't assume that the given data for
+            // the current ConsumeData() call is still valid in the
+            // next ConsumeData() call. So we need to copy metadata
+            // here.
+            ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
+                                  AllocateBuffer(next_required_size_, pool_));
+            memcpy(buffer->mutable_data(), data, next_required_size_);

Review Comment:
   Would it work if `ConsumeMetadataBuffer` took a memory manager parameter?
   
   (Keep in mind that I'm not familiar with this code and don't understand how it's used)





Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1434140129


##########
cpp/src/arrow/ipc/message.cc:
##########
@@ -626,10 +626,24 @@ class MessageDecoder::MessageDecoderImpl {
             RETURN_NOT_OK(ConsumeMetadataLengthData(data, next_required_size_));
             break;
           case State::METADATA: {
-            auto buffer = std::make_shared<Buffer>(data, next_required_size_);
+            // We need to copy metadata because it's used in
+            // ConsumeBody(). ConsumeBody() may be called from another
+            // ConsumeData(). We can't assume that the given data for
+            // the current ConsumeData() call is still valid in the
+            // next ConsumeData() call. So we need to copy metadata
+            // here.
+            ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
+                                  AllocateBuffer(next_required_size_, pool_));
+            memcpy(buffer->mutable_data(), data, next_required_size_);

Review Comment:
   > If we use the change, all buffers are copied because we're using `CPUDevice::memory_manager(pool_)` not `arrow::default_cpu_memory_manager()`.
   
   This is not supposed to trigger a copy.





Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #39164:
URL: https://github.com/apache/arrow/pull/39164#issuecomment-1879587318

   I'll merge this for 15.0.0.
   If there is a problem in this change, I'll work on it in a follow-up PR.




Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "felipecrv (via GitHub)" <gi...@apache.org>.
felipecrv commented on PR #39164:
URL: https://github.com/apache/arrow/pull/39164#issuecomment-1877592906

   > @kou It would be very nice to have this merged in time for the 15.0.0 release if possible (code freeze planned 8 January)
   
   To be clear: I don't oppose the merge at all. I just don't understand what's going on in this class to judge the solution. cc @ianmcook @pitrou 




Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1442630986


##########
cpp/src/arrow/ipc/message.cc:
##########
@@ -626,10 +626,24 @@ class MessageDecoder::MessageDecoderImpl {
             RETURN_NOT_OK(ConsumeMetadataLengthData(data, next_required_size_));
             break;
           case State::METADATA: {
-            auto buffer = std::make_shared<Buffer>(data, next_required_size_);
+            // We need to copy metadata because it's used in
+            // ConsumeBody(). ConsumeBody() may be called from another
+            // ConsumeData(). We can't assume that the given data for
+            // the current ConsumeData() call is still valid in the
+            // next ConsumeData() call. So we need to copy metadata
+            // here.
+            ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
+                                  AllocateBuffer(next_required_size_, pool_));
+            memcpy(buffer->mutable_data(), data, next_required_size_);

Review Comment:
   Sorry, I was wrong.
   But we can't use `Buffer::ViewOrCopy()` here: it doesn't copy in this case, it returns a view. So we need to copy the given data explicitly, either with `AllocateBuffer()`+`memcpy()` or with `std::make_shared<Buffer>()`+`Buffer::Copy()`. I think the former is better because it creates only one `Buffer`, while the latter creates two.
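   To make the lifetime problem concrete, here is a minimal sketch in plain C++17 rather than Arrow. `ToyDecoder`, `kMetadataSize`, and `kBodySize` are hypothetical names for illustration only, not Arrow APIs. The sketch assumes a toy framing of fixed-size metadata followed by a fixed-size body, either of which may be split across calls, and, like `Consume(const uint8_t* data, int64_t size)`, it only relies on `data` being valid during a single call. The metadata and any partial body are therefore copied into storage the decoder owns; the fix in the diff does the equivalent with `AllocateBuffer()` + `memcpy()`.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical toy decoder for illustration only; it is NOT Arrow's
// MessageDecoder. Each message is kMetadataSize metadata bytes followed by
// kBodySize body bytes, and a message may be split across Consume() calls.
// The caller only guarantees that `data` is valid during one Consume() call.
class ToyDecoder {
 public:
  static constexpr int64_t kMetadataSize = 4;
  static constexpr int64_t kBodySize = 4;

  void Consume(const uint8_t* data, int64_t size) {
    while (size > 0) {
      if (metadata_.size() < static_cast<std::size_t>(kMetadataSize)) {
        int64_t n = std::min<int64_t>(
            size, kMetadataSize - static_cast<int64_t>(metadata_.size()));
        // Copy the metadata into owned storage: it is still needed when the
        // body arrives, possibly in a later call after `data` is gone.
        metadata_.insert(metadata_.end(), data, data + n);
        data += n;
        size -= n;
      } else {
        int64_t n = std::min<int64_t>(
            size, kBodySize - static_cast<int64_t>(body_.size()));
        // A partially received body must be copied for the same reason.
        body_.insert(body_.end(), data, data + n);
        data += n;
        size -= n;
        if (body_.size() == static_cast<std::size_t>(kBodySize)) {
          // Message complete: emit metadata + body and reset for the next one.
          messages_.emplace_back(metadata_.begin(), metadata_.end());
          messages_.back().append(body_.begin(), body_.end());
          metadata_.clear();
          body_.clear();
        }
      }
    }
  }

  const std::vector<std::string>& messages() const { return messages_; }

 private:
  std::vector<uint8_t> metadata_;  // owned copy, survives across calls
  std::vector<uint8_t> body_;      // owned copy of a partial body
  std::vector<std::string> messages_;
};
```

   In this sketch a caller can consume the metadata, overwrite its own buffer with the body bytes, and call `Consume()` again, and the decoder still produces `METABODY`. If `metadata_` were only a pointer into the caller's buffer (the equivalent of the old `std::make_shared<Buffer>(data, next_required_size_)` view), it would see the overwritten bytes instead.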



Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1427550312


##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -317,7 +317,12 @@ class ARROW_EXPORT Listener {
 /// \since 0.17.0
 class ARROW_EXPORT CollectListener : public Listener {
  public:
-  CollectListener() : schema_(), filtered_schema_(), record_batches_(), metadatas_() {}
+  explicit CollectListener(bool copy_record_batch = false)

Review Comment:
   Oh, sorry. My comment was outdated: https://github.com/apache/arrow/pull/39164#discussion_r1421772936
   
   But it didn't explain enough...
   
   `StreamDecoder::Consume(const uint8_t* data, int64_t size)` (not `Consume(Buffer* buffer)`) assumes that the given `data` is alive only while the `Consume()` call is in progress. (The `data` may be freed after the `Consume()` call returns.)
   So decoded record batches that are passed to `Listener` are valid only during the `Consume()` call. But `CollectListener` keeps decoded record batches after `Consume()` has finished. If we want decoded record batches to remain valid after the `Consume()` call, we need to copy them.
   
   This only happens with `Consume(const uint8_t* data, int64_t size)`. It doesn't happen with `Consume(Buffer* buffer)` because that overload refers to the given `buffer`. So I don't want to copy decoded record batches unconditionally. This is why I added the `copy_record_batch` option here.
   
   I hope this explains why...
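   The `copy_record_batch` trade-off can be sketched the same way. This is a minimal, hypothetical illustration in plain C++, not Arrow's `CollectListener`: `BatchView` stands in for a decoded record batch that points into memory owned by someone else, and `ToyCollectListener` only mirrors the constructor flag from the diff. With the flag set, each batch is deep-copied so it stays valid after the caller's memory is reused; without it, the listener keeps a zero-copy view, which is only safe when the caller keeps that memory alive, as with the `Buffer`-based `Consume()`.

```cpp
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for a decoded record batch: a non-owning view into
// bytes owned by someone else (e.g. the caller of Consume()).
struct BatchView {
  const uint8_t* data;
  int64_t size;
};

// Hypothetical toy listener for illustration only; it is NOT Arrow's
// CollectListener, but its constructor flag mirrors `copy_record_batch`.
class ToyCollectListener {
 public:
  explicit ToyCollectListener(bool copy_record_batch = false)
      : copy_record_batch_(copy_record_batch) {}

  // `batch` may only be valid for the duration of this call.
  void OnRecordBatchDecoded(BatchView batch) {
    Stored stored;
    if (copy_record_batch_) {
      // Deep copy: the stored batch outlives the caller's memory.
      stored.owned = std::make_shared<std::vector<uint8_t>>(
          batch.data, batch.data + batch.size);
      stored.view = {stored.owned->data(), batch.size};
    } else {
      // Zero copy: only safe if the caller keeps the memory alive.
      stored.view = batch;
    }
    batches_.push_back(std::move(stored));
  }

  std::size_t num_batches() const { return batches_.size(); }
  const BatchView& batch(std::size_t i) const { return batches_[i].view; }

 private:
  struct Stored {
    std::shared_ptr<std::vector<uint8_t>> owned;  // null when not copied
    BatchView view;
  };
  bool copy_record_batch_;
  std::vector<Stored> batches_;
};
```

   Defaulting the flag to `false` keeps the zero-copy behavior for callers that don't need the copy, so only callers that feed ephemeral `data` and keep batches afterwards pay for it.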



Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #39164:
URL: https://github.com/apache/arrow/pull/39164#issuecomment-1854938008

   @pitrou @felipecrv Do you have any opinions on this approach?


Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1429759390


##########
cpp/src/arrow/ipc/message.cc:
##########
@@ -626,10 +626,24 @@ class MessageDecoder::MessageDecoderImpl {
             RETURN_NOT_OK(ConsumeMetadataLengthData(data, next_required_size_));
             break;
           case State::METADATA: {
-            auto buffer = std::make_shared<Buffer>(data, next_required_size_);
+            // We need to copy metadata because it's used in
+            // ConsumeBody(). ConsumeBody() may be called from another
+            // ConsumeData(). We can't assume that the given data for
+            // the current ConsumeData() call is still valid in the
+            // next ConsumeData() call. So we need to copy metadata
+            // here.
+            ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
+                                  AllocateBuffer(next_required_size_, pool_));
+            memcpy(buffer->mutable_data(), data, next_required_size_);

Review Comment:
   See #39270 for the memory manager.



Re: [PR] GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) [arrow]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #39164:
URL: https://github.com/apache/arrow/pull/39164#issuecomment-1858842600

   :warning: GitHub issue #39163 **has been automatically assigned in GitHub** to PR creator.

