You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/03/04 22:47:54 UTC
[pulsar] branch master updated: Avoid debug noise by consumer ack-tracker (#2953)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ffa2a68 Avoid debug noise by consumer ack-tracker (#2953)
ffa2a68 is described below
commit ffa2a688fdec9e4f2a9344434cee3ed03af79201
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Mar 4 14:47:49 2019 -0800
Avoid debug noise by consumer ack-tracker (#2953)
---
.../impl/PersistentAcknowledgmentsGroupingTracker.java | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 9a87d26..457baf5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -148,11 +148,6 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
* Flush all the pending acks and send them to the broker
*/
public void flush() {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}", consumer,
- lastCumulativeAck, pendingIndividualAcks);
- }
-
ClientCnx cnx = consumer.getClientCnx();
if (cnx == null) {
@@ -162,10 +157,12 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
return;
}
+ boolean shouldFlush = false;
if (cumulativeAckFulshRequired) {
ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.ledgerId, lastCumulativeAck.entryId,
AckType.Cumulative, null, Collections.emptyMap());
cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+ shouldFlush=true;
cumulativeAckFulshRequired = false;
}
@@ -185,6 +182,7 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
cnx.ctx().write(Commands.newMultiMessageAck(consumer.consumerId, entriesToAck),
cnx.ctx().voidPromise());
+ shouldFlush = true;
} else {
// When talking to older brokers, send the acknowledgments individually
while (true) {
@@ -195,11 +193,18 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
cnx.ctx().write(Commands.newAck(consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(),
AckType.Individual, null, Collections.emptyMap()), cnx.ctx().voidPromise());
+ shouldFlush = true;
}
}
}
- cnx.ctx().flush();
+ if (shouldFlush) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}",
+ consumer, lastCumulativeAck, pendingIndividualAcks);
+ }
+ cnx.ctx().flush();
+ }
}
@Override