You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/12/07 15:20:32 UTC

[GitHub] [pulsar] BewareMyPower edited a comment on issue #8846: [Pulsar Client Cpp] memory leak when receive async

BewareMyPower edited a comment on issue #8846:
URL: https://github.com/apache/pulsar/issues/8846#issuecomment-739970491


   Firstly it's true, not like the producer's send operation, consumer's receive operation has no timeout. So calling `receiveAsync` infinitely without any limit while no messages could be received will make pending receives in memory growing infinitely.
   
   However, you can add the limit for the number of pending receives from a client side:
   
   ```c++
   #include <pulsar/Client.h>
   #include <mutex>
   #include <condition_variable>
   
   using namespace pulsar;
   
   struct PendingReceiveTracker {
       std::mutex mutex_;
       std::condition_variable cond_;
   
       size_t numPendingReceives_ = 0;
       const size_t maxNumPendingReceives_;
   
       PendingReceiveTracker(size_t maxNumPendingReceives) : maxNumPendingReceives_(maxNumPendingReceives) {}
   
       void beforeReceiveAsync() {
           std::unique_lock<std::mutex> lock(mutex_);
           while (numPendingReceives_ >= maxNumPendingReceives_) {
               // TODO: you can add other handler instead of wait here
               cond_.wait(lock);
           }
           numPendingReceives_++;
       }
   
       void completeReceiveAsync() {
           std::unique_lock<std::mutex> lock(mutex_);
           if (numPendingReceives_ == 0) {
               return;
           }
           numPendingReceives_--;
           cond_.notify_all();
       }
   };
   
   int main() {
       Client client("pulsar://localhost:6650");
       Consumer consumer;
       client.subscribe("my-topic", "consumer-1", consumer);  // ignore the returned value check here
   
       PendingReceiveTracker tracker(1000);
       for (int i = 0; i < 100000; i++) {
           // When the pending receives achieve 1000, this method blocks here until any message is received.
           // You can also change the strategy, see the TODO in beforeReceiveAsync()
           tracker.beforeReceiveAsync();
           consumer.receiveAsync([&tracker](Result result, const Message& msg) {
               tracker.completeReceiveAsync();
               if (result == ResultOk) {
                   // TODO: process `msg`
               } else {
                   // TODO: handle error
               }
           });
           // do other things here...
       }
   
       client.close();
   }
   ```
   
   As for a timeout limit, it's a little more complicated that a timer is needed.
   
   Let's go back to the timeout case. Send timeout means some specific messages' associated responses are not received during the timeout. Users can choose to reproduce **the same messages** again if they don't care message duplication.
   
   However, unlike the send timeout, receive timeout is not related to any message. So when you encounter the timeout with N pending receives, what will you do? Clear these pending receives? OK, then you'll add more pending receives again.  Clear and then re-add the pending receives is not much different with make these receives continue pending. The only behavior that make much difference is stop receiving, like triggering an alarm.
   
   A simpler way is just calling `receive()` with a timeout. IMO, `receiveAsync()` doesn't has any more significant benefit than `receive(Message& msg, int timeout)` except waiting for multiple messages at the same time. C++ client doesn't provide something like Java's `CompletableFuture`, so you need to play tricks with the callback.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org