You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/05/15 10:05:42 UTC

[pulsar] branch master updated: [pulsar-storm] add more metrics to troubleshoot spout throughput (#4280)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c09ff2  [pulsar-storm] add more metrics to troubleshoot spout throughput (#4280)
2c09ff2 is described below

commit 2c09ff2f8e519b96a34eda391b450133b68cd5aa
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed May 15 03:05:37 2019 -0700

    [pulsar-storm] add more metrics to troubleshoot spout throughput (#4280)
    
    ### Motivation
    
    Many time user sees lower throughput in pulsar-spout even though standalone consumer can consume such msgRate easily. It would be hard to debug user's topology without enough information so, adding two metrics which can impact spout throughput.
    - number of message filed: spout sleeps when it sees failed message so, it's important to have visibility of that count
    - number of times spout-thread not found the message in queue: spout topology internally sleeps if it doesn't see any emitted tuple in collector after triggering `nextTuple()` api.
    
    This metrics gives more visibility about consumer throughput.
---
 .../src/main/java/org/apache/pulsar/storm/PulsarSpout.java    | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index bbfe5cd..5a5ea59 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -58,6 +58,8 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
     public static final String NO_OF_PENDING_FAILED_MESSAGES = "numberOfPendingFailedMessages";
     public static final String NO_OF_MESSAGES_RECEIVED = "numberOfMessagesReceived";
     public static final String NO_OF_MESSAGES_EMITTED = "numberOfMessagesEmitted";
+    public static final String NO_OF_MESSAGES_FAILED = "numberOfMessagesFailed";
+    public static final String MESSAGE_NOT_AVAILABLE_COUNT = "messageNotAvailableCount";
     public static final String NO_OF_PENDING_ACKS = "numberOfPendingAcks";
     public static final String CONSUMER_RATE = "consumerRate";
     public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";
@@ -78,6 +80,8 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
     private Consumer<byte[]> consumer;
     private volatile long messagesReceived = 0;
     private volatile long messagesEmitted = 0;
+    private volatile long messagesFailed = 0;
+    private volatile long messageNotAvailableCount = 0;
     private volatile long pendingAcks = 0;
     private volatile long messageSizeReceived = 0;
 
@@ -157,7 +161,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
                 pendingMessageRetries.putIfAbsent(id, messageRetries);
                 failedMessages.add(msg);
                 --pendingAcks;
-
+                messagesFailed++;
             } else {
                 LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id);
                 ack(msg);
@@ -203,6 +207,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
                     } else {
                         // queue is empty and nothing to emit
                         done = true;
+                        messageNotAvailableCount++;
                     }
                 }
             } catch (PulsarClientException e) {
@@ -334,6 +339,8 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
         metricsMap.put(NO_OF_PENDING_FAILED_MESSAGES, (long) pendingMessageRetries.size());
         metricsMap.put(NO_OF_MESSAGES_RECEIVED, messagesReceived);
         metricsMap.put(NO_OF_MESSAGES_EMITTED, messagesEmitted);
+        metricsMap.put(NO_OF_MESSAGES_FAILED, messagesFailed);
+        metricsMap.put(MESSAGE_NOT_AVAILABLE_COUNT, messageNotAvailableCount);
         metricsMap.put(NO_OF_PENDING_ACKS, pendingAcks);
         metricsMap.put(CONSUMER_RATE, ((double) messagesReceived) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
         metricsMap.put(CONSUMER_THROUGHPUT_BYTES,
@@ -345,6 +352,8 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
         messagesReceived = 0;
         messagesEmitted = 0;
         messageSizeReceived = 0;
+        messagesFailed = 0;
+        messageNotAvailableCount = 0;
     }
 
     @SuppressWarnings("rawtypes")