You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/03/04 22:47:54 UTC

[pulsar] branch master updated: Avoid debug noise by consumer ack-tracker (#2953)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ffa2a68  Avoid debug noise by consumer ack-tracker (#2953)
ffa2a68 is described below

commit ffa2a688fdec9e4f2a9344434cee3ed03af79201
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Mar 4 14:47:49 2019 -0800

    Avoid debug noise by consumer ack-tracker (#2953)
---
 .../impl/PersistentAcknowledgmentsGroupingTracker.java  | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 9a87d26..457baf5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -148,11 +148,6 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
      * Flush all the pending acks and send them to the broker
      */
     public void flush() {
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}", consumer,
-                    lastCumulativeAck, pendingIndividualAcks);
-        }
-
         ClientCnx cnx = consumer.getClientCnx();
 
         if (cnx == null) {
@@ -162,10 +157,12 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
             return;
         }
 
+        boolean shouldFlush = false;
         if (cumulativeAckFulshRequired) {
             ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.ledgerId, lastCumulativeAck.entryId,
                     AckType.Cumulative, null, Collections.emptyMap());
             cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+            shouldFlush=true;
             cumulativeAckFulshRequired = false;
         }
 
@@ -185,6 +182,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
 
                 cnx.ctx().write(Commands.newMultiMessageAck(consumer.consumerId, entriesToAck),
                         cnx.ctx().voidPromise());
+                shouldFlush = true;
             } else {
                 // When talking to older brokers, send the acknowledgments individually
                 while (true) {
@@ -195,11 +193,18 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
 
                     cnx.ctx().write(Commands.newAck(consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(),
                             AckType.Individual, null, Collections.emptyMap()), cnx.ctx().voidPromise());
+                    shouldFlush = true;
                 }
             }
         }
 
-        cnx.ctx().flush();
+        if (shouldFlush) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}",
+                        consumer, lastCumulativeAck, pendingIndividualAcks);
+            }
+            cnx.ctx().flush();
+        }
     }
 
     @Override