You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/10/21 04:14:57 UTC
[pulsar-client-cpp] 04/06: [fix] Fix memory leak caused by incorrect close and destruction (#54)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
commit 361a6214894c58c0ab463c41ffb92a2c7458c897
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Oct 20 22:45:20 2022 +0800
[fix] Fix memory leak caused by incorrect close and destruction (#54)
Fixes https://github.com/apache/pulsar-client-cpp/issues/55
### Motivation
1. When a producer or consumer is closed, the reference is still stored
in `ClientImpl`. If a client kept creating producers or consumers,
the memory usage would not reduce.
2. When the `HandlerBase::connection_` field is modified, the
`removeProducer` or `removeConsumer` method is not called. Then these
producers and consumers will be cached in the connection until the
connection is closed.
3. The `PartitionedProducerImpl` and `MultiTopicsConsumerImpl` have
cyclic references, when a `Producer` or `Consumer` instance goes out
of the scope, the destructors are not called. When I used GDB to
debug them, I found the reference counts were both greater than 1.
### Modifications
Let's use "handlers" to represent "producers and consumers".
1. In `ClientImpl`, use `SynchronizedHashMap` to store references of
handlers, as well as the `cleanupXXX` methods to remove a handler.
2. Add `HandlerBase::beforeConnectionChange` method, which is called
before `connection_` is modified. Disallow the access to
`connection_` from derived classes.
3. Avoid `shared_from_this()` being passed into callbacks in ASIO
executors for `PartitionedProducerImpl` and
`MultiTopicsConsumerImpl`.
This PR also unifies the `shutdown` implementations for handlers and
call `shutdown` in the destructors.
1. Cancel the timers
2. Unregister itself from `ClientImpl` and `ClientConnection`
3. Set the create future with `ResultAlreadyClosed`
4. Set the state to `Closed`
It's called when:
- the destructor is called
- `closeAsync` is completed
- `unsubscribeAsync` is completed with ResultOk
### Verifications
`ShutdownTest` is added to verify the following cases:
- a single topic
- a partitioned topic (multiple topics)
- a partitioned topic with regex subscription
`testClose` verifies `shutdown` when `closeAsync` and `unsubscribeAsync`
are called. `testDestructor` verifies `shutdown` when handlers go out of
the scope and the destructors are called.
---
.github/workflows/ci-pr-validation.yaml | 2 +-
lib/ClientConnection.h | 2 +-
lib/ClientImpl.cc | 89 +++++++------
lib/ClientImpl.h | 13 +-
lib/ConnectionPool.h | 4 +-
lib/ConsumerImpl.cc | 125 +++++++++---------
lib/ConsumerImpl.h | 5 +-
lib/HandlerBase.cc | 24 ++--
lib/HandlerBase.h | 20 ++-
lib/MultiTopicsConsumerImpl.cc | 217 ++++++++++++++++++++++----------
lib/MultiTopicsConsumerImpl.h | 11 +-
lib/PartitionedProducerImpl.cc | 83 ++++++++----
lib/PartitionedProducerImpl.h | 5 +-
lib/PatternMultiTopicsConsumerImpl.cc | 19 +--
lib/PatternMultiTopicsConsumerImpl.h | 1 +
lib/PeriodicTask.cc | 5 +-
lib/PeriodicTask.h | 2 +-
lib/ProducerImpl.cc | 90 ++++++-------
lib/ProducerImpl.h | 5 +-
lib/SynchronizedHashMap.h | 18 +--
tests/ClientTest.cc | 28 ++---
tests/PulsarFriend.h | 35 +++++-
tests/ShutdownTest.cc | 121 ++++++++++++++++++
tests/WaitUtils.h | 43 -------
24 files changed, 610 insertions(+), 357 deletions(-)
diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml
index 58f1485..1bd128c 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -68,7 +68,7 @@ jobs:
run: ./pulsar-test-service-start.sh
- name: Run unit tests
- run: ./run-unit-tests.sh
+ run: RETRY_FAILED=3 ./run-unit-tests.sh
- name: Stop Pulsar service
run: ./pulsar-test-service-stop.sh
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 418a583..8a48408 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -314,7 +314,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
- std::mutex mutex_;
+ mutable std::mutex mutex_;
typedef std::unique_lock<std::mutex> Lock;
// Pending buffers to write on the socket
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 29e92f3..025727a 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -189,9 +189,15 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
CreateProducerCallback callback, ProducerImplBasePtr producer) {
if (result == ResultOk) {
- Lock lock(mutex_);
- producers_.push_back(producer);
- lock.unlock();
+ auto pair = producers_.emplace(producer.get(), producer);
+ if (!pair.second) {
+ auto existingProducer = pair.first->second.lock();
+ LOG_ERROR("Unexpected existing producer at the same address: "
+ << pair.first->first << ", producer: "
+ << (existingProducer ? existingProducer->getProducerName() : "(null)"));
+ callback(ResultUnknownError, {});
+ return;
+ }
callback(result, Producer(producer));
} else {
callback(result, {});
@@ -241,9 +247,18 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
ConsumerImplBasePtr consumer = reader->getConsumer().lock();
auto self = shared_from_this();
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
- Lock lock(mutex_);
- consumers_.push_back(weakConsumerPtr);
- lock.unlock();
+ auto consumer = weakConsumerPtr.lock();
+ if (consumer) {
+ auto pair = consumers_.emplace(consumer.get(), consumer);
+ if (!pair.second) {
+ auto existingConsumer = pair.first->second.lock();
+ LOG_ERROR("Unexpected existing consumer at the same address: "
+ << pair.first->first
+ << ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
+ }
+ } else {
+ LOG_ERROR("Unexpected case: the consumer is somehow expired");
+ }
});
}
@@ -397,9 +412,15 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
SubscribeCallback callback, ConsumerImplBasePtr consumer) {
if (result == ResultOk) {
- Lock lock(mutex_);
- consumers_.push_back(consumer);
- lock.unlock();
+ auto pair = consumers_.emplace(consumer.get(), consumer);
+ if (!pair.second) {
+ auto existingConsumer = pair.first->second.lock();
+ LOG_ERROR("Unexpected existing consumer at the same address: "
+ << pair.first->first
+ << ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
+ callback(ResultUnknownError, {});
+ return;
+ }
callback(result, Consumer(consumer));
} else {
callback(result, {});
@@ -477,27 +498,26 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, GetPartiti
}
void ClientImpl::closeAsync(CloseCallback callback) {
- Lock lock(mutex_);
- ProducersList producers(producers_);
- ConsumersList consumers(consumers_);
-
- if (state_ != Open && callback) {
- lock.unlock();
- callback(ResultAlreadyClosed);
+ if (state_ != Open) {
+ if (callback) {
+ callback(ResultAlreadyClosed);
+ }
return;
}
// Set the state to Closing so that no producers could get added
state_ = Closing;
- lock.unlock();
memoryLimitController_.close();
+ auto producers = producers_.move();
+ auto consumers = consumers_.move();
+
SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() + consumers.size());
LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size()
<< " consumers");
- for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
- ProducerImplBasePtr producer = it->lock();
+ for (auto&& kv : producers) {
+ ProducerImplBasePtr producer = kv.second.lock();
if (producer && !producer->isClosed()) {
producer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
std::placeholders::_1, numberOfOpenHandlers, callback));
@@ -507,8 +527,8 @@ void ClientImpl::closeAsync(CloseCallback callback) {
}
}
- for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
- ConsumerImplBasePtr consumer = it->lock();
+ for (auto&& kv : consumers) {
+ ConsumerImplBasePtr consumer = kv.second.lock();
if (consumer && !consumer->isClosed()) {
consumer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
std::placeholders::_1, numberOfOpenHandlers, callback));
@@ -562,23 +582,18 @@ void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, Resu
}
void ClientImpl::shutdown() {
- Lock lock(mutex_);
- ProducersList producers;
- ConsumersList consumers;
+ auto producers = producers_.move();
+ auto consumers = consumers_.move();
- producers.swap(producers_);
- consumers.swap(consumers_);
- lock.unlock();
-
- for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
- ProducerImplBasePtr producer = it->lock();
+ for (auto&& kv : producers) {
+ ProducerImplBasePtr producer = kv.second.lock();
if (producer) {
producer->shutdown();
}
}
- for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
- ConsumerImplBasePtr consumer = it->lock();
+ for (auto&& kv : consumers) {
+ ConsumerImplBasePtr consumer = kv.second.lock();
if (consumer) {
consumer->shutdown();
}
@@ -631,26 +646,24 @@ uint64_t ClientImpl::newRequestId() {
}
uint64_t ClientImpl::getNumberOfProducers() {
- Lock lock(mutex_);
uint64_t numberOfAliveProducers = 0;
- for (const auto& producer : producers_) {
+ producers_.forEachValue([&numberOfAliveProducers](const ProducerImplBaseWeakPtr& producer) {
const auto& producerImpl = producer.lock();
if (producerImpl) {
numberOfAliveProducers += producerImpl->getNumberOfConnectedProducer();
}
- }
+ });
return numberOfAliveProducers;
}
uint64_t ClientImpl::getNumberOfConsumers() {
- Lock lock(mutex_);
uint64_t numberOfAliveConsumers = 0;
- for (const auto& consumer : consumers_) {
+ consumers_.forEachValue([&numberOfAliveConsumers](const ConsumerImplBaseWeakPtr& consumer) {
const auto consumerImpl = consumer.lock();
if (consumerImpl) {
numberOfAliveConsumers += consumerImpl->getNumberOfConnectedConsumer();
}
- }
+ });
return numberOfAliveConsumers;
}
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index e8e7708..50ddeff 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -31,6 +31,7 @@
#include <atomic>
#include <vector>
#include "ServiceNameResolver.h"
+#include "SynchronizedHashMap.h"
namespace pulsar {
@@ -91,6 +92,11 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
ExecutorServiceProviderPtr getListenerExecutorProvider();
ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
LookupServicePtr getLookup();
+
+ void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); }
+
+ void cleanupConsumer(ConsumerImplBase* address) { consumers_.remove(address); }
+
friend class PulsarFriend;
private:
@@ -147,11 +153,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
uint64_t consumerIdGenerator_;
uint64_t requestIdGenerator_;
- typedef std::vector<ProducerImplBaseWeakPtr> ProducersList;
- ProducersList producers_;
-
- typedef std::vector<ConsumerImplBaseWeakPtr> ConsumersList;
- ConsumersList consumers_;
+ SynchronizedHashMap<ProducerImplBase*, ProducerImplBaseWeakPtr> producers_;
+ SynchronizedHashMap<ConsumerImplBase*, ConsumerImplBaseWeakPtr> consumers_;
std::atomic<Result> closingError;
diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h
index 21d439e..996df54 100644
--- a/lib/ConnectionPool.h
+++ b/lib/ConnectionPool.h
@@ -74,10 +74,10 @@ class PULSAR_PUBLIC ConnectionPool {
typedef std::map<std::string, ClientConnectionWeakPtr> PoolMap;
PoolMap pool_;
bool poolConnections_;
- std::mutex mutex_;
+ mutable std::mutex mutex_;
std::atomic_bool closed_{false};
- friend class ConnectionPoolTest;
+ friend class PulsarFriend;
};
} // namespace pulsar
#endif //_PULSAR_CONNECTION_POOL_HEADER_
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 54e346f..7be5a6a 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -104,7 +104,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
ConsumerImpl::~ConsumerImpl() {
LOG_DEBUG(getName() << "~ConsumerImpl");
- incomingMessages_.clear();
if (state_ == Ready) {
// this could happen at least in this condition:
// consumer seek, caused reconnection, if consumer close happened before connection ready,
@@ -121,6 +120,7 @@ ConsumerImpl::~ConsumerImpl() {
LOG_INFO(getName() << "Closed consumer for race condition: " << consumerId_);
}
}
+ shutdown();
}
void ConsumerImpl::setPartitionIndex(int partitionIndex) { partitionIndex_ = partitionIndex; }
@@ -156,6 +156,8 @@ void ConsumerImpl::start() {
ackGroupingTrackerPtr_->start();
}
+void ConsumerImpl::beforeConnectionChange(ClientConnection& cnx) { cnx.removeConsumer(consumerId_); }
+
void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
if (state_ == Closed) {
LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
@@ -220,7 +222,7 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
{
Lock lock(mutex_);
- connection_ = cnx;
+ setCnx(cnx);
incomingMessages_.clear();
state_ = Ready;
backoff_.reset();
@@ -267,13 +269,24 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
}
}
-void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
+void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
LOG_INFO(getName() << "Unsubscribing");
+ auto callback = [this, originalCallback](Result result) {
+ if (result == ResultOk) {
+ shutdown();
+ LOG_INFO(getName() << "Unsubscribed successfully");
+ } else {
+ state_ = Ready;
+ LOG_WARN(getName() << "Failed to unsubscribe: " << result);
+ }
+ if (originalCallback) {
+ originalCallback(result);
+ }
+ };
+
if (state_ != Ready) {
callback(ResultAlreadyClosed);
- LOG_ERROR(getName() << "Can not unsubscribe a closed subscription, please call subscribe again and "
- "then call unsubscribe");
return;
}
@@ -286,9 +299,9 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
lock.unlock();
int requestId = client->newRequestId();
SharedBuffer cmd = Commands::newUnsubscribe(consumerId_, requestId);
+ auto self = get_shared_this_ptr();
cnx->sendRequestWithId(cmd, requestId)
- .addListener(std::bind(&ConsumerImpl::handleUnsubscribe, get_shared_this_ptr(),
- std::placeholders::_1, callback));
+ .addListener([self, callback](Result result, const ResponseData&) { callback(result); });
} else {
Result result = ResultNotConnected;
lock.unlock();
@@ -297,16 +310,6 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
}
}
-void ConsumerImpl::handleUnsubscribe(Result result, ResultCallback callback) {
- if (result == ResultOk) {
- state_ = Closed;
- LOG_INFO(getName() << "Unsubscribed successfully");
- } else {
- LOG_WARN(getName() << "Failed to unsubscribe: " << strResult(result));
- }
- callback(result);
-}
-
Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& payload,
const proto::MessageMetadata& metadata,
const MessageId& messageId,
@@ -990,20 +993,25 @@ void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) {
void ConsumerImpl::disconnectConsumer() {
LOG_INFO("Broker notification of Closed consumer: " << consumerId_);
- Lock lock(mutex_);
- connection_.reset();
- lock.unlock();
+ resetCnx();
scheduleReconnection(get_shared_this_ptr());
}
-void ConsumerImpl::closeAsync(ResultCallback callback) {
- // Keep a reference to ensure object is kept alive
- ConsumerImplPtr ptr = get_shared_this_ptr();
+void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
+ auto callback = [this, originalCallback](Result result) {
+ shutdown();
+ if (result == ResultOk) {
+ LOG_INFO(getName() << "Closed consumer " << consumerId_);
+ } else {
+ LOG_WARN(getName() << "Failed to close consumer: " << result);
+ }
+ if (originalCallback) {
+ originalCallback(result);
+ }
+ };
if (state_ != Ready) {
- if (callback) {
- callback(ResultAlreadyClosed);
- }
+ callback(ResultAlreadyClosed);
return;
}
@@ -1018,66 +1026,40 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
- state_ = Closed;
// If connection is gone, also the consumer is closed on the broker side
- if (callback) {
- callback(ResultOk);
- }
+ callback(ResultOk);
return;
}
ClientImplPtr client = client_.lock();
if (!client) {
- state_ = Closed;
// Client was already destroyed
- if (callback) {
- callback(ResultOk);
- }
+ callback(ResultOk);
return;
}
- int requestId = client->newRequestId();
- Future<Result, ResponseData> future =
- cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
- if (callback) {
- // Pass the shared pointer "ptr" to the handler to prevent the object from being destroyed
- future.addListener(std::bind(&ConsumerImpl::handleClose, get_shared_this_ptr(), std::placeholders::_1,
- callback, ptr));
- }
+ cancelTimers();
- // fail pendingReceive callback
- failPendingReceiveCallback();
- failPendingBatchReceiveCallback();
-
- // cancel timer
- batchReceiveTimer_->cancel();
-}
-
-void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) {
- if (result == ResultOk) {
- state_ = Closed;
-
- ClientConnectionPtr cnx = getCnx().lock();
- if (cnx) {
- cnx->removeConsumer(consumerId_);
- }
-
- LOG_INFO(getName() << "Closed consumer " << consumerId_);
- } else {
- LOG_ERROR(getName() << "Failed to close consumer: " << result);
- }
-
- if (callback) {
- callback(result);
- }
+ int requestId = client->newRequestId();
+ auto self = get_shared_this_ptr();
+ cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId)
+ .addListener([self, callback](Result result, const ResponseData&) { callback(result); });
}
const std::string& ConsumerImpl::getName() const { return consumerStr_; }
void ConsumerImpl::shutdown() {
- state_ = Closed;
-
+ incomingMessages_.clear();
+ resetCnx();
+ auto client = client_.lock();
+ if (client) {
+ client->cleanupConsumer(this);
+ }
+ cancelTimers();
consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
+ failPendingReceiveCallback();
+ failPendingBatchReceiveCallback();
+ state_ = Closed;
}
bool ConsumerImpl::isClosed() { return state_ == Closed; }
@@ -1437,4 +1419,9 @@ std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
return std::dynamic_pointer_cast<ConsumerImpl>(shared_from_this());
}
+void ConsumerImpl::cancelTimers() noexcept {
+ boost::system::error_code ec;
+ batchReceiveTimer_->cancel(ec);
+}
+
} /* namespace pulsar */
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 09d2c5c..3aa632a 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -84,7 +84,6 @@ class ConsumerImpl : public ConsumerImplBase {
void activeConsumerChanged(bool isActive);
inline proto::CommandSubscribe_SubType getSubType();
inline proto::CommandSubscribe_InitialPosition getInitialPosition();
- void handleUnsubscribe(Result result, ResultCallback callback);
/**
* Send individual ACK request of given message ID to broker.
@@ -140,6 +139,7 @@ class ConsumerImpl : public ConsumerImplBase {
virtual bool isReadCompacted();
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
virtual void getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback);
+ void beforeConnectionChange(ClientConnection& cnx) override;
protected:
// overrided methods from HandlerBase
@@ -156,7 +156,8 @@ class ConsumerImpl : public ConsumerImplBase {
void internalConsumerChangeListener(bool isActive);
- void handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer);
+ void cancelTimers() noexcept;
+
ConsumerStatsBasePtr consumerStatsBasePtr_;
private:
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 506207e..1f4ce6e 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -30,7 +30,6 @@ namespace pulsar {
HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
: client_(client),
topic_(topic),
- connection_(),
executor_(client->getIOExecutorProvider()->get()),
mutex_(),
creationTimestamp_(TimeUtils::now()),
@@ -50,14 +49,25 @@ void HandlerBase::start() {
}
}
+ClientConnectionWeakPtr HandlerBase::getCnx() const {
+ Lock lock(connectionMutex_);
+ return connection_;
+}
+
+void HandlerBase::setCnx(const ClientConnectionPtr& cnx) {
+ Lock lock(connectionMutex_);
+ auto previousCnx = connection_.lock();
+ if (previousCnx) {
+ beforeConnectionChange(*previousCnx);
+ }
+ connection_ = cnx;
+}
+
void HandlerBase::grabCnx() {
- Lock lock(mutex_);
- if (connection_.lock()) {
- lock.unlock();
+ if (getCnx().lock()) {
LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
return;
}
- lock.unlock();
LOG_INFO(getName() << "Getting connection from pool");
ClientImplPtr client = client_.lock();
Future<Result, ClientConnectionWeakPtr> future = client->getConnection(topic_);
@@ -96,14 +106,14 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
State state = handler->state_;
- ClientConnectionPtr currentConnection = handler->connection_.lock();
+ ClientConnectionPtr currentConnection = handler->getCnx().lock();
if (currentConnection && connection.lock().get() != currentConnection.get()) {
LOG_WARN(handler->getName()
<< "Ignoring connection closed since we are already attached to a newer connection");
return;
}
- handler->connection_.reset();
+ handler->resetCnx();
if (result == ResultRetryable) {
scheduleReconnection(handler);
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 6fc3603..6616ec4 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -44,11 +44,9 @@ class HandlerBase {
void start();
- /*
- * get method for derived class to access weak ptr to connection so that they
- * have to check if they can get a shared_ptr out of it or not
- */
- ClientConnectionWeakPtr getCnx() const { return connection_; }
+ ClientConnectionWeakPtr getCnx() const;
+ void setCnx(const ClientConnectionPtr& cnx);
+ void resetCnx() { setCnx(nullptr); }
protected:
/*
@@ -65,6 +63,14 @@ class HandlerBase {
* Should we retry in error that are transient
*/
bool isRetriableError(Result result);
+
+ /**
+ * Do some cleanup work before changing `connection_` to `cnx`.
+ *
+ * @param cnx the current connection
+ */
+ virtual void beforeConnectionChange(ClientConnection& cnx) = 0;
+
/*
* connectionOpened will be implemented by derived class to receive notification
*/
@@ -86,7 +92,6 @@ class HandlerBase {
protected:
ClientImplWeakPtr client_;
const std::string topic_;
- ClientConnectionWeakPtr connection_;
ExecutorServicePtr executor_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
@@ -112,6 +117,9 @@ class HandlerBase {
private:
DeadlineTimerPtr timer_;
+
+ mutable std::mutex connectionMutex_;
+ ClientConnectionWeakPtr connection_;
friend class ClientConnection;
friend class PulsarFriend;
};
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 573c33d..c54f8e8 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -19,6 +19,7 @@
#include "MultiTopicsConsumerImpl.h"
#include "MultiResultCallback.h"
#include "MessagesImpl.h"
+#include <stdexcept>
DECLARE_LOG_OBJECT()
@@ -55,11 +56,11 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
} else {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
- auto partitionsUpdateInterval = static_cast<unsigned int>(client_->conf().getPartitionsUpdateInterval());
+ auto partitionsUpdateInterval = static_cast<unsigned int>(client->conf().getPartitionsUpdateInterval());
if (partitionsUpdateInterval > 0) {
partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
- lookupServicePtr_ = client_->getLookup();
+ lookupServicePtr_ = client->getLookup();
}
state_ = Pending;
@@ -83,10 +84,16 @@ void MultiTopicsConsumerImpl::start() {
int topicsNumber = topics_.size();
std::shared_ptr<std::atomic<int>> topicsNeedCreate = std::make_shared<std::atomic<int>>(topicsNumber);
// subscribe for each passed in topic
+ auto weakSelf = weak_from_this();
for (std::vector<std::string>::const_iterator itr = topics_.begin(); itr != topics_.end(); itr++) {
- subscribeOneTopicAsync(*itr).addListener(std::bind(&MultiTopicsConsumerImpl::handleOneTopicSubscribed,
- get_shared_this_ptr(), std::placeholders::_1,
- std::placeholders::_2, *itr, topicsNeedCreate));
+ auto topic = *itr;
+ subscribeOneTopicAsync(topic).addListener(
+ [this, weakSelf, topic, topicsNeedCreate](Result result, const Consumer& consumer) {
+ auto self = weakSelf.lock();
+ if (self) {
+ handleOneTopicSubscribed(result, consumer, topic, topicsNeedCreate);
+ }
+ });
}
}
@@ -111,9 +118,9 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
} else {
LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
// unsubscribed all of the successfully subscribed partitioned consumers
- // It's safe to capture only this here, because the callback can be called only when this is valid
- closeAsync(
- [this](Result result) { multiTopicsConsumerCreatedPromise_.setFailed(failedResult.load()); });
+ // `shutdown()`, which set multiTopicsConsumerCreatedPromise_ with `failedResult`, will be called
+ // when `closeAsync` completes.
+ closeAsync(nullptr);
}
}
}
@@ -164,10 +171,20 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
ConsumerSubResultPromisePtr topicSubResultPromise) {
std::shared_ptr<ConsumerImpl> consumer;
ConsumerConfiguration config = conf_.clone();
- ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
+ auto client = client_.lock();
+ if (!client) {
+ topicSubResultPromise->setFailed(ResultAlreadyClosed);
+ return;
+ }
+ ExecutorServicePtr internalListenerExecutor = client->getPartitionListenerExecutorProvider()->get();
- config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(),
- std::placeholders::_1, std::placeholders::_2));
+ auto weakSelf = weak_from_this();
+ config.setMessageListener([this, weakSelf](Consumer consumer, const Message& msg) {
+ auto self = weakSelf.lock();
+ if (self) {
+ messageReceived(consumer, msg);
+ }
+ });
int partitions = numPartitions == 0 ? 1 : numPartitions;
@@ -186,7 +203,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
// non-partitioned topic
if (numPartitions == 0) {
// We don't have to add partition-n suffix
- consumer = std::make_shared<ConsumerImpl>(client_, topicName->toString(), subscriptionName_, config,
+ consumer = std::make_shared<ConsumerImpl>(client, topicName->toString(), subscriptionName_, config,
topicName->isPersistent(), internalListenerExecutor, true,
NonPartitioned);
consumer->getConsumerCreatedFuture().addListener(std::bind(
@@ -199,7 +216,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
} else {
for (int i = 0; i < numPartitions; i++) {
std::string topicPartitionName = topicName->getTopicPartitionName(i);
- consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
+ consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_, config,
topicName->isPersistent(), internalListenerExecutor,
true, Partitioned);
consumer->getConsumerCreatedFuture().addListener(std::bind(
@@ -244,12 +261,24 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
}
}
-void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
+void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
+ auto callback = [this, originalCallback](Result result) {
+ if (result == ResultOk) {
+ shutdown();
+ LOG_INFO(getName() << "Unsubscribed successfully");
+ } else {
+ state_ = Ready;
+ LOG_WARN(getName() << "Failed to unsubscribe: " << result);
+ }
+ if (originalCallback) {
+ originalCallback(result);
+ }
+ };
+
const auto state = state_.load();
if (state == Closing || state == Closed) {
- LOG_INFO(consumerStr_ << " already closed");
callback(ResultAlreadyClosed);
return;
}
@@ -284,12 +313,9 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
if (consumerUnsubed->load() == numberTopicPartitions_->load()) {
LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " << consumerStr_);
- consumers_.clear();
- topicsPartitions_.clear();
- unAckedMessageTrackerPtr_->clear();
-
Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
- state_ = Closed;
+ // The `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()` if
+ // unsubscribe succeeds.
callback(result1);
return;
}
@@ -376,20 +402,27 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
}
}
-void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
+ auto callback = [this, originalCallback](Result result) {
+ shutdown();
+ if (result != ResultOk) {
+ LOG_WARN(getName() << "Failed to close consumer: " << result);
+ }
+ if (originalCallback) {
+ originalCallback(result);
+ }
+ };
const auto state = state_.load();
if (state == Closing || state == Closed) {
- LOG_ERROR("TopicsConsumer already closed "
- << " topic" << topic_ << " consumer - " << consumerStr_);
- if (callback) {
- callback(ResultAlreadyClosed);
- }
+ callback(ResultAlreadyClosed);
return;
}
state_ = Closing;
- std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
+ cancelTimers();
+
+ auto weakSelf = weak_from_this();
int numConsumers = 0;
consumers_.clear(
[this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) {
@@ -418,27 +451,14 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
}
// closed all consumers
if (numConsumersLeft == 0) {
- incomingMessages_.clear();
- topicsPartitions_.clear();
- unAckedMessageTrackerPtr_->clear();
-
- if (state_ != Failed) {
- state_ = Closed;
- }
-
- if (callback) {
- callback(result);
- }
+ callback(result);
}
});
});
if (numConsumers == 0) {
LOG_DEBUG("TopicsConsumer have no consumers to close "
<< " topic" << topic_ << " subscription - " << subscriptionName_);
- state_ = Closed;
- if (callback) {
- callback(ResultAlreadyClosed);
- }
+ callback(ResultAlreadyClosed);
return;
}
@@ -461,8 +481,13 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
lock.unlock();
- listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
- get_shared_this_ptr(), ResultOk, msg, callback));
+ auto weakSelf = weak_from_this();
+ listenerExecutor_->postWork([this, weakSelf, msg, callback]() {
+ auto self = weakSelf.lock();
+ if (self) {
+ notifyPendingReceivedCallback(ResultOk, msg, callback);
+ }
+ });
return;
}
@@ -564,13 +589,18 @@ void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
while (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
- listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
- get_shared_this_ptr(), ResultAlreadyClosed, msg, callback));
+ auto weakSelf = weak_from_this();
+ listenerExecutor_->postWork([this, weakSelf, msg, callback]() {
+ auto self = weakSelf.lock();
+ if (self) {
+ notifyPendingReceivedCallback(ResultAlreadyClosed, msg, callback);
+ }
+ });
}
lock.unlock();
}
-void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
+void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, const Message& msg,
const ReceiveCallback& callback) {
if (result == ResultOk) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
@@ -609,7 +639,7 @@ void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
}
}
-MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() {}
+MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() { shutdown(); }
Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCreatedFuture() {
return multiTopicsConsumerCreatedPromise_.getFuture();
@@ -620,7 +650,24 @@ const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; }
const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
-void MultiTopicsConsumerImpl::shutdown() {}
+void MultiTopicsConsumerImpl::shutdown() {
+ cancelTimers();
+ incomingMessages_.clear();
+ topicsPartitions_.clear();
+ unAckedMessageTrackerPtr_->clear();
+ auto client = client_.lock();
+ if (client) {
+ client->cleanupConsumer(this);
+ }
+ consumers_.clear();
+ topicsPartitions_.clear();
+ if (failedResult != ResultOk) {
+ multiTopicsConsumerCreatedPromise_.setFailed(failedResult);
+ } else {
+ multiTopicsConsumerCreatedPromise_.setFailed(ResultAlreadyClosed);
+ }
+ state_ = Closed;
+}
bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; }
@@ -684,13 +731,16 @@ void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
lock.unlock();
- auto self = get_shared_this_ptr();
size_t i = 0;
- consumers_.forEachValue([&self, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
+ consumers_.forEachValue([this, &latchPtr, &statsPtr, &i, callback](const ConsumerImplPtr& consumer) {
size_t index = i++;
+ auto weakSelf = weak_from_this();
consumer->getBrokerConsumerStatsAsync(
- [self, latchPtr, statsPtr, index, callback](Result result, BrokerConsumerStats stats) {
- self->handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback);
+ [this, weakSelf, latchPtr, statsPtr, index, callback](Result result, BrokerConsumerStats stats) {
+ auto self = weakSelf.lock();
+ if (self) {
+ handleGetConsumerStats(result, stats, latchPtr, statsPtr, index, callback);
+ }
});
});
}
@@ -772,7 +822,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
}
void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
- std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
+ auto weakSelf = weak_from_this();
partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
// If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
// cannot continue at this time, and the request needs to be ignored.
@@ -790,9 +840,15 @@ void MultiTopicsConsumerImpl::topicPartitionUpdate() {
for (const auto& item : topicsPartitions) {
auto topicName = TopicName::get(item.first);
auto currentNumPartitions = item.second;
+ auto weakSelf = weak_from_this();
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
- std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, get_shared_this_ptr(), topicName,
- std::placeholders::_1, std::placeholders::_2, currentNumPartitions));
+ [this, weakSelf, topicName, currentNumPartitions](Result result,
+ const LookupDataResultPtr& lookupDataResult) {
+ auto self = weakSelf.lock();
+ if (self) {
+ this->handleGetPartitions(topicName, result, lookupDataResult, currentNumPartitions);
+ }
+ });
}
}
void MultiTopicsConsumerImpl::handleGetPartitions(TopicNamePtr topicName, Result result,
@@ -831,9 +887,19 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
ConsumerSubResultPromisePtr topicSubResultPromise,
std::shared_ptr<std::atomic<int>> partitionsNeedCreate) {
ConsumerConfiguration config = conf_.clone();
- ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
- config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, get_shared_this_ptr(),
- std::placeholders::_1, std::placeholders::_2));
+ auto client = client_.lock();
+ if (!client) {
+ topicSubResultPromise->setFailed(ResultAlreadyClosed);
+ return;
+ }
+ ExecutorServicePtr internalListenerExecutor = client->getPartitionListenerExecutorProvider()->get();
+ auto weakSelf = weak_from_this();
+ config.setMessageListener([this, weakSelf](Consumer consumer, const Message& msg) {
+ auto self = weakSelf.lock();
+ if (self) {
+ messageReceived(consumer, msg);
+ }
+ });
// Apply total limit of receiver queue size across partitions
config.setReceiverQueueSize(
@@ -842,12 +908,18 @@ void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
std::string topicPartitionName = topicName->getTopicPartitionName(partitionIndex);
- auto consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
+ auto consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_, config,
topicName->isPersistent(), internalListenerExecutor, true,
Partitioned);
consumer->getConsumerCreatedFuture().addListener(
- std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
- std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
+ [this, weakSelf, partitionsNeedCreate, topicSubResultPromise](
+ Result result, const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr) {
+ auto self = weakSelf.lock();
+ if (self) {
+ handleSingleConsumerCreated(result, consumerImplBaseWeakPtr, partitionsNeedCreate,
+ topicSubResultPromise);
+ }
+ });
consumer->setPartitionIndex(partitionIndex);
consumer->start();
consumers_.emplace(topicPartitionName, consumer);
@@ -873,9 +945,13 @@ void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchRece
messageProcessed(peekMsg);
messages->add(peekMsg);
}
- auto self = get_shared_this_ptr();
- listenerExecutor_->postWork(
- [callback, messages, self]() { callback(ResultOk, messages->getMessageList()); });
+ auto weakSelf = weak_from_this();
+ listenerExecutor_->postWork([weakSelf, callback, messages]() {
+ auto self = weakSelf.lock();
+ if (self) {
+ callback(ResultOk, messages->getMessageList());
+ }
+ });
}
void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
@@ -886,3 +962,14 @@ void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImpl::get_shared_this_ptr() {
return std::dynamic_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
}
+
+void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {
+ throw std::runtime_error("The connection_ field should not be modified for a MultiTopicsConsumerImpl");
+}
+
+void MultiTopicsConsumerImpl::cancelTimers() noexcept {
+ if (partitionsUpdateTimer_) {
+ boost::system::error_code ec;
+ partitionsUpdateTimer_->cancel(ec);
+ }
+}
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 044f417..7c83da9 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -87,7 +87,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
Future<Result, Consumer> subscribeOneTopicAsync(const std::string& topic);
protected:
- const ClientImplPtr client_;
+ const ClientImplWeakPtr client_;
const std::string subscriptionName_;
std::string consumerStr_;
const ConsumerConfiguration conf_;
@@ -118,7 +118,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
void internalListener(Consumer consumer);
void receiveMessages();
void failPendingReceiveCallback();
- void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);
+ void notifyPendingReceivedCallback(Result result, const Message& message,
+ const ReceiveCallback& callback);
void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
std::shared_ptr<std::atomic<int>> topicsNeedCreate);
@@ -142,10 +143,16 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
// impl consumer base virtual method
bool hasEnoughMessagesForBatchReceive() const override;
void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override;
+ void beforeConnectionChange(ClientConnection& cnx) override;
private:
std::shared_ptr<MultiTopicsConsumerImpl> get_shared_this_ptr();
void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
+ void cancelTimers() noexcept;
+
+ std::weak_ptr<MultiTopicsConsumerImpl> weak_from_this() noexcept {
+ return std::static_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this());
+ }
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index 469ecc9..3d383ff 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -46,12 +46,12 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top
(int)(config.getMaxPendingMessagesAcrossPartitions() / numPartitions));
conf_.setMaxPendingMessages(maxPendingMessagesPerPartition);
- auto partitionsUpdateInterval = static_cast<unsigned int>(client_->conf().getPartitionsUpdateInterval());
+ auto partitionsUpdateInterval = static_cast<unsigned int>(client->conf().getPartitionsUpdateInterval());
if (partitionsUpdateInterval > 0) {
- listenerExecutor_ = client_->getListenerExecutorProvider()->get();
+ listenerExecutor_ = client->getListenerExecutorProvider()->get();
partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
- lookupServicePtr_ = client_->getLookup();
+ lookupServicePtr_ = client->getLookup();
}
}
@@ -71,7 +71,7 @@ MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
}
}
-PartitionedProducerImpl::~PartitionedProducerImpl() {}
+PartitionedProducerImpl::~PartitionedProducerImpl() { shutdown(); }
// override
const std::string& PartitionedProducerImpl::getTopic() const { return topic_; }
@@ -86,7 +86,11 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) {
using namespace std::placeholders;
- auto producer = std::make_shared<ProducerImpl>(client_, *topicName_, conf_, partition);
+ auto client = client_.lock();
+ auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_, partition);
+ if (!client) {
+ return producer;
+ }
if (lazy) {
createLazyPartitionProducer(partition);
@@ -211,7 +215,15 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
}
// override
-void PartitionedProducerImpl::shutdown() { state_ = Closed; }
+void PartitionedProducerImpl::shutdown() {
+ cancelTimers();
+ auto client = client_.lock();
+ if (client) {
+ client->cleanupProducer(this);
+ }
+ partitionedProducerCreatedPromise_.setFailed(ResultAlreadyClosed);
+ state_ = Closed;
+}
const std::string& PartitionedProducerImpl::getProducerName() const {
Lock producersLock(producersMutex_);
@@ -239,11 +251,25 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
* if createProducerCallback is set, it means the closeAsync is called from CreateProducer API which failed to
* create one or many producers for partitions. So, we have to notify with ERROR on createProducerFailure
*/
-void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
- if (state_ == Closing || state_ == Closed) {
+void PartitionedProducerImpl::closeAsync(CloseCallback originalCallback) {
+ auto closeCallback = [this, originalCallback](Result result) {
+ if (result == ResultOk) {
+ shutdown();
+ }
+ if (originalCallback) {
+ originalCallback(result);
+ }
+ };
+ if (state_ == Closed) {
+ closeCallback(ResultAlreadyClosed);
+ return;
+ }
+ State expectedState = Ready;
+ if (!state_.compare_exchange_strong(expectedState, Closing)) {
return;
}
- state_ = Closing;
+
+ cancelTimers();
unsigned int producerAlreadyClosed = 0;
@@ -271,12 +297,12 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
* c. If closeAsync called due to failure in creating just one sub producer then state is set by
* handleSinglePartitionProducerCreated
*/
- if (producerAlreadyClosed == numProducers && closeCallback) {
- state_ = Closed;
+ if (producerAlreadyClosed == numProducers) {
closeCallback(ResultOk);
}
}
+// `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()`
void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
const unsigned int partitionIndex,
CloseCallback callback) {
@@ -285,11 +311,9 @@ void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
return;
}
if (result != ResultOk) {
- state_ = Failed;
LOG_ERROR("Closing the producer failed for partition - " << partitionIndex);
- if (callback) {
- callback(result);
- }
+ callback(result);
+ state_ = Failed;
return;
}
assert(partitionIndex < getNumPartitionsWithLock());
@@ -298,16 +322,13 @@ void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
}
// closed all successfully
if (!numProducersCreated_) {
- state_ = Closed;
// set the producerCreatedPromise to failure, if client called
// closeAsync and it's not failure to create producer, the promise
// is set second time here, first time it was successful. So check
// if there's any adverse effect of setting it again. It should not
// be but must check. MUSTCHECK changeme
partitionedProducerCreatedPromise_.setFailed(ResultUnknownError);
- if (callback) {
- callback(result);
- }
+ callback(result);
return;
}
}
@@ -371,15 +392,26 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
}
void PartitionedProducerImpl::runPartitionUpdateTask() {
+ auto weakSelf = weak_from_this();
partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
- partitionsUpdateTimer_->async_wait(
- std::bind(&PartitionedProducerImpl::getPartitionMetadata, shared_from_this()));
+ partitionsUpdateTimer_->async_wait([weakSelf](const boost::system::error_code& ec) {
+ auto self = weakSelf.lock();
+ if (self) {
+ self->getPartitionMetadata();
+ }
+ });
}
void PartitionedProducerImpl::getPartitionMetadata() {
using namespace std::placeholders;
+ auto weakSelf = weak_from_this();
lookupServicePtr_->getPartitionMetadataAsync(topicName_)
- .addListener(std::bind(&PartitionedProducerImpl::handleGetPartitions, shared_from_this(), _1, _2));
+ .addListener([weakSelf](Result result, const LookupDataResultPtr& lookupDataResult) {
+ auto self = weakSelf.lock();
+ if (self) {
+ self->handleGetPartitions(result, lookupDataResult);
+ }
+ });
}
void PartitionedProducerImpl::handleGetPartitions(Result result,
@@ -446,4 +478,11 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() {
return numberOfConnectedProducer;
}
+void PartitionedProducerImpl::cancelTimers() noexcept {
+ if (partitionsUpdateTimer_) {
+ boost::system::error_code ec;
+ partitionsUpdateTimer_->cancel(ec);
+ }
+}
+
} // namespace pulsar
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index 0a8c10e..cc7a4e0 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -73,10 +73,12 @@ class PartitionedProducerImpl : public ProducerImplBase,
void notifyResult(CloseCallback closeCallback);
+ std::weak_ptr<PartitionedProducerImpl> weak_from_this() noexcept { return shared_from_this(); }
+
friend class PulsarFriend;
private:
- const ClientImplPtr client_;
+ ClientImplWeakPtr client_;
const TopicNamePtr topicName_;
const std::string topic_;
@@ -119,6 +121,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
void runPartitionUpdateTask();
void getPartitionMetadata();
void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
+ void cancelTimers() noexcept;
};
} // namespace pulsar
diff --git a/lib/PatternMultiTopicsConsumerImpl.cc b/lib/PatternMultiTopicsConsumerImpl.cc
index 79ed196..8014078 100644
--- a/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/lib/PatternMultiTopicsConsumerImpl.cc
@@ -32,7 +32,7 @@ PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(ClientImplPtr cli
lookupServicePtr_),
patternString_(pattern),
pattern_(PULSAR_REGEX_NAMESPACE::regex(pattern)),
- autoDiscoveryTimer_(),
+ autoDiscoveryTimer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()),
autoDiscoveryRunning_(false) {
namespaceName_ = TopicName::get(pattern)->getNamespaceName();
}
@@ -215,9 +215,7 @@ void PatternMultiTopicsConsumerImpl::start() {
LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_.");
- // Init autoDiscoveryTimer task only once, wait for the timeout to happen
- if (!autoDiscoveryTimer_ && conf_.getPatternAutoDiscoveryPeriod() > 0) {
- autoDiscoveryTimer_ = client_->getIOExecutorProvider()->get()->createDeadlineTimer();
+ if (conf_.getPatternAutoDiscoveryPeriod() > 0) {
autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
autoDiscoveryTimer_->async_wait(
std::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, std::placeholders::_1));
@@ -225,13 +223,16 @@ void PatternMultiTopicsConsumerImpl::start() {
}
void PatternMultiTopicsConsumerImpl::shutdown() {
- Lock lock(mutex_);
- state_ = Closed;
- autoDiscoveryTimer_->cancel();
- multiTopicsConsumerCreatedPromise_.setFailed(ResultAlreadyClosed);
+ cancelTimers();
+ MultiTopicsConsumerImpl::shutdown();
}
void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+ cancelTimers();
MultiTopicsConsumerImpl::closeAsync(callback);
- autoDiscoveryTimer_->cancel();
+}
+
+void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept {
+ boost::system::error_code ec;
+ autoDiscoveryTimer_->cancel(ec);
}
diff --git a/lib/PatternMultiTopicsConsumerImpl.h b/lib/PatternMultiTopicsConsumerImpl.h
index 408d68e..448f2e3 100644
--- a/lib/PatternMultiTopicsConsumerImpl.h
+++ b/lib/PatternMultiTopicsConsumerImpl.h
@@ -72,6 +72,7 @@ class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
bool autoDiscoveryRunning_;
NamespaceNamePtr namespaceName_;
+ void cancelTimers() noexcept;
void resetAutoDiscoveryTimer();
void timerGetTopicsOfNamespace(const Result result, const NamespaceTopicsPtr topics);
void onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback callback);
diff --git a/lib/PeriodicTask.cc b/lib/PeriodicTask.cc
index 4e91ef5..65bdf23 100644
--- a/lib/PeriodicTask.cc
+++ b/lib/PeriodicTask.cc
@@ -38,12 +38,13 @@ void PeriodicTask::start() {
}
}
-void PeriodicTask::stop() {
+void PeriodicTask::stop() noexcept {
State state = Ready;
if (!state_.compare_exchange_strong(state, Closing)) {
return;
}
- timer_.cancel();
+ ErrorCode ec;
+ timer_.cancel(ec);
state_ = Pending;
}
diff --git a/lib/PeriodicTask.h b/lib/PeriodicTask.h
index 57d0734..159c86a 100644
--- a/lib/PeriodicTask.h
+++ b/lib/PeriodicTask.h
@@ -55,7 +55,7 @@ class PeriodicTask : public std::enable_shared_from_this<PeriodicTask> {
void start();
- void stop();
+ void stop() noexcept;
void setCallback(CallbackType callback) noexcept { callback_ = callback; }
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 20133c5..e228c83 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -109,7 +109,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
ProducerImpl::~ProducerImpl() {
LOG_DEBUG(getName() << "~ProducerImpl");
- cancelTimers();
+ shutdown();
printStats();
if (state_ == Ready || state_ == Pending) {
LOG_WARN(getName() << "Destroyed producer which was not properly closed");
@@ -124,6 +124,10 @@ int64_t ProducerImpl::getLastSequenceId() const { return lastSequenceIdPublished
const std::string& ProducerImpl::getSchemaVersion() const { return schemaVersion_; }
+void ProducerImpl::beforeConnectionChange(ClientConnection& connection) {
+ connection.removeProducer(producerId_);
+}
+
void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
if (state_ == Closed) {
LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
@@ -185,7 +189,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
msgSequenceGenerator_ = lastSequenceIdPublished_ + 1;
}
resendMessages(cnx);
- connection_ = cnx;
+ setCnx(cnx);
state_ = Ready;
backoff_.reset();
lock.unlock();
@@ -645,7 +649,19 @@ void ProducerImpl::printStats() {
}
}
-void ProducerImpl::closeAsync(CloseCallback callback) {
+void ProducerImpl::closeAsync(CloseCallback originalCallback) {
+ auto callback = [this, originalCallback](Result result) {
+ if (result == ResultOk) {
+ LOG_INFO(getName() << "Closed producer " << producerId_);
+ shutdown();
+ } else {
+ LOG_ERROR(getName() << "Failed to close producer: " << strResult(result));
+ }
+ if (originalCallback) {
+ originalCallback(result);
+ }
+ };
+
// if the producer was never started then there is nothing to clean up
State expectedState = NotStarted;
if (state_.compare_exchange_strong(expectedState, Closed)) {
@@ -653,9 +669,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
return;
}
- // Keep a reference to ensure object is kept alive
- ProducerImplPtr ptr = shared_from_this();
-
cancelTimers();
if (semaphore_) {
@@ -669,10 +682,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
// just like Java's `getAndUpdate` method on an atomic variable
const auto state = state_.load();
if (state != Ready && state != Pending) {
- state_ = Closed;
- if (callback) {
- callback(ResultAlreadyClosed);
- }
+ callback(ResultAlreadyClosed);
return;
}
@@ -681,53 +691,24 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
- state_ = Closed;
-
- if (callback) {
- callback(ResultOk);
- }
+ callback(ResultOk);
return;
}
// Detach the producer from the connection to avoid sending any other
// message from the producer
- connection_.reset();
+ resetCnx();
ClientImplPtr client = client_.lock();
if (!client) {
- state_ = Closed;
- // Client was already destroyed
- if (callback) {
- callback(ResultOk);
- }
+ callback(ResultOk);
return;
}
int requestId = client->newRequestId();
- Future<Result, ResponseData> future =
- cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
- if (callback) {
- // Pass the shared pointer "ptr" to the handler to prevent the object from being destroyed
- future.addListener(
- std::bind(&ProducerImpl::handleClose, shared_from_this(), std::placeholders::_1, callback, ptr));
- }
-}
-
-void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerImplPtr producer) {
- if (result == ResultOk) {
- state_ = Closed;
- LOG_INFO(getName() << "Closed producer " << producerId_);
- ClientConnectionPtr cnx = getCnx().lock();
- if (cnx) {
- cnx->removeProducer(producerId_);
- }
- } else {
- LOG_ERROR(getName() << "Failed to close producer: " << strResult(result));
- }
-
- if (callback) {
- callback(result);
- }
+ auto self = shared_from_this();
+ cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId)
+ .addListener([self, callback](Result result, const ResponseData&) { callback(result); });
}
Future<Result, ProducerImplBaseWeakPtr> ProducerImpl::getProducerCreatedFuture() {
@@ -868,9 +849,7 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer
void ProducerImpl::disconnectProducer() {
LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
- Lock lock(mutex_);
- connection_.reset();
- lock.unlock();
+ resetCnx();
scheduleReconnection(shared_from_this());
}
@@ -885,16 +864,21 @@ void ProducerImpl::start() {
}
void ProducerImpl::shutdown() {
- Lock lock(mutex_);
- state_ = Closed;
+ resetCnx();
+ auto client = client_.lock();
+ if (client) {
+ client->cleanupProducer(this);
+ }
cancelTimers();
producerCreatedPromise_.setFailed(ResultAlreadyClosed);
+ state_ = Closed;
}
-void ProducerImpl::cancelTimers() {
+void ProducerImpl::cancelTimers() noexcept {
dataKeyRefreshTask_.stop();
- batchTimer_.cancel();
- sendTimer_.cancel();
+ boost::system::error_code ec;
+ batchTimer_.cancel(ec);
+ sendTimer_.cancel(ec);
}
bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const {
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 74eee61..0559515 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -109,6 +109,7 @@ class ProducerImpl : public HandlerBase,
friend class BatchMessageContainer;
// overrided methods from HandlerBase
+ void beforeConnectionChange(ClientConnection& connection) override;
void connectionOpened(const ClientConnectionPtr& connection) override;
void connectionFailed(Result result) override;
HandlerBaseWeakPtr get_weak_from_this() override { return shared_from_this(); }
@@ -120,8 +121,6 @@ class ProducerImpl : public HandlerBase,
void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData);
- void handleClose(Result result, ResultCallback callback, ProducerImplPtr producer);
-
void resendMessages(ClientConnectionPtr cnx);
void refreshEncryptionKey(const boost::system::error_code& ec);
@@ -143,7 +142,7 @@ class ProducerImpl : public HandlerBase,
void releaseSemaphore(uint32_t payloadSize);
void releaseSemaphoreForSendOp(const OpSendMsg& op);
- void cancelTimers();
+ void cancelTimers() noexcept;
bool isValidProducerState(const SendCallback& callback) const;
bool canAddToBatch(const Message& msg) const;
diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h
index 831d1e8..9bed7d7 100644
--- a/lib/SynchronizedHashMap.h
+++ b/lib/SynchronizedHashMap.h
@@ -74,12 +74,9 @@ class SynchronizedHashMap {
// clear the map and apply `f` on each removed value
void clear(std::function<void(const K&, const V&)> f) {
- Lock lock(mutex_);
- auto it = data_.begin();
- while (it != data_.end()) {
- f(it->first, it->second);
- auto next = data_.erase(it);
- it = next;
+ MapType data = move();
+ for (auto&& kv : data) {
+ f(kv.first, kv.second);
}
}
@@ -131,8 +128,15 @@ class SynchronizedHashMap {
return data_.size();
}
+ MapType move() noexcept {
+ Lock lock(mutex_);
+ MapType data;
+ data_.swap(data);
+ return data;
+ }
+
private:
- std::unordered_map<K, V> data_;
+ MapType data_;
// Use recursive_mutex to allow methods being called in `forEach`
mutable MutexType mutex_;
};
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index 216b548..aa48bdc 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -20,7 +20,6 @@
#include "HttpHelper.h"
#include "PulsarFriend.h"
-#include "WaitUtils.h"
#include <future>
#include <pulsar/Client.h>
@@ -198,37 +197,34 @@ TEST(ClientTest, testReferenceCount) {
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
ASSERT_EQ(producers.size(), 1);
- ASSERT_TRUE(producers[0].use_count() > 0);
- LOG_INFO("Reference count of the producer: " << producers[0].use_count());
+
+ producers.forEachValue([](const ProducerImplBaseWeakPtr &weakProducer) {
+ LOG_INFO("Reference count of producer: " << weakProducer.use_count());
+ ASSERT_FALSE(weakProducer.expired());
+ });
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumer));
ASSERT_EQ(consumers.size(), 1);
- ASSERT_TRUE(consumers[0].use_count() > 0);
- LOG_INFO("Reference count of the consumer: " << consumers[0].use_count());
ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk,
client.createReader(topic + "-reader", MessageId::earliest(), readerConf, reader));
ASSERT_EQ(consumers.size(), 2);
- ASSERT_TRUE(consumers[1].use_count() > 0);
- LOG_INFO("Reference count of the reader's underlying consumer: " << consumers[1].use_count());
+
+ consumers.forEachValue([](const ConsumerImplBaseWeakPtr &weakConsumer) {
+ LOG_INFO("Reference count of consumer: " << weakConsumer.use_count());
+ ASSERT_FALSE(weakConsumer.expired());
+ });
readerWeakPtr = PulsarFriend::getReaderImplWeakPtr(reader);
ASSERT_TRUE(readerWeakPtr.use_count() > 0);
LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count());
}
- ASSERT_EQ(producers.size(), 1);
- ASSERT_EQ(producers[0].use_count(), 0);
- ASSERT_EQ(consumers.size(), 2);
-
- waitUntil(std::chrono::seconds(1), [&consumers, &readerWeakPtr] {
- return consumers[0].use_count() == 0 && consumers[1].use_count() == 0 && readerWeakPtr.expired();
- });
- EXPECT_EQ(consumers[0].use_count(), 0);
- EXPECT_EQ(consumers[1].use_count(), 0);
+ EXPECT_EQ(producers.size(), 0);
+ EXPECT_EQ(consumers.size(), 0);
EXPECT_EQ(readerWeakPtr.use_count(), 0);
client.close();
}
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index d9f9923..df8e3dc 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -98,14 +98,45 @@ class PulsarFriend {
static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) { return client.impl_; }
- static ClientImpl::ProducersList& getProducers(const Client& client) {
+ static auto getProducers(const Client& client) -> decltype(ClientImpl::producers_)& {
return getClientImplPtr(client)->producers_;
}
- static ClientImpl::ConsumersList& getConsumers(const Client& client) {
+ static auto getConsumers(const Client& client) -> decltype(ClientImpl::consumers_)& {
return getClientImplPtr(client)->consumers_;
}
+ static std::vector<ClientConnectionPtr> getConnections(const Client& client) {
+ auto& pool = client.impl_->pool_;
+ std::vector<ClientConnectionPtr> connections;
+ std::lock_guard<std::mutex> lock(pool.mutex_);
+ for (const auto& kv : pool.pool_) {
+ auto cnx = kv.second.lock();
+ if (cnx) {
+ connections.emplace_back(cnx);
+ }
+ }
+ return connections;
+ }
+
+ static std::vector<ProducerImplPtr> getProducers(const ClientConnection& cnx) {
+ std::vector<ProducerImplPtr> producers;
+ std::lock_guard<std::mutex> lock(cnx.mutex_);
+ for (const auto& kv : cnx.producers_) {
+ producers.emplace_back(kv.second.lock());
+ }
+ return producers;
+ }
+
+ static std::vector<ConsumerImplPtr> getConsumers(const ClientConnection& cnx) {
+ std::vector<ConsumerImplPtr> consumers;
+ std::lock_guard<std::mutex> lock(cnx.mutex_);
+ for (const auto& kv : cnx.consumers_) {
+ consumers.emplace_back(kv.second.lock());
+ }
+ return consumers;
+ }
+
static void setNegativeAckEnabled(Consumer consumer, bool enabled) {
consumer.impl_->setNegativeAcknowledgeEnabledForTesting(enabled);
}
diff --git a/tests/ShutdownTest.cc b/tests/ShutdownTest.cc
new file mode 100644
index 0000000..e32a95c
--- /dev/null
+++ b/tests/ShutdownTest.cc
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <atomic>
+#include <ctime>
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include "lib/ClientImpl.h"
+#include "HttpHelper.h"
+#include "PulsarFriend.h"
+
+using namespace pulsar;
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+enum class EndToEndType : uint8_t
+{
+ SINGLE_TOPIC,
+ MULTI_TOPICS,
+ REGEX_TOPICS
+};
+
+class ShutdownTest : public ::testing::TestWithParam<EndToEndType> {
+ public:
+ void SetUp() override {
+ topic_ = topic_ + std::to_string(id_++) + "-" + std::to_string(time(nullptr));
+ if (GetParam() != EndToEndType::SINGLE_TOPIC) {
+ int res = makePutRequest(
+ "http://localhost:8080/admin/v2/persistent/public/default/" + topic_ + "/partitions", "2");
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+ }
+ }
+
+ protected:
+ Client client_{lookupUrl};
+ decltype(PulsarFriend::getProducers(client_)) producers_{PulsarFriend::getProducers(client_)};
+ decltype(PulsarFriend::getConsumers(client_)) consumers_{PulsarFriend::getConsumers(client_)};
+ std::string topic_ = "shutdown-test-";
+
+ static std::atomic_int id_;
+
+ Result subscribe(Consumer &consumer) {
+ if (GetParam() == EndToEndType::REGEX_TOPICS) {
+ // NOTE: Currently the regex subscription requires the complete namespace prefix
+ return client_.subscribeWithRegex("persistent://public/default/" + topic_ + ".*", "sub",
+ consumer);
+ } else {
+ return client_.subscribe(topic_, "sub", consumer);
+ }
+ }
+
+ void assertConnectionsEmpty() {
+ auto connections = PulsarFriend::getConnections(client_);
+ for (const auto &cnx : PulsarFriend::getConnections(client_)) {
+ EXPECT_TRUE(PulsarFriend::getProducers(*cnx).empty());
+ EXPECT_TRUE(PulsarFriend::getConsumers(*cnx).empty());
+ }
+ }
+};
+
+std::atomic_int ShutdownTest::id_{0};
+
+TEST_P(ShutdownTest, testClose) {
+ Producer producer;
+ ASSERT_EQ(ResultOk, client_.createProducer(topic_, producer));
+ EXPECT_EQ(producers_.size(), 1);
+ ASSERT_EQ(ResultOk, producer.close());
+ EXPECT_EQ(producers_.size(), 0);
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, subscribe(consumer));
+ EXPECT_EQ(consumers_.size(), 1);
+ ASSERT_EQ(ResultOk, consumer.close());
+ EXPECT_EQ(consumers_.size(), 0);
+
+ ASSERT_EQ(ResultOk, subscribe(consumer));
+ EXPECT_EQ(consumers_.size(), 1);
+ ASSERT_EQ(ResultOk, consumer.unsubscribe());
+ EXPECT_EQ(consumers_.size(), 0);
+
+ assertConnectionsEmpty();
+ ASSERT_EQ(ResultOk, client_.close());
+}
+
+TEST_P(ShutdownTest, testDestructor) {
+ {
+ Producer producer;
+ ASSERT_EQ(ResultOk, client_.createProducer(topic_, producer));
+ EXPECT_EQ(producers_.size(), 1);
+ }
+ EXPECT_EQ(producers_.size(), 0);
+
+ {
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, subscribe(consumer));
+ EXPECT_EQ(consumers_.size(), 1);
+ }
+ EXPECT_EQ(consumers_.size(), 0);
+
+ assertConnectionsEmpty();
+ client_.close();
+}
+
+INSTANTIATE_TEST_SUITE_P(Pulsar, ShutdownTest,
+ ::testing::Values(EndToEndType::SINGLE_TOPIC, EndToEndType::MULTI_TOPICS,
+ EndToEndType::REGEX_TOPICS));
diff --git a/tests/WaitUtils.h b/tests/WaitUtils.h
deleted file mode 100644
index abe3efc..0000000
--- a/tests/WaitUtils.h
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#pragma once
-
-#include <chrono>
-#include <functional>
-#include <thread>
-
-namespace pulsar {
-
-template <typename Rep, typename Period>
-inline void waitUntil(std::chrono::duration<Rep, Period> timeout, std::function<bool()> condition) {
- auto timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
- while (timeoutMs > 0) {
- auto now = std::chrono::high_resolution_clock::now();
- if (condition()) {
- break;
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::high_resolution_clock::now() - now)
- .count();
- timeoutMs -= elapsed;
- }
-}
-
-} // namespace pulsar