You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/17 18:08:16 UTC
[pulsar] branch master updated: Fix partition index error in close callback (#7282)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 72285f2 Fix partition index error in close callback (#7282)
72285f2 is described below
commit 72285f27755f961b61eb4d1eca891a6979044f50
Author: Yunze Xu <xy...@gmail.com>
AuthorDate: Thu Jun 18 02:07:59 2020 +0800
Fix partition index error in close callback (#7282)
---
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 14 +++++++-------
pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 16 ++++++++--------
pulsar-client-cpp/lib/ProducerImpl.h | 2 ++
3 files changed, 17 insertions(+), 15 deletions(-)
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 7a7e16f..6f5dbd2 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -292,7 +292,7 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
partitionedConsumerCreatedPromise_.setFailed(result);
// unsubscribed all of the successfully subscribed partitioned consumers
closeAsync(nullCallbackForCleanup);
- LOG_DEBUG("Unable to create Consumer for partition - " << partitionIndex << " Error - " << result);
+ LOG_ERROR("Unable to create Consumer for partition - " << partitionIndex << " Error - " << result);
return;
}
@@ -351,17 +351,17 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
return;
}
setState(Closed);
- int consumerIndex = 0;
unsigned int consumerAlreadyClosed = 0;
// close successfully subscribed consumers
// Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased
// when `state_` is Ready
- for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
- ConsumerImplPtr consumer = *i;
+ for (auto& consumer : consumers_) {
if (!consumer->isClosed()) {
- consumer->closeAsync(std::bind(&PartitionedConsumerImpl::handleSinglePartitionConsumerClose,
- shared_from_this(), std::placeholders::_1, consumerIndex,
- callback));
+ auto self = shared_from_this();
+ const auto partition = consumer->getPartitionIndex();
+ consumer->closeAsync([this, self, partition, callback](Result result) {
+ handleSinglePartitionConsumerClose(result, partition, callback);
+ });
} else {
if (++consumerAlreadyClosed == consumers_.size()) {
// everything is closed already. so we are good.
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 628afbc..aa5e176 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -131,7 +131,7 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
lock.unlock();
closeAsync(closeCallback);
partitionedProducerCreatedPromise_.setFailed(result);
- LOG_DEBUG("Unable to create Producer for partition - " << partitionIndex << " Error - " << result);
+ LOG_ERROR("Unable to create Producer for partition - " << partitionIndex << " Error - " << result);
return;
}
@@ -204,17 +204,17 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
setState(Closing);
- int producerIndex = 0;
unsigned int producerAlreadyClosed = 0;
// Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased
// when `state_` is Ready
- for (ProducerList::const_iterator i = producers_.begin(); i != producers_.end(); i++) {
- ProducerImplPtr prod = *i;
- if (!prod->isClosed()) {
- prod->closeAsync(std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerClose,
- shared_from_this(), std::placeholders::_1, producerIndex,
- closeCallback));
+ for (auto& producer : producers_) {
+ if (!producer->isClosed()) {
+ auto self = shared_from_this();
+ const auto partition = static_cast<unsigned int>(producer->partition());
+ producer->closeAsync([this, self, partition, closeCallback](Result result) {
+ handleSinglePartitionProducerClose(result, partition, closeCallback);
+ });
} else {
producerAlreadyClosed++;
}
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 0a57da8d..25f628c 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -90,6 +90,8 @@ class ProducerImpl : public HandlerBase,
uint64_t getProducerId() const;
+ int32_t partition() const noexcept { return partition_; }
+
virtual void start();
virtual void shutdown();