You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/12 03:24:24 UTC

[pulsar] branch branch-2.6 updated: cpp: fix reference leak when reader create (#7793)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 255400b  cpp: fix reference leak when reader create (#7793)
255400b is described below

commit 255400b16de130e63b7a0a08edf8e0dd6389700d
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Wed Aug 12 06:50:05 2020 +0800

    cpp: fix reference leak when reader create (#7793)
    
    ### Motivation
    User reports a valgrind error for `client::createReader` method:
    
    ```
    ==23308== 284,826 (160 direct, 284,666 indirect) bytes in 1 blocks are definitely lost in loss record 113 of 113
    ==23308==    at 0x4C2A593: operator new(unsigned long) (vg_replace_malloc.c:344)
    ==23308==    by 0x5303B4A: allocate (new_allocator.h:104)
    ==23308==    by 0x5303B4A: allocate (alloc_traits.h:351)
    ==23308==    by 0x5303B4A: __shared_count<pulsar::InternalState<pulsar::Result, pulsar::Reader>, std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr_base.h:499)
    ==23308==    by 0x5303B4A: __shared_ptr<std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr_base.h:957)
    ==23308==    by 0x5303B4A: shared_ptr<std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr.h:316)
    ==23308==    by 0x5303B4A: allocate_shared<pulsar::InternalState<pulsar::Result, pulsar::Reader>, std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr.h:598)
    ==23308==    by 0x5303B4A: make_shared<pulsar::InternalState<pulsar::Result, pulsar::Reader> > (shared_ptr.h:614)
    ==23308==    by 0x5303B4A: Promise (Future.h:91)
    ==23308==    by 0x5303B4A: pulsar::Client::createReader(std::string const&, pulsar::MessageId const&, pulsar::ReaderConfiguration const&, pulsar::Reader&) (Client.cc:142)
    ==23308==    by 0x401DDB: main (pulsarReader.cpp:92)
    ==23308==
    ```
    It seems the `ReaderImpl` has been tracked twice when call WaitForCallbackValue. this PR is to fix the issue.
    
    ### Modifications
    
    - fix WaitForCallbackValue which is changed in PR #3484.
    - add test for the reference issue.
    
    ### Verifying this change
    ut passed.
    valgrind found no issue:
    ```
    ==14758== LEAK SUMMARY:
    ==14758==    definitely lost: 0 bytes in 0 blocks
    ==14758==    indirectly lost: 0 bytes in 0 blocks
    ==14758==      possibly lost: 0 bytes in 0 blocks
    ==14758==    still reachable: 12,621 bytes in 145 blocks
    ==14758==         suppressed: 0 bytes in 0 blocks
    ==14758==
    ==14758== For lists of detected and suppressed errors, rerun with: -s
    ==14758== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
    ```
    
    (cherry picked from commit 0e67fc598d3f22f14ca51276ebb75c57b7a159af)
---
 pulsar-client-cpp/lib/ReaderImpl.cc   |  6 ++++-
 pulsar-client-cpp/lib/ReaderImpl.h    |  3 +++
 pulsar-client-cpp/lib/Utils.h         |  4 +--
 pulsar-client-cpp/tests/ReaderTest.cc | 46 +++++++++++++++++++++++++++++++++++
 pulsar-client-cpp/tests/ReaderTest.h  | 32 ++++++++++++++++++++++++
 5 files changed, 88 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index ad76493..4bd091a 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -63,7 +63,9 @@ void ReaderImpl::start(const MessageId& startMessageId) {
 const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic(); }
 
 void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) {
-    readerCreatedCallback_(result, Reader(shared_from_this()));
+    auto self = shared_from_this();
+    readerCreatedCallback_(result, Reader(self));
+    readerImplWeakPtr_ = self;
 }
 
 ConsumerImplPtr ReaderImpl::getConsumer() { return consumer_; }
@@ -111,4 +113,6 @@ void ReaderImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
     consumer_->seekAsync(timestamp, callback);
 }
 
+ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h
index ca41510..4069247 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -52,6 +52,8 @@ class ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
     void seekAsync(const MessageId& msgId, ResultCallback callback);
     void seekAsync(uint64_t timestamp, ResultCallback callback);
 
+    ReaderImplWeakPtr getReaderImplWeakPtr();
+
    private:
     void handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer);
 
@@ -65,6 +67,7 @@ class ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
     ConsumerImplPtr consumer_;
     ReaderCallback readerCreatedCallback_;
     ReaderListener readerListener_;
+    ReaderImplWeakPtr readerImplWeakPtr_;
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/Utils.h b/pulsar-client-cpp/lib/Utils.h
index cd2fb6d..fd50e97 100644
--- a/pulsar-client-cpp/lib/Utils.h
+++ b/pulsar-client-cpp/lib/Utils.h
@@ -38,9 +38,9 @@ struct WaitForCallback {
 
 template <typename T>
 struct WaitForCallbackValue {
-    Promise<Result, T> m_promise;
+    Promise<Result, T>& m_promise;
 
-    WaitForCallbackValue(Promise<Result, T> promise) : m_promise(promise) {}
+    WaitForCallbackValue(Promise<Result, T>& promise) : m_promise(promise) {}
 
     void operator()(Result result, const T& value) {
         if (result == ResultOk) {
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc
index a91af57..7793311 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -17,6 +17,8 @@
  * under the License.
  */
 #include <pulsar/Client.h>
+#include <pulsar/Reader.h>
+#include "ReaderTest.h"
 
 #include <gtest/gtest.h>
 
@@ -416,3 +418,47 @@ TEST(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
     reader.close();
     client.close();
 }
+
+TEST(ReaderTest, testReferenceLeak) {
+    Client client(serviceUrl);
+
+    std::string topicName = "persistent://public/default/testReferenceLeak";
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    for (int i = 0; i < 10; i++) {
+        std::string content = "my-message-" + std::to_string(i);
+        Message msg = MessageBuilder().setContent(content).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    ReaderConfiguration readerConf;
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
+
+    ConsumerImplBaseWeakPtr consumerPtr = ReaderTest::getConsumer(reader);
+    ReaderImplWeakPtr readerPtr = ReaderTest::getReaderImplWeakPtr(reader);
+
+    LOG_INFO("1 consumer use count " << consumerPtr.use_count());
+    LOG_INFO("1 reader use count " << readerPtr.use_count());
+
+    for (int i = 0; i < 10; i++) {
+        Message msg;
+        ASSERT_EQ(ResultOk, reader.readNext(msg));
+
+        std::string content = msg.getDataAsString();
+        std::string expected = "my-message-" + std::to_string(i);
+        ASSERT_EQ(expected, content);
+    }
+
+    producer.close();
+    reader.close();
+    // will be released after exit this method.
+    ASSERT_EQ(1, consumerPtr.use_count());
+    ASSERT_EQ(1, readerPtr.use_count());
+    client.close();
+    // will be released after exit this method.
+    ASSERT_EQ(1, consumerPtr.use_count());
+    ASSERT_EQ(1, readerPtr.use_count());
+}
diff --git a/pulsar-client-cpp/tests/ReaderTest.h b/pulsar-client-cpp/tests/ReaderTest.h
new file mode 100644
index 0000000..fd0387f
--- /dev/null
+++ b/pulsar-client-cpp/tests/ReaderTest.h
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "lib/ReaderImpl.h"
+#include <string>
+
+using std::string;
+
+namespace pulsar {
+class ReaderTest {
+   public:
+    static ConsumerImplPtr getConsumer(const Reader& reader) { return reader.impl_->getConsumer(); }
+    static ReaderImplWeakPtr getReaderImplWeakPtr(const Reader& reader) {
+        return reader.impl_->getReaderImplWeakPtr();
+    }
+};
+}  // namespace pulsar