You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/28 06:25:51 UTC
[pulsar] branch master updated: [C++] Reduce redeliverMessages when
message listener is enabled (#10726)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ca40005 [C++] Reduce redeliverMessages when message listener is enabled (#10726)
ca40005 is described below
commit ca4000544fc01895beb00ce5764945fbf7afebbf
Author: sijianliang <li...@foxmail.com>
AuthorDate: Fri May 28 14:24:59 2021 +0800
[C++] Reduce redeliverMessages when message listener is enabled (#10726)
### Motivation
When consumer's message listener is enabled, application acknowledge processed message in message listener callback, but still tracks the message in `internalListener`, causes sending redundant redeliverMessages
### Modifications
trackMessage before invoke message listener callback in `internalListener`
---
pulsar-client-cpp/lib/ConsumerImpl.cc | 9 ++++++---
pulsar-client-cpp/lib/ConsumerImpl.h | 2 +-
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 564c114..c268417 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -566,6 +566,7 @@ void ConsumerImpl::internalListener() {
// This will only happen when the connection got reset and we cleared the queue
return;
}
+ trackMessage(msg);
try {
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
@@ -573,7 +574,7 @@ void ConsumerImpl::internalListener() {
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
}
- messageProcessed(msg);
+ messageProcessed(msg, false);
}
Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
@@ -701,7 +702,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
}
}
-void ConsumerImpl::messageProcessed(Message& msg) {
+void ConsumerImpl::messageProcessed(Message& msg, bool track) {
Lock lock(mutex_);
lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
@@ -712,7 +713,9 @@ void ConsumerImpl::messageProcessed(Message& msg) {
}
increaseAvailablePermits(currentCnx);
- trackMessage(msg);
+ if (track) {
+ trackMessage(msg);
+ }
}
/**
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 4cd14a4..28f96dd 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -78,7 +78,7 @@ class ConsumerImpl : public ConsumerImplBase,
uint64_t getConsumerId();
void messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
bool& isChecksumValid, proto::MessageMetadata& msgMetadata, SharedBuffer& payload);
- void messageProcessed(Message& msg);
+ void messageProcessed(Message& msg, bool track = true);
inline proto::CommandSubscribe_SubType getSubType();
inline proto::CommandSubscribe_InitialPosition getInitialPosition();
void handleUnsubscribe(Result result, ResultCallback callback);