You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/04 14:17:38 UTC

[pulsar] branch master updated: Add log info for key-shared subscribe mode (#4463)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4db4d53  Add log info for key-shared subscribe mode (#4463)
4db4d53 is described below

commit 4db4d532fb6d27f64243a652c715c1660b994102
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Tue Jun 4 22:17:33 2019 +0800

    Add log info for key-shared subscribe mode (#4463)
    
    * Add log info for key-shared subscribe mode
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
 .../main/java/org/apache/pulsar/broker/service/ServerCnx.java    | 9 +++++----
 .../PersistentStickyKeyDispatcherMultipleConsumers.java          | 7 +++++++
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 9447af1..8472970 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1056,10 +1056,11 @@ public class ServerCnx extends PulsarHandler {
         headersAndPayload.markReaderIndex();
         MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
         headersAndPayload.resetReaderIndex();
-
-        log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}", remoteAddress,
-                send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(), msgMetadata.getSequenceId(),
-                headersAndPayload.readableBytes());
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}, partition key is: {}, ordering key is {}",
+                    remoteAddress, send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(), msgMetadata.getSequenceId(),
+                    headersAndPayload.readableBytes(), msgMetadata.getPartitionKey(), msgMetadata.getOrderingKey());
+        }
         msgMetadata.recycle();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 154e870..0cbe354 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -100,6 +100,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 }
 
                 int messagesForC = Math.min(entriesWithSameKey.getValue().size(), consumer.getAvailablePermits());
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] select consumer {} for key {} with messages num {}, read type is {}",
+                            name, consumer.consumerName(), entriesWithSameKey.getKey(), messagesForC, readType);
+                }
                 if (messagesForC > 0) {
                     // remove positions first from replay list first : sendMessages recycles entries
                     List<Entry> subList = new ArrayList<>(entriesWithSameKey.getValue().subList(0, messagesForC));
@@ -172,6 +176,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
         metadataAndPayload.resetReaderIndex();
         String key = metadata.getPartitionKey();
+        if (log.isDebugEnabled()) {
+            log.debug("Parse message metadata, partition key is {}, ordering key is {}", key, metadata.getOrderingKey());
+        }
         if (StringUtils.isNotBlank(key) || metadata.hasOrderingKey()) {
             return metadata.hasOrderingKey() ? metadata.getOrderingKey().toByteArray() : key.getBytes();
         }