You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/03/23 07:27:35 UTC
[pulsar] branch branch-2.10 updated: [C++] Fix producer is never destructed until client is closed (#14797)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new d8e843b [C++] Fix producer is never destructed until client is closed (#14797)
d8e843b is described below
commit d8e843bf92d9a768a61622fa131ecd48335ad91d
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Mar 23 14:35:38 2022 +0800
[C++] Fix producer is never destructed until client is closed (#14797)
Fixes #509
### Motivation
When a C++ producer is created successfully, it will start a send timer.
However the callback has captured the shared pointer of `ProducerImpl`
itself. It extends the lifetime of `ProducerImpl` so that even after the
`Producer` object destructs, the underlying `ProducerImpl` object won't
be destructed. It could only be destructed after `Client::close()` is
called.
### Modifications
- Pass a weak pointer of `ProducerImpl` to the send timer and add a
`asyncWaitSendTimeout` method for the combination of
`expires_from_now` and `async_wait` calls on the timer.
- Add `ClientTest.testReferenceCount` to verify the reference count will
become 0 after the producer or consumer destructs.
(cherry picked from commit f7cbc1eb83ffd27b784d90d5d2dea8660c590ad2)
---
pulsar-client-cpp/lib/ProducerImpl.cc | 25 ++++++++++++++++---------
pulsar-client-cpp/lib/ProducerImpl.h | 2 ++
pulsar-client-cpp/tests/ClientTest.cc | 27 +++++++++++++++++++++++++++
pulsar-client-cpp/tests/PulsarFriend.h | 8 ++++++++
4 files changed, 53 insertions(+), 9 deletions(-)
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 87f8552..bb95584 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -763,8 +763,8 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
std::shared_ptr<PendingCallbacks> pendingCallbacks;
if (pendingMessagesQueue_.empty()) {
// If there are no pending messages, reset the timeout to the configured value.
- sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
LOG_DEBUG(getName() << "Producer timeout triggered on empty pending message queue");
+ asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
} else {
// If there is at least one message, calculate the diff between the message timeout and
// the current time.
@@ -774,17 +774,14 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
LOG_DEBUG(getName() << "Timer expired. Calling timeout callbacks.");
pendingCallbacks = getPendingCallbacksWhenFailed();
// Since the pending queue is cleared now, set timer to expire after configured value.
- sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
+ asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
} else {
// The diff is greater than zero, set the timeout to the diff value
LOG_DEBUG(getName() << "Timer hasn't expired yet, setting new timeout " << diff);
- sendTimer_->expires_from_now(diff);
+ asyncWaitSendTimeout(diff);
}
}
- // Asynchronously wait for the timeout to trigger
- sendTimer_->async_wait(
- std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1));
lock.unlock();
if (pendingCallbacks) {
pendingCallbacks->complete(ResultTimeout);
@@ -946,11 +943,21 @@ void ProducerImpl::startSendTimeoutTimer() {
// timeout to happen.
if (!sendTimer_ && conf_.getSendTimeout() > 0) {
sendTimer_ = executor_->createDeadlineTimer();
- sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
- sendTimer_->async_wait(
- std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1));
+ asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
}
}
+void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
+ sendTimer_->expires_from_now(expiryTime);
+
+ ProducerImplBaseWeakPtr weakSelf = shared_from_this();
+ sendTimer_->async_wait([weakSelf](const boost::system::error_code& err) {
+ auto self = weakSelf.lock();
+ if (self) {
+ std::static_pointer_cast<ProducerImpl>(self)->handleSendTimeout(err);
+ }
+ });
+}
+
} // namespace pulsar
/* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 3dccf91..a8f35a6 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -170,6 +170,8 @@ class ProducerImpl : public HandlerBase,
DeadlineTimerPtr sendTimer_;
void handleSendTimeout(const boost::system::error_code& err);
+ using DurationType = typename boost::asio::deadline_timer::duration_type;
+ void asyncWaitSendTimeout(DurationType expiryTime);
Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 6d55217..eb94ba6 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -19,6 +19,7 @@
#include <gtest/gtest.h>
#include "HttpHelper.h"
+#include "PulsarFriend.h"
#include <future>
#include <pulsar/Client.h>
@@ -189,3 +190,29 @@ TEST(ClientTest, testGetNumberOfReferences) {
client.close();
}
+
+TEST(ClientTest, testReferenceCount) {
+ Client client(lookupUrl);
+ const std::string topic = "client-test-reference-count-" + std::to_string(time(nullptr));
+
+ auto &producers = PulsarFriend::getProducers(client);
+ auto &consumers = PulsarFriend::getConsumers(client);
+
+ {
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+ ASSERT_EQ(producers.size(), 1);
+ ASSERT_EQ(producers[0].use_count(), 1);
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumer));
+ ASSERT_EQ(consumers.size(), 1);
+ ASSERT_EQ(consumers[0].use_count(), 1);
+ }
+
+ ASSERT_EQ(producers.size(), 1);
+ ASSERT_EQ(producers[0].use_count(), 0);
+ ASSERT_EQ(consumers.size(), 1);
+ ASSERT_EQ(consumers[0].use_count(), 0);
+ client.close();
+}
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index c7aa3a6..b04899c 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -95,6 +95,14 @@ class PulsarFriend {
static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) { return client.impl_; }
+ static ClientImpl::ProducersList& getProducers(const Client& client) {
+ return getClientImplPtr(client)->producers_;
+ }
+
+ static ClientImpl::ConsumersList& getConsumers(const Client& client) {
+ return getClientImplPtr(client)->consumers_;
+ }
+
static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
consumer.impl_->setNegativeAcknowledgeEnabledForTesting(enabled);
}