You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by aa...@apache.org on 2019/10/23 10:39:23 UTC
[pulsar] branch master updated: Change state_ to closed when resultOk is returned (#5446)
This is an automated email from the ASF dual-hosted git repository.
aahmed pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 492d92b Change state_ to closed when resultOk is returned (#5446)
492d92b is described below
commit 492d92bd2bc14f8a4eed22bc9208d86141dc1c95
Author: hrsakai <hs...@yahoo-corp.jp>
AuthorDate: Wed Oct 23 19:39:15 2019 +0900
Change state_ to closed when resultOk is returned (#5446)
---
pulsar-client-cpp/lib/ConsumerImpl.cc | 6 +++++-
pulsar-client-cpp/lib/ProducerImpl.cc | 6 +++++-
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 1c616fe..1313a05 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -837,8 +837,12 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
return;
}
+ LOG_INFO(getName() << "Closing consumer for topic " << topic_);
+ state_ = Closing;
+
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
+ state_ = Closed;
lock.unlock();
// If connection is gone, also the consumer is closed on the broker side
if (callback) {
@@ -847,9 +851,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
return;
}
- LOG_INFO(getName() << "Closing consumer for topic " << topic_);
ClientImplPtr client = client_.lock();
if (!client) {
+ state_ = Closed;
lock.unlock();
// Client was already destroyed
if (callback) {
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 220a9f8..8159c8f 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -492,6 +492,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (!cnx) {
+ state_ = Closed;
lock.unlock();
if (callback) {
callback(ResultOk);
@@ -502,16 +503,19 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
// Detach the producer from the connection to avoid sending any other
// message from the producer
connection_.reset();
- lock.unlock();
ClientImplPtr client = client_.lock();
if (!client) {
+ state_ = Closed;
+ lock.unlock();
// Client was already destroyed
if (callback) {
callback(ResultOk);
}
return;
}
+
+ lock.unlock();
int requestId = client->newRequestId();
Future<Result, ResponseData> future =
cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);