You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/09/16 08:43:22 UTC
[pulsar] 01/02: [improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6e840ed11552e1f50c2d0214a6c90f204c437d6f
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Sat Aug 13 10:36:51 2022 +0800
[improve][client-c++] Use an atomic `state_` instead of the lock to improve performance (#16940)
### Motivation
Now, Use a lot of locks to ensure the atomicity of `state_` in the `ConsumerImpl`, `ProducerImpl`, `PartitionedConsumerImpl`, and `MultiTopicsConsumerImpl`, we can use atomic `state_` instead of the lock to improve performance.
### Modifications
Use an atomic `state_` instead of the lock to improve performance.
(cherry picked from commit fb0f653eadcf6bf72eb8c8efcc29975da6e21267)
---
pulsar-client-cpp/lib/ConsumerImpl.cc | 75 ++++++----------------
pulsar-client-cpp/lib/HandlerBase.cc | 11 ++--
pulsar-client-cpp/lib/HandlerBase.h | 2 +-
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 71 ++++++--------------
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 5 +-
.../lib/PatternMultiTopicsConsumerImpl.cc | 5 +-
pulsar-client-cpp/lib/ProducerImpl.cc | 59 ++++++-----------
7 files changed, 69 insertions(+), 159 deletions(-)
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 1f4a6b5b988..7c1c89d93f8 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -157,10 +157,7 @@ void ConsumerImpl::start() {
}
void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
- Lock lock(mutex_);
- const auto state = state_;
- lock.unlock();
- if (state == Closed) {
+ if (state_ == Closed) {
LOG_DEBUG(getName() << "connectionOpened : Consumer is already closed");
return;
}
@@ -198,7 +195,6 @@ void ConsumerImpl::connectionFailed(Result result) {
ConsumerImplPtr ptr = shared_from_this();
if (consumerCreatedPromise_.setFailed(result)) {
- Lock lock(mutex_);
state_ = Failed;
}
}
@@ -271,15 +267,15 @@ void ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result r
void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
LOG_INFO(getName() << "Unsubscribing");
- Lock lock(mutex_);
if (state_ != Ready) {
- lock.unlock();
callback(ResultAlreadyClosed);
LOG_ERROR(getName() << "Can not unsubscribe a closed subscription, please call subscribe again and "
"then call unsubscribe");
return;
}
+ Lock lock(mutex_);
+
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
LOG_DEBUG(getName() << "Unsubscribe request sent for consumer - " << consumerId_);
@@ -300,7 +296,6 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback callback) {
void ConsumerImpl::handleUnsubscribe(Result result, ResultCallback callback) {
if (result == ResultOk) {
- Lock lock(mutex_);
state_ = Closed;
LOG_INFO(getName() << "Unsubscribed successfully");
} else {
@@ -630,12 +625,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
Message msg;
// fail the callback if consumer is closing or closed
- Lock stateLock(mutex_);
if (state_ != Ready) {
callback(ResultAlreadyClosed, msg);
return;
}
- stateLock.unlock();
Lock lock(pendingReceiveMutex_);
if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -653,12 +646,10 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
}
Result ConsumerImpl::receiveHelper(Message& msg) {
- {
- Lock lock(mutex_);
- if (state_ != Ready) {
- return ResultAlreadyClosed;
- }
+ if (state_ != Ready) {
+ return ResultAlreadyClosed;
}
+
if (messageListener_) {
LOG_ERROR(getName() << "Can not receive when a listener has been set");
return ResultInvalidConfiguration;
@@ -685,11 +676,8 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
return ResultInvalidConfiguration;
}
- {
- Lock lock(mutex_);
- if (state_ != Ready) {
- return ResultAlreadyClosed;
- }
+ if (state_ != Ready) {
+ return ResultAlreadyClosed;
}
if (messageListener_) {
@@ -864,13 +852,10 @@ void ConsumerImpl::disconnectConsumer() {
}
void ConsumerImpl::closeAsync(ResultCallback callback) {
- Lock lock(mutex_);
-
// Keep a reference to ensure object is kept alive
ConsumerImplPtr ptr = shared_from_this();
if (state_ != Ready) {
- lock.unlock();
if (callback) {
callback(ResultAlreadyClosed);
}
@@ -888,7 +873,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
state_ = Closed;
- lock.unlock();
// If connection is gone, also the consumer is closed on the broker side
if (callback) {
callback(ResultOk);
@@ -899,7 +883,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
ClientImplPtr client = client_.lock();
if (!client) {
state_ = Closed;
- lock.unlock();
// Client was already destroyed
if (callback) {
callback(ResultOk);
@@ -907,8 +890,6 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
return;
}
- // Lock is no longer required
- lock.unlock();
int requestId = client->newRequestId();
Future<Result, ResponseData> future =
cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId);
@@ -924,9 +905,7 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerImplPtr consumer) {
if (result == ResultOk) {
- Lock lock(mutex_);
state_ = Closed;
- lock.unlock();
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
@@ -946,22 +925,14 @@ void ConsumerImpl::handleClose(Result result, ResultCallback callback, ConsumerI
const std::string& ConsumerImpl::getName() const { return consumerStr_; }
void ConsumerImpl::shutdown() {
- Lock lock(mutex_);
state_ = Closed;
- lock.unlock();
consumerCreatedPromise_.setFailed(ResultAlreadyClosed);
}
-bool ConsumerImpl::isClosed() {
- Lock lock(mutex_);
- return state_ == Closed;
-}
+bool ConsumerImpl::isClosed() { return state_ == Closed; }
-bool ConsumerImpl::isOpen() {
- Lock lock(mutex_);
- return state_ == Ready;
-}
+bool ConsumerImpl::isOpen() { return state_ == Ready; }
Result ConsumerImpl::pauseMessageListener() {
if (!messageListener_) {
@@ -1024,14 +995,13 @@ void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {
int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); }
void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
- Lock lock(mutex_);
if (state_ != Ready) {
LOG_ERROR(getName() << "Client connection is not open, please try again later.")
- lock.unlock();
callback(ResultConsumerNotInitialized, BrokerConsumerStats());
return;
}
+ Lock lock(mutex_);
if (brokerConsumerStats_.isValid()) {
LOG_DEBUG(getName() << "Serving data from cache");
BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_;
@@ -1091,16 +1061,14 @@ void ConsumerImpl::handleSeek(Result result, ResultCallback callback) {
}
void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
- Lock lock(mutex_);
- if (state_ == Closed || state_ == Closing) {
- lock.unlock();
+ const auto state = state_.load();
+ if (state == Closed || state == Closing) {
LOG_ERROR(getName() << "Client connection already closed.");
if (callback) {
callback(ResultAlreadyClosed);
}
return;
}
- lock.unlock();
this->ackGroupingTrackerPtr_->flushAndClean();
ClientConnectionPtr cnx = getCnx().lock();
@@ -1124,16 +1092,14 @@ void ConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
}
void ConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
- Lock lock(mutex_);
- if (state_ == Closed || state_ == Closing) {
- lock.unlock();
+ const auto state = state_.load();
+ if (state == Closed || state == Closing) {
LOG_ERROR(getName() << "Client connection already closed.");
if (callback) {
callback(ResultAlreadyClosed);
}
return;
}
- lock.unlock();
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
@@ -1197,16 +1163,14 @@ void ConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback
}
void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback) {
- Lock lock(mutex_);
- if (state_ == Closed || state_ == Closing) {
- lock.unlock();
+ const auto state = state_.load();
+ if (state == Closed || state == Closing) {
LOG_ERROR(getName() << "Client connection already closed.");
if (callback) {
callback(ResultAlreadyClosed, MessageId());
}
return;
}
- lock.unlock();
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
@@ -1252,10 +1216,7 @@ void ConsumerImpl::trackMessage(const Message& msg) {
}
}
-bool ConsumerImpl::isConnected() const {
- Lock lock(mutex_);
- return !getCnx().expired() && state_ == Ready;
-}
+bool ConsumerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; }
uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ? 1 : 0; }
diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc
index d7025ad004b..5d2244f7552 100644
--- a/pulsar-client-cpp/lib/HandlerBase.cc
+++ b/pulsar-client-cpp/lib/HandlerBase.cc
@@ -43,12 +43,9 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
HandlerBase::~HandlerBase() { timer_->cancel(); }
void HandlerBase::start() {
- Lock lock(mutex_);
// guard against concurrent state changes such as closing
- if (state_ == NotStarted) {
- state_ = Pending;
- lock.unlock();
-
+ State state = NotStarted;
+ if (state_.compare_exchange_strong(state, Pending)) {
grabCnx();
}
}
@@ -97,7 +94,6 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
return;
}
- Lock lock(handler->mutex_);
State state = handler->state_;
ClientConnectionPtr currentConnection = handler->connection_.lock();
@@ -135,7 +131,8 @@ bool HandlerBase::isRetriableError(Result result) {
}
void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
- if (handler->state_ == Pending || handler->state_ == Ready) {
+ const auto state = handler->state_.load();
+ if (state == Pending || state == Ready) {
TimeDuration delay = handler->backoff_.next();
LOG_INFO(handler->getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0)
diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h
index eeb8ebe1c5e..1184746da21 100644
--- a/pulsar-client-cpp/lib/HandlerBase.h
+++ b/pulsar-client-cpp/lib/HandlerBase.h
@@ -105,7 +105,7 @@ class HandlerBase {
Failed
};
- State state_;
+ std::atomic<State> state_;
Backoff backoff_;
uint64_t epoch_;
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 87166d0d360..d4b48681039 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -63,7 +63,8 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
void MultiTopicsConsumerImpl::start() {
if (topics_.empty()) {
- if (compareAndSetState(Pending, Ready)) {
+ MultiTopicsConsumerState state = Pending;
+ if (state_.compare_exchange_strong(state, Ready)) {
LOG_DEBUG("No topics passed in when create MultiTopicsConsumer.");
multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
return;
@@ -91,14 +92,15 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
(*topicsNeedCreate)--;
if (result != ResultOk) {
- setState(Failed);
+ state_ = Failed;
LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result);
}
LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
if (topicsNeedCreate->load() == 0) {
- if (compareAndSetState(Pending, Ready)) {
+ MultiTopicsConsumerState state = Pending;
+ if (state_.compare_exchange_strong(state, Ready)) {
LOG_INFO("Successfully Subscribed to Topics");
multiTopicsConsumerCreatedPromise_.setValue(shared_from_this());
} else {
@@ -122,7 +124,8 @@ Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
return topicPromise->getFuture();
}
- if (state_ == Closed || state_ == Closing) {
+ const auto state = state_.load();
+ if (state == Closed || state == Closing) {
LOG_ERROR("MultiTopicsConsumer already closed when subscribe.");
topicPromise->setFailed(ResultAlreadyClosed);
return topicPromise->getFuture();
@@ -238,15 +241,13 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {
LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing");
- Lock lock(mutex_);
- if (state_ == Closing || state_ == Closed) {
+ const auto state = state_.load();
+ if (state == Closing || state == Closed) {
LOG_INFO(consumerStr_ << " already closed");
- lock.unlock();
callback(ResultAlreadyClosed);
return;
}
state_ = Closing;
- lock.unlock();
std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
auto self = shared_from_this();
@@ -270,7 +271,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
(*consumerUnsubed)++;
if (result != ResultOk) {
- setState(Failed);
+ state_ = Failed;
LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
<< result << " subscription - " << subscriptionName_);
}
@@ -282,7 +283,7 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
unAckedMessageTrackerPtr_->clear();
Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError;
- setState(Closed);
+ state_ = Closed;
callback(result1);
return;
}
@@ -301,7 +302,8 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
int numberPartitions = it->second;
lock.unlock();
- if (state_ == Closing || state_ == Closed) {
+ const auto state = state_.load();
+ if (state == Closing || state == Closed) {
LOG_ERROR("TopicsConsumer already closed when unsubscribe topic: " << topic << " subscription - "
<< subscriptionName_);
callback(ResultAlreadyClosed);
@@ -337,7 +339,7 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
(*consumerUnsubed)++;
if (result != ResultOk) {
- setState(Failed);
+ state_ = Failed;
LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: "
<< result << " topicPartitionName - " << topicPartitionName);
}
@@ -369,7 +371,8 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
}
void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
- if (state_ == Closing || state_ == Closed) {
+ const auto state = state_.load();
+ if (state == Closing || state == Closed) {
LOG_ERROR("TopicsConsumer already closed "
<< " topic" << topic_ << " consumer - " << consumerStr_);
if (callback) {
@@ -378,7 +381,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
return;
}
- setState(Closing);
+ state_ = Closing;
auto self = shared_from_this();
int numConsumers = 0;
@@ -392,7 +395,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
if (numConsumers == 0) {
LOG_DEBUG("TopicsConsumer have no consumers to close "
<< " topic" << topic_ << " subscription - " << subscriptionName_);
- setState(Closed);
+ state_ = Closed;
if (callback) {
callback(ResultAlreadyClosed);
}
@@ -414,7 +417,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::stri
numberTopicPartitions_->fetch_sub(1);
if (result != ResultOk) {
- setState(Failed);
+ state_ = Failed;
LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - "
<< result);
}
@@ -476,18 +479,14 @@ void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
}
Result MultiTopicsConsumerImpl::receive(Message& msg) {
- Lock lock(mutex_);
if (state_ != Ready) {
- lock.unlock();
return ResultAlreadyClosed;
}
if (messageListener_) {
- lock.unlock();
LOG_ERROR("Can not receive when a listener has been set");
return ResultInvalidConfiguration;
}
- lock.unlock();
messages_.pop(msg);
unAckedMessageTrackerPtr_->add(msg.getMessageId());
@@ -495,19 +494,15 @@ Result MultiTopicsConsumerImpl::receive(Message& msg) {
}
Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
- Lock lock(mutex_);
if (state_ != Ready) {
- lock.unlock();
return ResultAlreadyClosed;
}
if (messageListener_) {
- lock.unlock();
LOG_ERROR("Can not receive when a listener has been set");
return ResultInvalidConfiguration;
}
- lock.unlock();
if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
@@ -520,12 +515,10 @@ void MultiTopicsConsumerImpl::receiveAsync(ReceiveCallback& callback) {
Message msg;
// fail the callback if consumer is closing or closed
- Lock stateLock(mutex_);
if (state_ != Ready) {
callback(ResultAlreadyClosed, msg);
return;
}
- stateLock.unlock();
Lock lock(pendingReceiveMutex_);
if (messages_.pop(msg, std::chrono::milliseconds(0))) {
@@ -590,30 +583,11 @@ const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic_; }
const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; }
-void MultiTopicsConsumerImpl::setState(const MultiTopicsConsumerState state) {
- Lock lock(mutex_);
- state_ = state;
-}
-
-bool MultiTopicsConsumerImpl::compareAndSetState(MultiTopicsConsumerState expect,
- MultiTopicsConsumerState update) {
- Lock lock(mutex_);
- if (state_ == expect) {
- state_ = update;
- return true;
- } else {
- return false;
- }
-}
-
void MultiTopicsConsumerImpl::shutdown() {}
bool MultiTopicsConsumerImpl::isClosed() { return state_ == Closed; }
-bool MultiTopicsConsumerImpl::isOpen() {
- Lock lock(mutex_);
- return state_ == Ready;
-}
+bool MultiTopicsConsumerImpl::isOpen() { return state_ == Ready; }
void MultiTopicsConsumerImpl::receiveMessages() {
const auto receiverQueueSize = conf_.getReceiverQueueSize();
@@ -663,12 +637,11 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<Mes
int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
void MultiTopicsConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
- Lock lock(mutex_);
if (state_ != Ready) {
- lock.unlock();
callback(ResultConsumerNotInitialized, BrokerConsumerStats());
return;
}
+ Lock lock(mutex_);
MultiTopicsBrokerConsumerStatsPtr statsPtr =
std::make_shared<MultiTopicsBrokerConsumerStatsImpl>(numberTopicPartitions_->load());
LatchPtr latchPtr = std::make_shared<Latch>(numberTopicPartitions_->load());
@@ -742,11 +715,9 @@ void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabl
}
bool MultiTopicsConsumerImpl::isConnected() const {
- Lock lock(mutex_);
if (state_ != Ready) {
return false;
}
- lock.unlock();
return consumers_
.findFirstValueIf([](const ConsumerImplPtr& consumer) { return !consumer->isConnected(); })
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index f69c56343e4..bfd92582c75 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -106,7 +106,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
std::map<std::string, int> topicsPartitions_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
- MultiTopicsConsumerState state_ = Pending;
+ std::atomic<MultiTopicsConsumerState> state_{Pending};
BlockingQueue<Message> messages_;
const ExecutorServicePtr listenerExecutor_;
MessageListener messageListener_;
@@ -120,9 +120,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
std::queue<ReceiveCallback> pendingReceives_;
/* methods */
- void setState(MultiTopicsConsumerState state);
- bool compareAndSetState(MultiTopicsConsumerState expect, MultiTopicsConsumerState update);
-
void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
unsigned int partitionIndex);
void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback);
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
index 8e92fd318d6..1183adfc4f1 100644
--- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
@@ -55,8 +55,9 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system:
return;
}
- if (state_ != Ready) {
- LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state_);
+ const auto state = state_.load();
+ if (state != Ready) {
+ LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state);
resetAutoDiscoveryTimer();
return;
}
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index e15d388ef64..a446c2b4b25 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -136,13 +136,10 @@ void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
}
void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
- Lock lock(mutex_);
if (state_ == Closed) {
- lock.unlock();
LOG_DEBUG(getName() << "connectionOpened : Producer is already closed");
return;
}
- lock.unlock();
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();
@@ -164,7 +161,6 @@ void ProducerImpl::connectionFailed(Result result) {
// so don't change the state and allow reconnections
return;
} else if (producerCreatedPromise_.setFailed(result)) {
- Lock lock(mutex_);
state_ = Failed;
}
}
@@ -175,14 +171,15 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
// make sure we're still in the Pending/Ready state, closeAsync could have been invoked
// while waiting for this response if using lazy producers
- Lock lock(mutex_);
- if (state_ != Ready && state_ != Pending) {
+ const auto state = state_.load();
+ if (state != Ready && state != Pending) {
LOG_DEBUG("Producer created response received but producer already closed");
failPendingMessages(ResultAlreadyClosed, false);
return;
}
if (result == ResultOk) {
+ Lock lock(mutex_);
// We are now reconnected to broker and clear to send messages. Re-send all pending messages and
// set the cnx pointer so that new messages will be sent immediately
LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
@@ -217,8 +214,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
producerCreatedPromise_.setValue(shared_from_this());
} else {
- lock.unlock();
-
// Producer creation failed
if (result == ResultTimeout) {
// Creating the producer has timed out. We need to ensure the broker closes the producer
@@ -248,7 +243,6 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
failPendingMessages(result, true);
producerCreatedPromise_.setFailed(result);
- Lock lock(mutex_);
state_ = Failed;
}
}
@@ -334,9 +328,8 @@ void ProducerImpl::statsCallBackHandler(Result res, const MessageId& msgId, Send
void ProducerImpl::flushAsync(FlushCallback callback) {
if (batchMessageContainer_) {
- Lock lock(mutex_);
-
if (state_ == Ready) {
+ Lock lock(mutex_);
auto failures = batchMessageAndSend(callback);
lock.unlock();
failures.complete();
@@ -350,8 +343,8 @@ void ProducerImpl::flushAsync(FlushCallback callback) {
void ProducerImpl::triggerFlush() {
if (batchMessageContainer_) {
- Lock lock(mutex_);
if (state_ == Ready) {
+ Lock lock(mutex_);
auto failures = batchMessageAndSend();
lock.unlock();
failures.complete();
@@ -551,8 +544,9 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e
LOG_DEBUG(getName() << " - Batch Message Timer expired");
// ignore if the producer is already closing/closed
- Lock lock(mutex_);
- if (state_ == Pending || state_ == Ready) {
+ const auto state = state_.load();
+ if (state == Pending || state == Ready) {
+ Lock lock(mutex_);
auto failures = batchMessageAndSend();
lock.unlock();
failures.complete();
@@ -569,11 +563,9 @@ void ProducerImpl::printStats() {
}
void ProducerImpl::closeAsync(CloseCallback callback) {
- Lock lock(mutex_);
-
// if the producer was never started then there is nothing to clean up
- if (state_ == NotStarted) {
- state_ = Closed;
+ State expectedState = NotStarted;
+ if (state_.compare_exchange_strong(expectedState, Closed)) {
callback(ResultOk);
return;
}
@@ -586,9 +578,11 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
// ensure any remaining send callbacks are called before calling the close callback
failPendingMessages(ResultAlreadyClosed, false);
- if (state_ != Ready && state_ != Pending) {
+ // TODO maybe we need a loop here to implement CAS for a condition,
+ // just like Java's `getAndUpdate` method on an atomic variable
+ const auto state = state_.load();
+ if (state != Ready && state != Pending) {
state_ = Closed;
- lock.unlock();
if (callback) {
callback(ResultAlreadyClosed);
}
@@ -601,7 +595,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
state_ = Closed;
- lock.unlock();
+
if (callback) {
callback(ResultOk);
}
@@ -615,7 +609,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
ClientImplPtr client = client_.lock();
if (!client) {
state_ = Closed;
- lock.unlock();
// Client was already destroyed
if (callback) {
callback(ResultOk);
@@ -623,7 +616,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
return;
}
- lock.unlock();
int requestId = client->newRequestId();
Future<Result, ResponseData> future =
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);
@@ -636,7 +628,6 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerImplPtr producer) {
if (result == ResultOk) {
- Lock lock(mutex_);
state_ = Closed;
LOG_INFO(getName() << "Closed producer");
ClientConnectionPtr cnx = getCnx().lock();
@@ -659,10 +650,11 @@ Future<Result, ProducerImplBaseWeakPtr> ProducerImpl::getProducerCreatedFuture()
uint64_t ProducerImpl::getProducerId() const { return producerId_; }
void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
- Lock lock(mutex_);
- if (state_ != Pending && state_ != Ready) {
+ const auto state = state_.load();
+ if (state != Pending && state != Ready) {
return;
}
+ Lock lock(mutex_);
if (err == boost::asio::error::operation_aborted) {
LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
@@ -837,22 +829,13 @@ bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr
return a->getProducerId() < b->getProducerId();
}
-bool ProducerImpl::isClosed() {
- Lock lock(mutex_);
- return state_ == Closed;
-}
+bool ProducerImpl::isClosed() { return state_ == Closed; }
-bool ProducerImpl::isConnected() const {
- Lock lock(mutex_);
- return !getCnx().expired() && state_ == Ready;
-}
+bool ProducerImpl::isConnected() const { return !getCnx().expired() && state_ == Ready; }
uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; }
-bool ProducerImpl::isStarted() const {
- Lock lock(mutex_);
- return state_ != NotStarted;
-}
+bool ProducerImpl::isStarted() const { return state_ != NotStarted; }
void ProducerImpl::startSendTimeoutTimer() {
// Initialize the sendTimer only once per producer and only when producer timeout is
// configured. Set the timeout as configured value and asynchronously wait for the