You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/17 18:08:16 UTC

[pulsar] branch master updated: Fix partition index error in close callback (#7282)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 72285f2  Fix partition index error in close callback (#7282)
72285f2 is described below

commit 72285f27755f961b61eb4d1eca891a6979044f50
Author: Yunze Xu <xy...@gmail.com>
AuthorDate: Thu Jun 18 02:07:59 2020 +0800

    Fix partition index error in close callback (#7282)
---
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 14 +++++++-------
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 16 ++++++++--------
 pulsar-client-cpp/lib/ProducerImpl.h             |  2 ++
 3 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 7a7e16f..6f5dbd2 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -292,7 +292,7 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
         partitionedConsumerCreatedPromise_.setFailed(result);
         // unsubscribed all of the successfully subscribed partitioned consumers
         closeAsync(nullCallbackForCleanup);
-        LOG_DEBUG("Unable to create Consumer for partition - " << partitionIndex << " Error - " << result);
+        LOG_ERROR("Unable to create Consumer for partition - " << partitionIndex << " Error - " << result);
         return;
     }
 
@@ -351,17 +351,17 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
     setState(Closed);
-    int consumerIndex = 0;
     unsigned int consumerAlreadyClosed = 0;
     // close successfully subscribed consumers
     // Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased
     // when `state_` is Ready
-    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
-        ConsumerImplPtr consumer = *i;
+    for (auto& consumer : consumers_) {
         if (!consumer->isClosed()) {
-            consumer->closeAsync(std::bind(&PartitionedConsumerImpl::handleSinglePartitionConsumerClose,
-                                           shared_from_this(), std::placeholders::_1, consumerIndex,
-                                           callback));
+            auto self = shared_from_this();
+            const auto partition = consumer->getPartitionIndex();
+            consumer->closeAsync([this, self, partition, callback](Result result) {
+                handleSinglePartitionConsumerClose(result, partition, callback);
+            });
         } else {
             if (++consumerAlreadyClosed == consumers_.size()) {
                 // everything is closed already. so we are good.
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 628afbc..aa5e176 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -131,7 +131,7 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
         lock.unlock();
         closeAsync(closeCallback);
         partitionedProducerCreatedPromise_.setFailed(result);
-        LOG_DEBUG("Unable to create Producer for partition - " << partitionIndex << " Error - " << result);
+        LOG_ERROR("Unable to create Producer for partition - " << partitionIndex << " Error - " << result);
         return;
     }
 
@@ -204,17 +204,17 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
 void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
     setState(Closing);
 
-    int producerIndex = 0;
     unsigned int producerAlreadyClosed = 0;
 
     // Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased
     // when `state_` is Ready
-    for (ProducerList::const_iterator i = producers_.begin(); i != producers_.end(); i++) {
-        ProducerImplPtr prod = *i;
-        if (!prod->isClosed()) {
-            prod->closeAsync(std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerClose,
-                                       shared_from_this(), std::placeholders::_1, producerIndex,
-                                       closeCallback));
+    for (auto& producer : producers_) {
+        if (!producer->isClosed()) {
+            auto self = shared_from_this();
+            const auto partition = static_cast<unsigned int>(producer->partition());
+            producer->closeAsync([this, self, partition, closeCallback](Result result) {
+                handleSinglePartitionProducerClose(result, partition, closeCallback);
+            });
         } else {
             producerAlreadyClosed++;
         }
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 0a57da8d..25f628c 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -90,6 +90,8 @@ class ProducerImpl : public HandlerBase,
 
     uint64_t getProducerId() const;
 
+    int32_t partition() const noexcept { return partition_; }
+
     virtual void start();
 
     virtual void shutdown();