You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/19 09:50:07 UTC

[pulsar] branch branch-2.7 updated: Issue #6054 - Adding more permits debug statements to better see changes to permits (#10217)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new c243d18  Issue #6054 - Adding more permits debug statements to better see changes to permits (#10217)
c243d18 is described below

commit c243d185261e964010a122b65f9c730bffbb2f0c
Author: Devin Bost <de...@users.noreply.github.com>
AuthorDate: Mon Apr 19 03:28:47 2021 -0600

    Issue #6054 - Adding more permits debug statements to better see changes to permits (#10217)
---
 docker/build.sh                                    |  2 +-
 .../org/apache/pulsar/broker/service/Consumer.java | 26 ++++++++++++++++++++++
 .../broker/service/PulsarCommandSenderImpl.java    |  4 ++--
 .../PersistentDispatcherMultipleConsumers.java     | 23 ++++++++++++++-----
 4 files changed, 47 insertions(+), 8 deletions(-)

diff --git a/docker/build.sh b/docker/build.sh
index 944c35b..1379790 100755
--- a/docker/build.sh
+++ b/docker/build.sh
@@ -22,4 +22,4 @@ ROOT_DIR=$(git rev-parse --show-toplevel)
 cd $ROOT_DIR/docker
 
 mvn -f ../dashboard/pom.xml package -Pdocker
-mvn package -Pdocker 
+mvn package -Pdocker
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 263d722..475fdc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -237,6 +237,11 @@ public class Consumer {
                 if (entry != null) {
                     int batchSize = batchSizes.getBatchSize(i);
                     pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, 0);
+                    if (log.isDebugEnabled()){
+                        log.debug("[{}-{}] Added {}:{} ledger entry with batchSize of {} to pendingAcks in"
+                                        + " broker.service.Consumer for consumerId: {}",
+                             topicName, subscription, entry.getLedgerId(), entry.getEntryId(), batchSize, consumerId);
+                    }
                 }
             }
         }
@@ -250,6 +255,11 @@ public class Consumer {
         // reduce permit and increment unackedMsg count with total number of messages in batch-msgs
         int ackedCount = batchIndexesAcks == null ? 0 : batchIndexesAcks.getTotalAckedIndexCount();
         MESSAGE_PERMITS_UPDATER.addAndGet(this, ackedCount - totalMessages);
+        if (log.isDebugEnabled()){
+            log.debug("[{}-{}] Added {} minus {} messages to MESSAGE_PERMITS_UPDATER in broker.service.Consumer"
+                            + " for consumerId: {}; avgMessagesPerEntry is {}",
+                   topicName, subscription, ackedCount, totalMessages, consumerId, tmpAvgMessagesPerEntry);
+        }
         incrementUnackedMessages(totalMessages);
         msgOut.recordMultipleEvents(totalMessages, totalBytes);
         msgOutCounter.add(totalMessages);
@@ -506,6 +516,10 @@ public class Consumer {
         int oldPermits;
         if (!blockedConsumerOnUnackedMsgs) {
             oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
+            if (log.isDebugEnabled()) {
+                log.debug("[{}-{}] Added {} message permits in broker.service.Consumer before updating dispatcher "
+                        + "for consumer", topicName, subscription, additionalNumberOfMessages, consumerId);
+            }
             subscription.consumerFlow(this, additionalNumberOfMessages);
         } else {
             oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages);
@@ -529,6 +543,10 @@ public class Consumer {
         int additionalNumberOfPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0);
         // add newly flow permits to actual consumer.messagePermits
         MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, additionalNumberOfPermits);
+        if (log.isDebugEnabled()){
+            log.debug("[{}-{}] Added {} blocked permits to broker.service.Consumer for consumer", topicName,
+                    subscription, additionalNumberOfPermits, consumerId);
+        }
         // dispatch pending permits to flow more messages: it will add more permits to dispatcher and consumer
         subscription.consumerFlow(consumer, additionalNumberOfPermits);
     }
@@ -577,6 +595,10 @@ public class Consumer {
         lastAckedTimestamp = consumerStats.lastAckedTimestamp;
         lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
         MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits);
+        if (log.isDebugEnabled()){
+            log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer", topicName,
+                    subscription, consumerStats.availablePermits, consumerId);
+        }
         unackedMessages = consumerStats.unackedMessages;
         blockedConsumerOnUnackedMsgs = consumerStats.blockedConsumerOnUnackedMsgs;
         AVG_MESSAGES_PER_ENTRY.set(this, consumerStats.avgMessagesPerEntry);
@@ -754,6 +776,10 @@ public class Consumer {
         // if permitsReceivedWhileConsumerBlocked has been accumulated then pass it to Dispatcher to flow messages
         if (numberOfBlockedPermits > 0) {
             MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
+            if (log.isDebugEnabled()) {
+               log.debug("[{}-{}] Added {} blockedPermits to broker.service.Consumer's messagePermits for consumer {}",
+                       topicName, subscription, numberOfBlockedPermits, consumerId);
+            }
             subscription.consumerFlow(this, numberOfBlockedPermits);
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 41705e9..dcc8458 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -284,8 +284,8 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
                 }
 
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{}", topicName, subscription,
-                            consumerId, entry.getLedgerId(), entry.getEntryId());
+                    log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{} with batchSize {}",
+                            topicName, subscription,  consumerId, entry.getLedgerId(), entry.getEntryId(), batchSize);
                 }
 
                 int redeliveryCount = 0;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index af22fa5..c2dc7d7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -213,6 +213,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                     redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
                 });
                 totalAvailablePermits -= consumer.getAvailablePermits();
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Decreased totalAvailablePermits by {} in PersistentDispatcherMultipleConsumers. "
+                                    + "New dispatcher permit count is {}", name, consumer.getAvailablePermits(),
+                            totalAvailablePermits);
+                }
                 readMoreEntries();
             }
         } else {
@@ -232,8 +237,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         totalAvailablePermits += additionalNumberOfMessages;
 
         if (log.isDebugEnabled()) {
-            log.debug("[{}-{}] Trigger new read after receiving flow control message with permits {}", name, consumer,
-                    totalAvailablePermits);
+            log.debug("[{}-{}] Trigger new read after receiving flow control message with permits {} "
+                            + "after adding {} permits", name, consumer,
+                    totalAvailablePermits, additionalNumberOfMessages);
         }
         readMoreEntries();
     }
@@ -494,8 +500,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             // round-robin dispatch batch size for this consumer
             int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
             if (log.isDebugEnabled() && !c.isWritable()) {
-                log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {} ", topic.getName(), name,
-                        c);
+                log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {}; "
+                                + "availablePermits are {}", topic.getName(), name,
+                        c, availablePermits);
             }
             int messagesForC = Math.min(
                     Math.min(entriesToDispatch, availablePermits),
@@ -524,7 +531,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 int msgSent = sendMessageInfo.getTotalMessages();
                 start += messagesForC;
                 entriesToDispatch -= messagesForC;
-                TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
+                TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
+                        -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
+                if (log.isDebugEnabled()){
+                    log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in "
+                                    + "PersistentDispatcherMultipleConsumers",
+                            name, msgSent, batchIndexesAcks.getTotalAckedIndexCount());
+                }
                 totalMessagesSent += sendMessageInfo.getTotalMessages();
                 totalBytesSent += sendMessageInfo.getTotalBytes();
             }