You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/18 17:21:05 UTC

[pulsar] branch master updated: [pulsar-broker] fix msg-drop metrics for non-persistent topic (#4298)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 840b210  [pulsar-broker] fix msg-drop metrics for non-persistent topic (#4298)
840b210 is described below

commit 840b2100d16be4ce3d0f76035b5f1245fd543d7f
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sat May 18 10:20:59 2019 -0700

    [pulsar-broker] fix msg-drop metrics for non-persistent topic (#4298)
---
 .../nonpersistent/NonPersistentDispatcherMultipleConsumers.java       | 2 +-
 .../nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java    | 4 ++--
 .../broker/service/nonpersistent/NonPersistentSubscription.java       | 2 +-
 .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java       | 4 ++--
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 2067a80..a85552f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -196,7 +196,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
             entries.forEach(entry -> {
                 int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1);
                 if (totalMsgs > 0) {
-                    msgDrop.recordEvent();
+                    msgDrop.recordEvent(totalMsgs);
                 }
                 entry.release();
             });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 787fb00..657e36f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -43,7 +43,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     private final Subscription subscription;
     private final ServiceConfiguration serviceConfig;
     private final RedeliveryTracker redeliveryTracker;
-
+    
     public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
             NonPersistentTopic topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName());
@@ -63,7 +63,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
             entries.forEach(entry -> {
                 int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1);
                 if (totalMsgs > 0) {
-                    msgDrop.recordEvent();
+                    msgDrop.recordEvent(totalMsgs);
                 }
                 entry.release();
             });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 515a801..0139d14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -325,7 +325,7 @@ public class NonPersistentSubscription implements Subscription {
         }
 
         subStats.type = getType();
-        subStats.msgDropRate = dispatcher.getMesssageDropRate().getRate();
+        subStats.msgDropRate = dispatcher.getMesssageDropRate().getValueRate();
         return subStats;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index e06e61c..b03feb3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -773,7 +773,6 @@ public class NonPersistentTopic implements Topic {
 
                 topicStatsStream.startList("consumers");
 
-                subscription.getDispatcher().getMesssageDropRate().calculateRate();
                 for (Object consumerObj : consumers) {
                     Consumer consumer = (Consumer) consumerObj;
                     consumer.updateRates();
@@ -798,8 +797,9 @@ public class NonPersistentTopic implements Topic {
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("type", subscription.getTypeString());
                 if (subscription.getDispatcher() != null) {
+                    subscription.getDispatcher().getMesssageDropRate().calculateRate();
                     topicStatsStream.writePair("msgDropRate",
-                            subscription.getDispatcher().getMesssageDropRate().getRate());
+                            subscription.getDispatcher().getMesssageDropRate().getValueRate());
                 }
 
                 // Close consumers