You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/18 17:21:05 UTC
[pulsar] branch master updated: [pulsar-broker] fix msg-drop metrics for non-persistent topic (#4298)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 840b210 [pulsar-broker] fix msg-drop metrics for non-persistent topic (#4298)
840b210 is described below
commit 840b2100d16be4ce3d0f76035b5f1245fd543d7f
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sat May 18 10:20:59 2019 -0700
[pulsar-broker] fix msg-drop metrics for non-persistent topic (#4298)
---
.../nonpersistent/NonPersistentDispatcherMultipleConsumers.java | 2 +-
.../nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java | 4 ++--
.../broker/service/nonpersistent/NonPersistentSubscription.java | 2 +-
.../pulsar/broker/service/nonpersistent/NonPersistentTopic.java | 4 ++--
4 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 2067a80..a85552f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -196,7 +196,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
entries.forEach(entry -> {
int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1);
if (totalMsgs > 0) {
- msgDrop.recordEvent();
+ msgDrop.recordEvent(totalMsgs);
}
entry.release();
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 787fb00..657e36f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -43,7 +43,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
private final Subscription subscription;
private final ServiceConfiguration serviceConfig;
private final RedeliveryTracker redeliveryTracker;
-
+
public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
NonPersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName());
@@ -63,7 +63,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
entries.forEach(entry -> {
int totalMsgs = getBatchSizeforEntry(entry.getDataBuffer(), subscription, -1);
if (totalMsgs > 0) {
- msgDrop.recordEvent();
+ msgDrop.recordEvent(totalMsgs);
}
entry.release();
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 515a801..0139d14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -325,7 +325,7 @@ public class NonPersistentSubscription implements Subscription {
}
subStats.type = getType();
- subStats.msgDropRate = dispatcher.getMesssageDropRate().getRate();
+ subStats.msgDropRate = dispatcher.getMesssageDropRate().getValueRate();
return subStats;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index e06e61c..b03feb3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -773,7 +773,6 @@ public class NonPersistentTopic implements Topic {
topicStatsStream.startList("consumers");
- subscription.getDispatcher().getMesssageDropRate().calculateRate();
for (Object consumerObj : consumers) {
Consumer consumer = (Consumer) consumerObj;
consumer.updateRates();
@@ -798,8 +797,9 @@ public class NonPersistentTopic implements Topic {
topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
topicStatsStream.writePair("type", subscription.getTypeString());
if (subscription.getDispatcher() != null) {
+ subscription.getDispatcher().getMesssageDropRate().calculateRate();
topicStatsStream.writePair("msgDropRate",
- subscription.getDispatcher().getMesssageDropRate().getRate());
+ subscription.getDispatcher().getMesssageDropRate().getValueRate());
}
// Close consumers