You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/28 06:25:51 UTC

[pulsar] branch master updated: [C++] Reduce redeliverMessages when message listener is enabled (#10726)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ca40005  [C++] Reduce redeliverMessages when message listener is enabled (#10726)
ca40005 is described below

commit ca4000544fc01895beb00ce5764945fbf7afebbf
Author: sijianliang <li...@foxmail.com>
AuthorDate: Fri May 28 14:24:59 2021 +0800

    [C++] Reduce redeliverMessages when message listener is enabled (#10726)
    
    ### Motivation
    
    When consumer's message listener is enabled, application acknowledge processed message in message listener callback, but still tracks the message in `internalListener`,  causes sending redundant redeliverMessages
    
    ### Modifications
    
    trackMessage before invoke message listener callback in `internalListener`
---
 pulsar-client-cpp/lib/ConsumerImpl.cc | 9 ++++++---
 pulsar-client-cpp/lib/ConsumerImpl.h  | 2 +-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 564c114..c268417 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -566,6 +566,7 @@ void ConsumerImpl::internalListener() {
         // This will only happen when the connection got reset and we cleared the queue
         return;
     }
+    trackMessage(msg);
     try {
         consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
         lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
@@ -573,7 +574,7 @@ void ConsumerImpl::internalListener() {
     } catch (const std::exception& e) {
         LOG_ERROR(getName() << "Exception thrown from listener" << e.what());
     }
-    messageProcessed(msg);
+    messageProcessed(msg, false);
 }
 
 Result ConsumerImpl::fetchSingleMessageFromBroker(Message& msg) {
@@ -701,7 +702,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
     }
 }
 
-void ConsumerImpl::messageProcessed(Message& msg) {
+void ConsumerImpl::messageProcessed(Message& msg, bool track) {
     Lock lock(mutex_);
     lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
 
@@ -712,7 +713,9 @@ void ConsumerImpl::messageProcessed(Message& msg) {
     }
 
     increaseAvailablePermits(currentCnx);
-    trackMessage(msg);
+    if (track) {
+        trackMessage(msg);
+    }
 }
 
 /**
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 4cd14a4..28f96dd 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -78,7 +78,7 @@ class ConsumerImpl : public ConsumerImplBase,
     uint64_t getConsumerId();
     void messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg,
                          bool& isChecksumValid, proto::MessageMetadata& msgMetadata, SharedBuffer& payload);
-    void messageProcessed(Message& msg);
+    void messageProcessed(Message& msg, bool track = true);
     inline proto::CommandSubscribe_SubType getSubType();
     inline proto::CommandSubscribe_InitialPosition getInitialPosition();
     void handleUnsubscribe(Result result, ResultCallback callback);