Posted to github@arrow.apache.org by "kou (via GitHub)" <gi...@apache.org> on 2023/10/01 21:46:14 UTC

[GitHub] [arrow] kou opened a new pull request, #37970: GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset()

kou opened a new pull request, #37970:
URL: https://github.com/apache/arrow/pull/37970

   ### Rationale for this change
   
   We can reuse the same StreamDecoder to read multiple streams with this.
   
   ### What changes are included in this PR?
   
   Add StreamDecoder::Reset().
   
   ### Are these changes tested?
   
   Yes.
   
   ### Are there any user-facing changes?
   
   Yes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset() [arrow]

Posted by "MrLolthe1st (via GitHub)" <gi...@apache.org>.
MrLolthe1st commented on PR #37970:
URL: https://github.com/apache/arrow/pull/37970#issuecomment-1755192774

   Looks right!




Re: [PR] GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset() [arrow]

Posted by "MrLolthe1st (via GitHub)" <gi...@apache.org>.
MrLolthe1st commented on PR #37970:
URL: https://github.com/apache/arrow/pull/37970#issuecomment-1750654635

   You can check out our temporary workaround here:
   stream_decoder.cc https://github.com/ydb-platform/ydb/commit/f288403e6cb7cc62bb16f2c707296f096a62df29#diff-89894a6bbc78b5f2aacc62338e7cbfbd6a7a841dd14bb44eed25fd5dd151dc3a
   stream_decoder.h https://github.com/ydb-platform/ydb/commit/f288403e6cb7cc62bb16f2c707296f096a62df29#diff-a442528f4921f6781d036840f10ce3faf49990cdbba0a88bffb7c8832592ec39
   There are some changes around MessageReader too. Also, dictionary_memo_ is now a unique pointer: `dictionary_memo_ = std::make_unique<DictionaryMemo>();`
   




Re: [PR] GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset() [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #37970:
URL: https://github.com/apache/arrow/pull/37970#issuecomment-1790151066

   No objection.
   I'll merge this.




Re: [PR] GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset() [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #37970:
URL: https://github.com/apache/arrow/pull/37970#discussion_r1344878886


##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -425,6 +425,37 @@ class ARROW_EXPORT StreamDecoder {
   /// \return Status
   Status Consume(std::shared_ptr<Buffer> buffer);
 
+  /// \brief Reset the internal status.
+  ///
+  /// You can reuse this decoder for new stream after calling
+  /// this. For example, you can implement endless decoder with this:

Review Comment:
   OK. I'll remove this example.
   
   See #37429 for a use case of the endless decoder. It reads multiple streams with one decoder instance.
   
   FYI: We can implement the use case by using `next_required_size()` instead, but then we need to recreate decoder instances manually:
   
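   ```cpp
   while (true) {
     // get_data(), listener->status(), EOS and create_decoder() are
     // placeholders for application code, not Arrow APIs.
     auto buffer = get_data(decoder->next_required_size());
     if (!buffer) {
       break;  // no more input
     }
     ARROW_RETURN_NOT_OK(decoder->Consume(buffer));
     if (listener->status() == EOS) {
       // A decoder that reached EOS ignores further data, so start a new one.
       decoder = create_decoder(listener);
     }
   }
   ```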
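   For comparison, here is a sketch of the same kind of loop written against the `Reset()` method this PR adds, so one decoder instance is reused instead of recreated. It assumes an Arrow C++ release that includes `StreamDecoder::Reset()`; `CountingListener`, `MakeStream()` and `DecodeAll()` are illustrative names, not Arrow APIs. Feeding exactly `next_required_size()` bytes per `Consume()` call means EOS is observed between calls, so `Reset()` runs after `Consume()` returns rather than from inside the listener callback.

   ```cpp
   #include <arrow/api.h>
   #include <arrow/io/memory.h>
   #include <arrow/ipc/api.h>

   #include <algorithm>
   #include <cassert>
   #include <cstdint>
   #include <memory>
   #include <vector>

   // Records what the decoder reports. OnEOS() only sets a flag; it does not
   // reset the decoder, because Consume() is still on the call stack there.
   class CountingListener : public arrow::ipc::Listener {
    public:
     arrow::Status OnRecordBatchDecoded(
         std::shared_ptr<arrow::RecordBatch> batch) override {
       rows += batch->num_rows();
       return arrow::Status::OK();
     }
     arrow::Status OnEOS() override {
       ++eos_count;
       saw_eos = true;
       return arrow::Status::OK();
     }
     int64_t rows = 0;
     int eos_count = 0;
     bool saw_eos = false;
   };

   // Illustrative helper: serializes one Int32 batch as a complete IPC stream
   // (schema message, record batch, and the end-of-stream marker from Close()).
   arrow::Result<std::shared_ptr<arrow::Buffer>> MakeStream(
       const std::vector<int32_t>& values) {
     arrow::Int32Builder builder;
     ARROW_RETURN_NOT_OK(builder.AppendValues(values));
     ARROW_ASSIGN_OR_RAISE(auto array, builder.Finish());
     auto schema = arrow::schema({arrow::field("x", arrow::int32())});
     auto batch = arrow::RecordBatch::Make(schema, array->length(), {array});
     ARROW_ASSIGN_OR_RAISE(auto sink, arrow::io::BufferOutputStream::Create());
     ARROW_ASSIGN_OR_RAISE(auto writer, arrow::ipc::MakeStreamWriter(sink, schema));
     ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
     ARROW_RETURN_NOT_OK(writer->Close());
     return sink->Finish();
   }

   // Decodes back-to-back streams with a single decoder, calling Reset()
   // after each EOS once the Consume() call that hit it has returned.
   arrow::Status DecodeAll(const std::shared_ptr<arrow::Buffer>& data,
                           const std::shared_ptr<CountingListener>& listener) {
     arrow::ipc::StreamDecoder decoder(listener);
     int64_t offset = 0;
     while (offset < data->size()) {
       const int64_t n =
           std::min(decoder.next_required_size(), data->size() - offset);
       if (n <= 0) {
         return arrow::Status::Invalid("decoder made no progress");
       }
       ARROW_RETURN_NOT_OK(decoder.Consume(data->data() + offset, n));
       offset += n;
       if (listener->saw_eos) {
         listener->saw_eos = false;
         ARROW_RETURN_NOT_OK(decoder.Reset());  // ready for the next stream
       }
     }
     return arrow::Status::OK();
   }
   ```

   With two streams of 3 and 2 rows concatenated (for example via `arrow::ConcatenateBuffers`), the listener should see 5 rows and 2 EOS events from one decoder instance.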



##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -2032,6 +2036,11 @@ Status StreamDecoder::Consume(const uint8_t* data, int64_t size) {
 Status StreamDecoder::Consume(std::shared_ptr<Buffer> buffer) {
   return impl_->Consume(std::move(buffer));
 }
+Status StreamDecoder::Reset() {
+  impl_ =
+      std::make_unique<StreamDecoderImpl>(std::move(impl_->listener()), impl_->options());

Review Comment:
   Oh, sorry. I'll remove it.





Re: [PR] GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset() [arrow]

Posted by "MrLolthe1st (via GitHub)" <gi...@apache.org>.
MrLolthe1st commented on PR #37970:
URL: https://github.com/apache/arrow/pull/37970#issuecomment-1750594417

   See https://github.com/apache/arrow/issues/37429#issuecomment-1750578183




Re: [PR] GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset() [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #37970:
URL: https://github.com/apache/arrow/pull/37970#issuecomment-1782137506

   I'll merge this in a few days if nobody objects to it.




Re: [PR] GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset() [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou merged PR #37970:
URL: https://github.com/apache/arrow/pull/37970




Re: [PR] GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset() [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #37970:
URL: https://github.com/apache/arrow/pull/37970#issuecomment-1751822564

   Why do we need to care about `MessageReader`?
   This implementation re-creates `StreamDecoderImpl` instead of reusing existing `StreamDecoderImpl` (including `MessageReader`).




Re: [PR] GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset() [arrow]

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #37970:
URL: https://github.com/apache/arrow/pull/37970#issuecomment-1754367712

   Ah, I understand.
   I've fixed this case by splitting the given data. If `StreamDecoderImpl::Consume()` finishes for each state, the problem doesn't happen.




Re: [PR] GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset() [arrow]

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #37970:
URL: https://github.com/apache/arrow/pull/37970#discussion_r1344254047


##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -425,6 +425,37 @@ class ARROW_EXPORT StreamDecoder {
   /// \return Status
   Status Consume(std::shared_ptr<Buffer> buffer);
 
+  /// \brief Reset the internal status.
+  ///
+  /// You can reuse this decoder for new stream after calling
+  /// this. For example, you can implement endless decoder with this:

Review Comment:
   Honestly, I don't think the endless decoder is very useful (why would I want an endless decoder?). Perhaps we can simply remove it?



##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -2032,6 +2036,11 @@ Status StreamDecoder::Consume(const uint8_t* data, int64_t size) {
 Status StreamDecoder::Consume(std::shared_ptr<Buffer> buffer) {
   return impl_->Consume(std::move(buffer));
 }
+Status StreamDecoder::Reset() {
+  impl_ =
+      std::make_unique<StreamDecoderImpl>(std::move(impl_->listener()), impl_->options());

Review Comment:
   Nit: `std::move` on an rvalue is probably a no-op.
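   A minimal sketch of why the `std::move` adds nothing here. The diff does not show how `StreamDecoderImpl::listener()` is declared, so both plausible forms are modeled with hypothetical types (`ImplReturningConstRef`, `ImplReturningValue`, `UseCountInside()` are illustrative only). If the accessor returns a `const std::shared_ptr<Listener>&`, `std::move` yields a `const` rvalue, which still binds to the copy constructor; if it returns by value, the call expression is already an rvalue.

   ```cpp
   #include <cassert>
   #include <memory>
   #include <utility>

   struct Listener {};

   // Hypothetical impl whose accessor returns a const reference.
   struct ImplReturningConstRef {
     std::shared_ptr<Listener> listener_ = std::make_shared<Listener>();
     const std::shared_ptr<Listener>& listener() const { return listener_; }
   };

   // Hypothetical impl whose accessor returns by value.
   struct ImplReturningValue {
     std::shared_ptr<Listener> listener_ = std::make_shared<Listener>();
     std::shared_ptr<Listener> listener() const { return listener_; }
   };

   // Takes the pointer by value, like a constructor parameter would, and
   // reports how many owners exist while the parameter is alive.
   long UseCountInside(std::shared_ptr<Listener> p) { return p.use_count(); }
   ```

   In the const-reference case the source keeps its pointer (a copy is made despite `std::move`); in the by-value case the temporary is moved either way. Dropping `std::move` therefore loses nothing.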





[GitHub] [arrow] github-actions[bot] commented on pull request #37970: GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset()

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #37970:
URL: https://github.com/apache/arrow/pull/37970#issuecomment-1742212189

   :warning: GitHub issue #37429 **has been automatically assigned in GitHub** to PR creator.




Re: [PR] GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset() [arrow]

Posted by "conbench-apache-arrow[bot] (via GitHub)" <gi...@apache.org>.
conbench-apache-arrow[bot] commented on PR #37970:
URL: https://github.com/apache/arrow/pull/37970#issuecomment-1790875135

   After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit cead3dd3f024a16f5b23dd6a82069a3190967ec4.
   
   There were no benchmark performance regressions. 🎉
   
   The [full Conbench report](https://github.com/apache/arrow/runs/18304554235) has more details. It also includes information about 32 possible false positives for unstable benchmarks that are known to sometimes produce them.




Re: [PR] GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset() [arrow]

Posted by "MrLolthe1st (via GitHub)" <gi...@apache.org>.
MrLolthe1st commented on PR #37970:
URL: https://github.com/apache/arrow/pull/37970#issuecomment-1752149766

   > Why do we need to care about `MessageReader`? This implementation re-creates `StreamDecoderImpl` instead of reusing existing `StreamDecoderImpl` (including `MessageReader`).
   
   Because we can get EOS in the middle of a payload.
   The call stack when we get EOS looks like this:
   Listener::OnEOS(..)
   StreamDecoderImpl::OnEOS()
   MessageDecoderImpl::OnEOS() <- this is the old message decoder, which is already in the EOS state. After the method above exits, we return here.
   ...
   MessageDecoderImpl::Consume(...)
   MessageDecoder::Consume(...)
   StreamDecoderImpl::Consume(...)
   
   So, if we recreate StreamDecoderImpl in Listener::OnEOS(), then after Listener::OnEOS() exits we will:
   1. Return to the old StreamDecoderImpl, which no longer exists.
   2. Then return to the old MessageDecoderImpl, which no longer exists either. And even if we somehow kept the old message decoder alive, it is in the EOS state and will ignore all messages left in the payload.
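   The hazard above can be modeled with a small, self-contained sketch. This is a hypothetical toy, not Arrow's API: `ToyDecoder`, `ToyListener` and `DecodeAll()` are invented names, a "stream" is a run of positive integers ended by `0` (standing in for the end-of-stream marker), and having `Consume()` stop at EOS and return how many values it used is an assumption for illustration, not necessarily what the merged PR does. The listener only records EOS; the driver calls `Reset()` after `Consume()` returns, so no member function of a destroyed `Impl` is still on the stack, and values after an EOS go to a fresh decoder instead of being ignored.

   ```cpp
   #include <cassert>
   #include <cstddef>
   #include <memory>
   #include <vector>

   // Records callbacks. OnEOS() only sets a flag: resetting the decoder here
   // would destroy the Impl whose Consume() is still running.
   struct ToyListener {
     std::vector<int> records;
     int eos_count = 0;
     bool saw_eos = false;
     void OnRecord(int v) { records.push_back(v); }
     void OnEOS() { ++eos_count; saw_eos = true; }
   };

   class ToyDecoder {
    public:
     explicit ToyDecoder(ToyListener* listener)
         : impl_(std::make_unique<Impl>(listener)) {}

     // Consumes from data[offset] until the input ends or the current stream
     // hits EOS; returns the number of values consumed (0 once in EOS state).
     std::size_t Consume(const std::vector<int>& data, std::size_t offset) {
       return impl_->Consume(data, offset);
     }

     // Drops all per-stream state by building a fresh Impl that keeps the
     // same listener. Only safe when no Impl member function is running.
     void Reset() { impl_ = std::make_unique<Impl>(impl_->listener); }

    private:
     struct Impl {
       explicit Impl(ToyListener* l) : listener(l) {}
       std::size_t Consume(const std::vector<int>& data, std::size_t offset) {
         std::size_t i = offset;
         while (i < data.size() && !eos) {
           const int v = data[i++];
           if (v == 0) {
             eos = true;
             listener->OnEOS();
           } else {
             listener->OnRecord(v);
           }
         }
         return i - offset;
       }
       ToyListener* listener;
       bool eos = false;
     };
     std::unique_ptr<Impl> impl_;
   };

   // Driver: resets between Consume() calls, never from inside a callback.
   void DecodeAll(const std::vector<int>& data, ToyDecoder& decoder,
                  ToyListener& listener) {
     std::size_t offset = 0;
     while (offset < data.size()) {
       const std::size_t consumed = decoder.Consume(data, offset);
       assert(consumed > 0 && "decoder stuck in EOS state");
       offset += consumed;
       if (listener.saw_eos) {
         listener.saw_eos = false;
         decoder.Reset();
       }
     }
   }
   ```

   Without the `Reset()`, a second `Consume()` call on the same decoder returns 0 and the remaining streams are silently dropped, which mirrors point 2 above.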

