You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by aa...@apache.org on 2019/10/23 10:39:23 UTC

[pulsar] branch master updated: Change state_ to closed when resultOk is returned (#5446)

This is an automated email from the ASF dual-hosted git repository.

aahmed pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 492d92b  Change state_ to closed when resultOk is returned (#5446)
492d92b is described below

commit 492d92bd2bc14f8a4eed22bc9208d86141dc1c95
Author: hrsakai <hs...@yahoo-corp.jp>
AuthorDate: Wed Oct 23 19:39:15 2019 +0900

    Change state_ to closed when resultOk is returned (#5446)
---
 pulsar-client-cpp/lib/ConsumerImpl.cc | 6 +++++-
 pulsar-client-cpp/lib/ProducerImpl.cc | 6 +++++-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 1c616fe..1313a05 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -837,8 +837,12 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
+    LOG_INFO(getName() << "Closing consumer for topic " << topic_);
+    state_ = Closing;
+
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
+        state_ = Closed;
         lock.unlock();
         // If connection is gone, also the consumer is closed on the broker side
         if (callback) {
@@ -847,9 +851,9 @@ void ConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
 
-    LOG_INFO(getName() << "Closing consumer for topic " << topic_);
     ClientImplPtr client = client_.lock();
     if (!client) {
+        state_ = Closed;
         lock.unlock();
         // Client was already destroyed
         if (callback) {
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 220a9f8..8159c8f 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -492,6 +492,7 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (!cnx) {
+        state_ = Closed;
         lock.unlock();
         if (callback) {
             callback(ResultOk);
@@ -502,16 +503,19 @@ void ProducerImpl::closeAsync(CloseCallback callback) {
     // Detach the producer from the connection to avoid sending any other
     // message from the producer
     connection_.reset();
-    lock.unlock();
 
     ClientImplPtr client = client_.lock();
     if (!client) {
+        state_ = Closed;
+        lock.unlock();
         // Client was already destroyed
         if (callback) {
             callback(ResultOk);
         }
         return;
     }
+
+    lock.unlock();
     int requestId = client->newRequestId();
     Future<Result, ResponseData> future =
         cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId);