You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/05/15 10:05:42 UTC
[pulsar] branch master updated: [pulsar-storm] add more metrics to
troubleshoot spout throughput (#4280)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2c09ff2 [pulsar-storm] add more metrics to troubleshoot spout throughput (#4280)
2c09ff2 is described below
commit 2c09ff2f8e519b96a34eda391b450133b68cd5aa
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed May 15 03:05:37 2019 -0700
[pulsar-storm] add more metrics to troubleshoot spout throughput (#4280)
### Motivation
Many time user sees lower throughput in pulsar-spout even though standalone consumer can consume such msgRate easily. It would be hard to debug user's topology without enough information so, adding two metrics which can impact spout throughput.
- number of message filed: spout sleeps when it sees failed message so, it's important to have visibility of that count
- number of times spout-thread not found the message in queue: spout topology internally sleeps if it doesn't see any emitted tuple in collector after triggering `nextTuple()` api.
This metrics gives more visibility about consumer throughput.
---
.../src/main/java/org/apache/pulsar/storm/PulsarSpout.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index bbfe5cd..5a5ea59 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -58,6 +58,8 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
public static final String NO_OF_PENDING_FAILED_MESSAGES = "numberOfPendingFailedMessages";
public static final String NO_OF_MESSAGES_RECEIVED = "numberOfMessagesReceived";
public static final String NO_OF_MESSAGES_EMITTED = "numberOfMessagesEmitted";
+ public static final String NO_OF_MESSAGES_FAILED = "numberOfMessagesFailed";
+ public static final String MESSAGE_NOT_AVAILABLE_COUNT = "messageNotAvailableCount";
public static final String NO_OF_PENDING_ACKS = "numberOfPendingAcks";
public static final String CONSUMER_RATE = "consumerRate";
public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";
@@ -78,6 +80,8 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
private Consumer<byte[]> consumer;
private volatile long messagesReceived = 0;
private volatile long messagesEmitted = 0;
+ private volatile long messagesFailed = 0;
+ private volatile long messageNotAvailableCount = 0;
private volatile long pendingAcks = 0;
private volatile long messageSizeReceived = 0;
@@ -157,7 +161,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
pendingMessageRetries.putIfAbsent(id, messageRetries);
failedMessages.add(msg);
--pendingAcks;
-
+ messagesFailed++;
} else {
LOG.warn("[{}] Number of retries limit reached, dropping the message {}", spoutId, id);
ack(msg);
@@ -203,6 +207,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
} else {
// queue is empty and nothing to emit
done = true;
+ messageNotAvailableCount++;
}
}
} catch (PulsarClientException e) {
@@ -334,6 +339,8 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
metricsMap.put(NO_OF_PENDING_FAILED_MESSAGES, (long) pendingMessageRetries.size());
metricsMap.put(NO_OF_MESSAGES_RECEIVED, messagesReceived);
metricsMap.put(NO_OF_MESSAGES_EMITTED, messagesEmitted);
+ metricsMap.put(NO_OF_MESSAGES_FAILED, messagesFailed);
+ metricsMap.put(MESSAGE_NOT_AVAILABLE_COUNT, messageNotAvailableCount);
metricsMap.put(NO_OF_PENDING_ACKS, pendingAcks);
metricsMap.put(CONSUMER_RATE, ((double) messagesReceived) / pulsarSpoutConf.getMetricsTimeIntervalInSecs());
metricsMap.put(CONSUMER_THROUGHPUT_BYTES,
@@ -345,6 +352,8 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
messagesReceived = 0;
messagesEmitted = 0;
messageSizeReceived = 0;
+ messagesFailed = 0;
+ messageNotAvailableCount = 0;
}
@SuppressWarnings("rawtypes")