You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/04/19 09:50:07 UTC
[pulsar] branch branch-2.7 updated: Issue #6054 - Adding more
permits debug statements to better see changes to permits (#10217)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new c243d18 Issue #6054 - Adding more permits debug statements to better see changes to permits (#10217)
c243d18 is described below
commit c243d185261e964010a122b65f9c730bffbb2f0c
Author: Devin Bost <de...@users.noreply.github.com>
AuthorDate: Mon Apr 19 03:28:47 2021 -0600
Issue #6054 - Adding more permits debug statements to better see changes to permits (#10217)
---
docker/build.sh | 2 +-
.../org/apache/pulsar/broker/service/Consumer.java | 26 ++++++++++++++++++++++
.../broker/service/PulsarCommandSenderImpl.java | 4 ++--
.../PersistentDispatcherMultipleConsumers.java | 23 ++++++++++++++-----
4 files changed, 47 insertions(+), 8 deletions(-)
diff --git a/docker/build.sh b/docker/build.sh
index 944c35b..1379790 100755
--- a/docker/build.sh
+++ b/docker/build.sh
@@ -22,4 +22,4 @@ ROOT_DIR=$(git rev-parse --show-toplevel)
cd $ROOT_DIR/docker
mvn -f ../dashboard/pom.xml package -Pdocker
-mvn package -Pdocker
+mvn package -Pdocker
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 263d722..475fdc9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -237,6 +237,11 @@ public class Consumer {
if (entry != null) {
int batchSize = batchSizes.getBatchSize(i);
pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, 0);
+ if (log.isDebugEnabled()){
+ log.debug("[{}-{}] Added {}:{} ledger entry with batchSize of {} to pendingAcks in"
+ + " broker.service.Consumer for consumerId: {}",
+ topicName, subscription, entry.getLedgerId(), entry.getEntryId(), batchSize, consumerId);
+ }
}
}
}
@@ -250,6 +255,11 @@ public class Consumer {
// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
int ackedCount = batchIndexesAcks == null ? 0 : batchIndexesAcks.getTotalAckedIndexCount();
MESSAGE_PERMITS_UPDATER.addAndGet(this, ackedCount - totalMessages);
+ if (log.isDebugEnabled()){
+ log.debug("[{}-{}] Added {} minus {} messages to MESSAGE_PERMITS_UPDATER in broker.service.Consumer"
+ + " for consumerId: {}; avgMessagesPerEntry is {}",
+ topicName, subscription, ackedCount, totalMessages, consumerId, tmpAvgMessagesPerEntry);
+ }
incrementUnackedMessages(totalMessages);
msgOut.recordMultipleEvents(totalMessages, totalBytes);
msgOutCounter.add(totalMessages);
@@ -506,6 +516,10 @@ public class Consumer {
int oldPermits;
if (!blockedConsumerOnUnackedMsgs) {
oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Added {} message permits in broker.service.Consumer before updating dispatcher "
+ + "for consumer", topicName, subscription, additionalNumberOfMessages, consumerId);
+ }
subscription.consumerFlow(this, additionalNumberOfMessages);
} else {
oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages);
@@ -529,6 +543,10 @@ public class Consumer {
int additionalNumberOfPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0);
// add newly flow permits to actual consumer.messagePermits
MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, additionalNumberOfPermits);
+ if (log.isDebugEnabled()){
+ log.debug("[{}-{}] Added {} blocked permits to broker.service.Consumer for consumer", topicName,
+ subscription, additionalNumberOfPermits, consumerId);
+ }
// dispatch pending permits to flow more messages: it will add more permits to dispatcher and consumer
subscription.consumerFlow(consumer, additionalNumberOfPermits);
}
@@ -577,6 +595,10 @@ public class Consumer {
lastAckedTimestamp = consumerStats.lastAckedTimestamp;
lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits);
+ if (log.isDebugEnabled()){
+ log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer", topicName,
+ subscription, consumerStats.availablePermits, consumerId);
+ }
unackedMessages = consumerStats.unackedMessages;
blockedConsumerOnUnackedMsgs = consumerStats.blockedConsumerOnUnackedMsgs;
AVG_MESSAGES_PER_ENTRY.set(this, consumerStats.avgMessagesPerEntry);
@@ -754,6 +776,10 @@ public class Consumer {
// if permitsReceivedWhileConsumerBlocked has been accumulated then pass it to Dispatcher to flow messages
if (numberOfBlockedPermits > 0) {
MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Added {} blockedPermits to broker.service.Consumer's messagePermits for consumer {}",
+ topicName, subscription, numberOfBlockedPermits, consumerId);
+ }
subscription.consumerFlow(this, numberOfBlockedPermits);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 41705e9..dcc8458 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -284,8 +284,8 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
}
if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{}", topicName, subscription,
- consumerId, entry.getLedgerId(), entry.getEntryId());
+ log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{} with batchSize {}",
+ topicName, subscription, consumerId, entry.getLedgerId(), entry.getEntryId(), batchSize);
}
int redeliveryCount = 0;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index af22fa5..c2dc7d7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -213,6 +213,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
});
totalAvailablePermits -= consumer.getAvailablePermits();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Decreased totalAvailablePermits by {} in PersistentDispatcherMultipleConsumers. "
+ + "New dispatcher permit count is {}", name, consumer.getAvailablePermits(),
+ totalAvailablePermits);
+ }
readMoreEntries();
}
} else {
@@ -232,8 +237,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
totalAvailablePermits += additionalNumberOfMessages;
if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Trigger new read after receiving flow control message with permits {}", name, consumer,
- totalAvailablePermits);
+ log.debug("[{}-{}] Trigger new read after receiving flow control message with permits {} "
+ + "after adding {} permits", name, consumer,
+ totalAvailablePermits, additionalNumberOfMessages);
}
readMoreEntries();
}
@@ -494,8 +500,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
// round-robin dispatch batch size for this consumer
int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
if (log.isDebugEnabled() && !c.isWritable()) {
- log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {} ", topic.getName(), name,
- c);
+ log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {}; "
+ + "availablePermits are {}", topic.getName(), name,
+ c, availablePermits);
}
int messagesForC = Math.min(
Math.min(entriesToDispatch, availablePermits),
@@ -524,7 +531,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
int msgSent = sendMessageInfo.getTotalMessages();
start += messagesForC;
entriesToDispatch -= messagesForC;
- TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
+ TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
+ -(msgSent - batchIndexesAcks.getTotalAckedIndexCount()));
+ if (log.isDebugEnabled()){
+ log.debug("[{}] Added -({} minus {}) permits to TOTAL_AVAILABLE_PERMITS_UPDATER in "
+ + "PersistentDispatcherMultipleConsumers",
+ name, msgSent, batchIndexesAcks.getTotalAckedIndexCount());
+ }
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
}