You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/04 14:17:38 UTC
[pulsar] branch master updated: Add log info for key-shared subscribe mode (#4463)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4db4d53 Add log info for key-shared subscribe mode (#4463)
4db4d53 is described below
commit 4db4d532fb6d27f64243a652c715c1660b994102
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Tue Jun 4 22:17:33 2019 +0800
Add log info for key-shared subscribe mode (#4463)
* Add log info for key-shared subscribe mode
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* fix comments
Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
.../main/java/org/apache/pulsar/broker/service/ServerCnx.java | 9 +++++----
.../PersistentStickyKeyDispatcherMultipleConsumers.java | 7 +++++++
2 files changed, 12 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 9447af1..8472970 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1056,10 +1056,11 @@ public class ServerCnx extends PulsarHandler {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
-
- log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}", remoteAddress,
- send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(), msgMetadata.getSequenceId(),
- headersAndPayload.readableBytes());
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}, partition key is: {}, ordering key is {}",
+ remoteAddress, send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(), msgMetadata.getSequenceId(),
+ headersAndPayload.readableBytes(), msgMetadata.getPartitionKey(), msgMetadata.getOrderingKey());
+ }
msgMetadata.recycle();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 154e870..0cbe354 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -100,6 +100,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
int messagesForC = Math.min(entriesWithSameKey.getValue().size(), consumer.getAvailablePermits());
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] select consumer {} for key {} with messages num {}, read type is {}",
+ name, consumer.consumerName(), entriesWithSameKey.getKey(), messagesForC, readType);
+ }
if (messagesForC > 0) {
// remove positions first from replay list first : sendMessages recycles entries
List<Entry> subList = new ArrayList<>(entriesWithSameKey.getValue().subList(0, messagesForC));
@@ -172,6 +176,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.resetReaderIndex();
String key = metadata.getPartitionKey();
+ if (log.isDebugEnabled()) {
+ log.debug("Parse message metadata, partition key is {}, ordering key is {}", key, metadata.getOrderingKey());
+ }
if (StringUtils.isNotBlank(key) || metadata.hasOrderingKey()) {
return metadata.hasOrderingKey() ? metadata.getOrderingKey().toByteArray() : key.getBytes();
}