You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/12 03:24:24 UTC
[pulsar] branch branch-2.6 updated: cpp: fix reference leak when
reader create (#7793)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 255400b cpp: fix reference leak when reader create (#7793)
255400b is described below
commit 255400b16de130e63b7a0a08edf8e0dd6389700d
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Wed Aug 12 06:50:05 2020 +0800
cpp: fix reference leak when reader create (#7793)
### Motivation
A user reported a Valgrind error for the `Client::createReader` method:
```
==23308== 284,826 (160 direct, 284,666 indirect) bytes in 1 blocks are definitely lost in loss record 113 of 113
==23308== at 0x4C2A593: operator new(unsigned long) (vg_replace_malloc.c:344)
==23308== by 0x5303B4A: allocate (new_allocator.h:104)
==23308== by 0x5303B4A: allocate (alloc_traits.h:351)
==23308== by 0x5303B4A: __shared_count<pulsar::InternalState<pulsar::Result, pulsar::Reader>, std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr_base.h:499)
==23308== by 0x5303B4A: __shared_ptr<std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr_base.h:957)
==23308== by 0x5303B4A: shared_ptr<std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr.h:316)
==23308== by 0x5303B4A: allocate_shared<pulsar::InternalState<pulsar::Result, pulsar::Reader>, std::allocator<pulsar::InternalState<pulsar::Result, pulsar::Reader> > > (shared_ptr.h:598)
==23308== by 0x5303B4A: make_shared<pulsar::InternalState<pulsar::Result, pulsar::Reader> > (shared_ptr.h:614)
==23308== by 0x5303B4A: Promise (Future.h:91)
==23308== by 0x5303B4A: pulsar::Client::createReader(std::string const&, pulsar::MessageId const&, pulsar::ReaderConfiguration const&, pulsar::Reader&) (Client.cc:142)
==23308== by 0x401DDB: main (pulsarReader.cpp:92)
==23308==
```
It seems that the `ReaderImpl` is tracked twice when `WaitForCallbackValue` is called. This PR fixes the issue.
### Modifications
- Fix `WaitForCallbackValue`, which was changed in PR #3484.
- Add a test for the reference issue.
### Verifying this change
Unit tests passed.
Valgrind found no issues:
```
==14758== LEAK SUMMARY:
==14758== definitely lost: 0 bytes in 0 blocks
==14758== indirectly lost: 0 bytes in 0 blocks
==14758== possibly lost: 0 bytes in 0 blocks
==14758== still reachable: 12,621 bytes in 145 blocks
==14758== suppressed: 0 bytes in 0 blocks
==14758==
==14758== For lists of detected and suppressed errors, rerun with: -s
==14758== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
```
(cherry picked from commit 0e67fc598d3f22f14ca51276ebb75c57b7a159af)
---
pulsar-client-cpp/lib/ReaderImpl.cc | 6 ++++-
pulsar-client-cpp/lib/ReaderImpl.h | 3 +++
pulsar-client-cpp/lib/Utils.h | 4 +--
pulsar-client-cpp/tests/ReaderTest.cc | 46 +++++++++++++++++++++++++++++++++++
pulsar-client-cpp/tests/ReaderTest.h | 32 ++++++++++++++++++++++++
5 files changed, 88 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index ad76493..4bd091a 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -63,7 +63,9 @@ void ReaderImpl::start(const MessageId& startMessageId) {
const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic(); }
void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) {
- readerCreatedCallback_(result, Reader(shared_from_this()));
+ auto self = shared_from_this();
+ readerCreatedCallback_(result, Reader(self));
+ readerImplWeakPtr_ = self;
}
ConsumerImplPtr ReaderImpl::getConsumer() { return consumer_; }
@@ -111,4 +113,6 @@ void ReaderImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
consumer_->seekAsync(timestamp, callback);
}
+ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; }
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h
index ca41510..4069247 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -52,6 +52,8 @@ class ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
void seekAsync(const MessageId& msgId, ResultCallback callback);
void seekAsync(uint64_t timestamp, ResultCallback callback);
+ ReaderImplWeakPtr getReaderImplWeakPtr();
+
private:
void handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer);
@@ -65,6 +67,7 @@ class ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
ConsumerImplPtr consumer_;
ReaderCallback readerCreatedCallback_;
ReaderListener readerListener_;
+ ReaderImplWeakPtr readerImplWeakPtr_;
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/Utils.h b/pulsar-client-cpp/lib/Utils.h
index cd2fb6d..fd50e97 100644
--- a/pulsar-client-cpp/lib/Utils.h
+++ b/pulsar-client-cpp/lib/Utils.h
@@ -38,9 +38,9 @@ struct WaitForCallback {
template <typename T>
struct WaitForCallbackValue {
- Promise<Result, T> m_promise;
+ Promise<Result, T>& m_promise;
- WaitForCallbackValue(Promise<Result, T> promise) : m_promise(promise) {}
+ WaitForCallbackValue(Promise<Result, T>& promise) : m_promise(promise) {}
void operator()(Result result, const T& value) {
if (result == ResultOk) {
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc
index a91af57..7793311 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -17,6 +17,8 @@
* under the License.
*/
#include <pulsar/Client.h>
+#include <pulsar/Reader.h>
+#include "ReaderTest.h"
#include <gtest/gtest.h>
@@ -416,3 +418,47 @@ TEST(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
reader.close();
client.close();
}
+
+TEST(ReaderTest, testReferenceLeak) {
+ Client client(serviceUrl);
+
+ std::string topicName = "persistent://public/default/testReferenceLeak";
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+ for (int i = 0; i < 10; i++) {
+ std::string content = "my-message-" + std::to_string(i);
+ Message msg = MessageBuilder().setContent(content).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ ReaderConfiguration readerConf;
+ Reader reader;
+ ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
+
+ ConsumerImplBaseWeakPtr consumerPtr = ReaderTest::getConsumer(reader);
+ ReaderImplWeakPtr readerPtr = ReaderTest::getReaderImplWeakPtr(reader);
+
+ LOG_INFO("1 consumer use count " << consumerPtr.use_count());
+ LOG_INFO("1 reader use count " << readerPtr.use_count());
+
+ for (int i = 0; i < 10; i++) {
+ Message msg;
+ ASSERT_EQ(ResultOk, reader.readNext(msg));
+
+ std::string content = msg.getDataAsString();
+ std::string expected = "my-message-" + std::to_string(i);
+ ASSERT_EQ(expected, content);
+ }
+
+ producer.close();
+ reader.close();
+ // will be released after exit this method.
+ ASSERT_EQ(1, consumerPtr.use_count());
+ ASSERT_EQ(1, readerPtr.use_count());
+ client.close();
+ // will be released after exit this method.
+ ASSERT_EQ(1, consumerPtr.use_count());
+ ASSERT_EQ(1, readerPtr.use_count());
+}
diff --git a/pulsar-client-cpp/tests/ReaderTest.h b/pulsar-client-cpp/tests/ReaderTest.h
new file mode 100644
index 0000000..fd0387f
--- /dev/null
+++ b/pulsar-client-cpp/tests/ReaderTest.h
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "lib/ReaderImpl.h"
+#include <string>
+
+using std::string;
+
+namespace pulsar {
+class ReaderTest {
+ public:
+ static ConsumerImplPtr getConsumer(const Reader& reader) { return reader.impl_->getConsumer(); }
+ static ReaderImplWeakPtr getReaderImplWeakPtr(const Reader& reader) {
+ return reader.impl_->getReaderImplWeakPtr();
+ }
+};
+} // namespace pulsar