Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/08/12 16:42:15 UTC

[GitHub] [pulsar] BewareMyPower edited a comment on issue #7797: Memory Leak in C++Pulsar Reader Interface

BewareMyPower edited a comment on issue #7797:
URL: https://github.com/apache/pulsar/issues/7797#issuecomment-672985677


   #7793 fixes the reader memory leak, which occurs because `ReaderImpl`'s reference count is still 2 just before the `Reader` is destructed. That PR mentions the problem, but only part of it.
   
   Here I'll explain the real cause of the memory leak and why the bugfix works. If you don't care about the cause, skip to the last section: **Safe createReader implementation**.
   
   ## Some implementation knowledge
   
   First, `Reader` holds a shared pointer to `ReaderImpl`:
   ```c++
   class Reader {
       std::shared_ptr<ReaderImpl> impl_;
   };
   ```
   
   Then, the synchronous `createReader` method passes a `WaitForCallbackValue` object to the asynchronous `createReaderAsync`:
   
   ```c++
   Result Client::createReader(/* ... */, Reader& reader) {
       Promise<Result, Reader> promise;
       createReaderAsync(/* ... */, WaitForCallbackValue<Reader>(promise));
       Future<Result, Reader> future = promise.getFuture();
   
       return future.get(reader);
   }
   ```
   
   Meanwhile, a `Promise<Result, Reader>` holds an `InternalState<Result, Reader>`, which holds a `Reader`:
   
   ```c++
   template <typename Result, typename Type>
   struct InternalState {
       /* Other fields... */
       Type value;
   };
   
   template <typename Result, typename Type>
   class Promise {
       std::shared_ptr<InternalState<Result, Type> > state_;
   };
   ```
   
   So in `createReader`, `promise` holds a shared pointer to an `InternalState<Result, Reader>`, whose `Reader` field in turn holds a shared pointer to a `ReaderImpl`. **[1]**
   
   ## Why the memory leak occurs
   
   At first glance `createReader` should work fine, because `createProducer` and `subscribe` use similar code to wrap their asynchronous `xxxAsync` methods.
   
   But `ReaderImpl` is different, because it has this field:
   
   ```c++
   ReaderCallback readerCreatedCallback_;
   ```
   
   `ReaderCallback` is a typedef of `std::function<void(Result, Reader)>`, and in `createReader` it is constructed from a `WaitForCallbackValue<Reader>` object. Therefore, a `ReaderImpl` holds a `WaitForCallbackValue<Reader>`, which holds a `Promise<Result, Reader>`. **[2]**
   
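   Combining **[1]** and **[2]** gives a reference cycle: the `ReaderImpl` holds the callback, the callback holds a copy of the `Promise`, that copy shares the `InternalState`, and the `InternalState` holds a `Reader` that points back to the same `ReaderImpl`. The `Reader` inside the `InternalState` is not released until the callback's `Promise` is released, and that `Promise` is owned by the `ReaderImpl` itself. So the `ReaderImpl`'s reference count never reaches 0, even after `createReader` has returned and the local `promise` has been destructed. The promise must already have completed by the time `createReader` returns, so the `ReaderImpl` is just holding a useless `Promise`.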
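
   The cycle can be reproduced with standard-library types alone. The following is a minimal sketch, not Pulsar code: every type ending in `Toy` is a hypothetical stand-in (`ReaderToy` for `Reader`, `PromiseToy` for `Promise`, and so on), and the helper `cycleKeepsImplAlive` is made up for illustration. It builds the same ownership chain, drops the local owners, and reports whether the impl object is still alive.

   ```c++
   #include <cassert>
   #include <functional>
   #include <memory>

   // Hypothetical stand-ins for the Pulsar types discussed above.
   struct ReaderImplToy;

   struct ReaderToy {
       std::shared_ptr<ReaderImplToy> impl;  // plays the role of Reader::impl_
   };

   struct InternalStateToy {
       ReaderToy value;  // plays the role of InternalState::value
   };

   struct PromiseToy {
       std::shared_ptr<InternalStateToy> state = std::make_shared<InternalStateToy>();
       void setValue(const ReaderToy& reader) const { state->value = reader; }
   };

   struct ReaderImplToy {
       std::function<void(ReaderToy)> createdCallback;  // plays the role of readerCreatedCallback_
   };

   // Returns true if the impl object survives after all local owners are gone.
   bool cycleKeepsImplAlive() {
       std::weak_ptr<ReaderImplToy> observer;
       {
           PromiseToy promise;
           auto impl = std::make_shared<ReaderImplToy>();
           observer = impl;
           // The callback stores its own copy of the promise (shared state).
           impl->createdCallback = [promise](ReaderToy reader) { promise.setValue(reader); };
           // Completing the promise stores a ReaderToy that points back at impl.
           impl->createdCallback(ReaderToy{impl});
       }  // impl and promise go out of scope here
       const bool stillAlive = !observer.expired();
       // Break the cycle by hand so that this sketch itself does not leak.
       if (auto impl = observer.lock()) {
           impl->createdCallback = nullptr;
       }
       assert(observer.expired());
       return stillAlive;
   }
   ```

   In this sketch `cycleKeepsImplAlive()` returns `true`: the impl object outlives every local owner, and it is only freed once the stored callback is cleared.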
   
   ## Why the bugfix works
   
   Back to the bugfix: why does it work? It changes `Promise` to `Promise&` in `WaitForCallbackValue`, so there is only one `Promise` instance. No copy of the `Promise` is made, and the `InternalState` has only **one** `Promise` owner. When `promise` is destructed as `createReader` returns, the `InternalState`'s reference count drops to 0 and it is released.
   
   Note: `Future` also shares the `InternalState`. It should be changed to hold a weak pointer, but that doesn't matter here.
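
   The effect of `Promise` versus `Promise&` on ownership can be checked with `use_count()`. This is a minimal sketch under the assumption that `WaitForCallbackValue` simply stores the promise it is constructed with; `StateToy`, `PromiseToy`, `CallbackByValueToy` and `CallbackByRefToy` are hypothetical stand-ins, not the real Pulsar classes.

   ```c++
   #include <cassert>
   #include <functional>
   #include <memory>

   struct StateToy {
       int value = 0;
   };

   struct PromiseToy {
       std::shared_ptr<StateToy> state = std::make_shared<StateToy>();
   };

   // Stores its own copy of the promise, like the version before the bugfix.
   struct CallbackByValueToy {
       PromiseToy promise;
       void operator()(int v) { promise.state->value = v; }
   };

   // Refers to the caller's promise, like the version after the bugfix.
   struct CallbackByRefToy {
       PromiseToy& promise;
       void operator()(int v) { promise.state->value = v; }
   };

   // Number of owners of the shared state while a by-value callback exists.
   long ownersWithCopy() {
       PromiseToy promise;
       std::function<void(int)> cb = CallbackByValueToy{promise};
       cb(7);
       assert(promise.state->value == 7);  // the copy writes to the same state
       return promise.state.use_count();
   }

   // Number of owners of the shared state while a by-reference callback exists.
   long ownersWithReference() {
       PromiseToy promise;
       std::function<void(int)> cb = CallbackByRefToy{promise};
       cb(7);
       assert(promise.state->value == 7);
       return promise.state.use_count();
   }
   ```

   Here `ownersWithCopy()` returns 2 and `ownersWithReference()` returns 1. The reference version is only safe while the referenced promise is alive: invoking such a callback after the promise has been destructed would be undefined behavior.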
   
   ## Alternative solution
   
   Changing `Promise&` in `WaitForCallbackValue` back to `Promise` gives this valgrind result:
   
   ```
   ==14509==    definitely lost: 160 bytes in 1 blocks
   ==14509==    indirectly lost: 23,054 bytes in 30 blocks
   ==14509==      possibly lost: 1,338 bytes in 9 blocks
   ==14509==    still reachable: 152,755 bytes in 197 blocks
   ==14509==         suppressed: 0 bytes in 0 blocks
   ```
   
   Then modify `ReaderImpl::handleConsumerCreated` to clear `readerCreatedCallback_`:
   
   ```c++
   void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) {
       auto self = shared_from_this();
       readerCreatedCallback_(result, Reader(self));
       readerImplWeakPtr_ = self;
       readerCreatedCallback_ = [](Result, Reader) {
       };  // Do nothing, just release the older callback, which is a WaitForCallbackValue
   }
   ```
   
   The valgrind result then becomes:
   
   ```
   ==14986==    definitely lost: 0 bytes in 0 blocks
   ==14986==    indirectly lost: 0 bytes in 0 blocks
   ==14986==      possibly lost: 0 bytes in 0 blocks
   ==14986==    still reachable: 12,621 bytes in 145 blocks
   ==14986==         suppressed: 0 bytes in 0 blocks
   ```
   
   Test code:
   
   ```c++
   #include <pulsar/Client.h>
   using namespace pulsar;
   
   int main() {
       Client client("pulsar://localhost:6650");
       ReaderConfiguration readerConfig;
       Reader reader;
       Result result = client.createReader("Foo", MessageId::earliest(), readerConfig, reader);
       if (result != ResultOk) {
           return -1;
       }
       reader.close();
       client.close();
   }
   ```
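
   Resetting the callback in `handleConsumerCreated` works because destroying a `std::function` also destroys everything it captured. A slightly different way to get the same effect, sketched below with hypothetical names (`OneShotToy`, `fire`, `ownersAfterFire`), is to move the callback into a local variable before invoking it, so its captures are released as soon as the call finishes. This is an illustration of the idea, not the change that was made in Pulsar.

   ```c++
   #include <cassert>
   #include <functional>
   #include <memory>
   #include <utility>

   // Hypothetical holder of a callback that should run at most once.
   class OneShotToy {
      public:
       explicit OneShotToy(std::function<void(int)> cb) : callback_(std::move(cb)) {}

       void fire(int value) {
           // Take the callback out of the member so that its captures are
           // destroyed when this function returns.
           std::function<void(int)> callback = std::move(callback_);
           callback_ = nullptr;  // a moved-from std::function has unspecified contents
           if (callback) {
               callback(value);
           }
       }

      private:
       std::function<void(int)> callback_;
   };

   // Number of owners of the captured resource after the callback has fired.
   long ownersAfterFire() {
       auto resource = std::make_shared<int>(0);
       OneShotToy notifier([resource](int v) { *resource = v; });
       notifier.fire(42);
       assert(*resource == 42);  // the callback did run
       return resource.use_count();
   }
   ```

   `ownersAfterFire()` returns 1: once `fire` has run, only the caller's `resource` pointer still owns the integer.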
   
   ## Safe `createReader` implementation
   
   Now that we know the callback is the problem, you can implement your own safe synchronous `createReader` using `createReaderAsync`:
   
   ```c++
   #include <atomic>
   #include <chrono>
   #include <thread>
   
   #include <pulsar/Client.h>
   
   namespace pulsar {
   
   inline Result createReader(Client& client, const std::string& topic, const MessageId& startMessageId,
                              const ReaderConfiguration& conf, Reader& reader) {
       Result result;
       std::atomic_bool completed{false};
       client.createReaderAsync(topic, startMessageId, conf,
                                [&result, &reader, &completed](Result result_, Reader reader_) {
                                    if (result_ == ResultOk) {
                                        reader = reader_;
                                    }
                                    result = result_;
                                    completed = true;
                                });
       while (!completed) {
           std::this_thread::sleep_for(std::chrono::milliseconds(100));
       }
       return result;
   }
   
   }  // namespace pulsar
   ```
   
   Then call `createReader(client, /* ... */, reader)` instead of `client.createReader(/* ... */, reader)`.

