Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2021/03/08 19:22:40 UTC

[GitHub] [storm] jmpoholarz opened a new pull request #3385: Add Metrics for Trident Kafka Spout/Emitter

jmpoholarz opened a new pull request #3385:
URL: https://github.com/apache/storm/pull/3385


   ## What is the purpose of the change
   
   The Trident Kafka Spout/Emitter does not report a few metrics that my team has found useful over years of running the spout.  This PR contributes those metrics back to the open-source spout.
   
   1. **Event emit rate**
   Counting how many events the spout emits tracks the rate at which the spout is consuming from the Kafka topic(s).  If this rate decreases, it signals a performance issue to investigate.
   (@kishorvpatil suggested using a Meter here; the initial thought was to just use a Counter, or a Gauge that resets to 0 every time getValue() is called.)
   
   2. **Kafka Spout Max Lag**
   This gauge monitors the backlog of events to determine whether the spout is not emitting fast enough to keep up with the incoming data, or whether the spout is caught up and the Kafka topic itself is delayed.  The metric distinguishes between these two cases and helps pinpoint the source of a slowdown (see the sketch below).
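   
   A minimal registration sketch (illustrative only, not the exact code in this PR; `readRecordsLagMax` is a hypothetical helper) of how the two metrics can be wired up through Storm's V2 metrics API, which the diff later in this thread uses via `TopologyContext.registerMeter` and `registerGauge`:
   
   ```java
   // Sketch only: metric names and the readRecordsLagMax helper are placeholders.
   Meter eventEmit = topologyContext.registerMeter("eventEmit");
   Gauge<Double> kafkaClientMaxLag = topologyContext.registerGauge("kafkaClientMaxLag",
       () -> consumer == null ? 0.0 : readRecordsLagMax(consumer));
   
   // In the emit path, mark the meter once per emitted tuple so the reported
   // rates track how fast the spout is consuming from Kafka.
   eventEmit.mark();
   ```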
   
   ## How was the change tested
   The unit tests for the emitter verify the counts match the expected values.
   Additionally, my team compiled the storm-kafka-client package and included it as a dependency in our project.  After launching our topologies, we verified that the metric values on our YAMAS charts matched the equivalent metrics we had hardcoded into our custom Kafka emitter.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [storm] Ethanlm commented on a change in pull request #3385: STORM-3759 Additional Trident Kafka Spout Metrics

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3385:
URL: https://github.com/apache/storm/pull/3385#discussion_r600709138



##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -60,6 +65,14 @@
     private static final long serialVersionUID = -7343927794834130435L;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
 
+    // Metrics
+    public static final String UNDERSCORE = "_";
+    public static final String INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC = "records-lag-max";
+    public static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";
+    protected transient Gauge<Double> kafkaClientMaxLag;
+    public static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";
+    protected transient Meter eventEmitRate;

Review comment:
       With the recent change to metrics in Storm (converting to V2 metrics), the stream name will be appended to the metric name.
   
   ```
   The tuple counting metric names contain "${stream_name}" or "${upstream_component}:${stream_name}". The former is used for all spout metrics and for outgoing bolt metrics (__emit-count and __transfer-count). The latter is used for bolt metrics that deal with incoming tuples.
   ```
   
   
   [DimensionalReporter](https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/metrics2/DimensionalReporter.java) can be used to separate dimensions (stream_name, componentId, etc) from metrics.







[GitHub] [storm] jmpoholarz commented on a change in pull request #3385: Add Metrics for Trident Kafka Spout/Emitter

Posted by GitBox <gi...@apache.org>.
jmpoholarz commented on a change in pull request #3385:
URL: https://github.com/apache/storm/pull/3385#discussion_r597997026



##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -60,6 +65,14 @@
     private static final long serialVersionUID = -7343927794834130435L;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
 
+    // Metrics
+    public static final String UNDERSCORE = "_";
+    public static final String INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC = "records-lag-max";
+    public static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";
+    protected transient Gauge<Double> kafkaClientMaxLag;
+    public static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";
+    protected transient Meter eventEmitRate;

Review comment:
       This builtin might be sufficient.  Our metric names seem to report values like `__emit-count-s1` or `__emit-count-s2` (there are almost 200 of these in our Yamas metric search, presumably one per stream).  Perhaps something is misconfigured in our topologies that prevents useful stream names from being extracted.







[GitHub] [storm] Ethanlm commented on a change in pull request #3385: Add Metrics for Trident Kafka Spout/Emitter

Posted by GitBox <gi...@apache.org>.
Ethanlm commented on a change in pull request #3385:
URL: https://github.com/apache/storm/pull/3385#discussion_r595447806



##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -60,6 +65,14 @@
     private static final long serialVersionUID = -7343927794834130435L;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
 
+    // Metrics

Review comment:
       Can all these variables be `private`? If some are used in unit tests, use the modifier with the smallest possible scope and add the `@VisibleForTesting` annotation to indicate that.
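   
   For example, a sketch of the suggested pattern (assuming Guava's `@VisibleForTesting`, or Storm's shaded copy of it, is available; package-private instead of `protected`):
   
   ```java
   import com.google.common.annotations.VisibleForTesting;
   
   // Keep the scope as small as the tests allow: package-private, plus an
   // annotation documenting why the field is not private.
   @VisibleForTesting
   transient Meter eventEmit;
   
   @VisibleForTesting
   transient Gauge<Double> kafkaClientMaxLag;
   ```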

##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -60,6 +65,14 @@
     private static final long serialVersionUID = -7343927794834130435L;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
 
+    // Metrics
+    public static final String UNDERSCORE = "_";
+    public static final String INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC = "records-lag-max";
+    public static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";
+    protected transient Gauge<Double> kafkaClientMaxLag;
+    public static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";

Review comment:
       I would remove "Rate" from the metric name and variable name. 
   
   A meter already reports metrics like `xx.m1_rate`, `xx.m15_rate`, and `xx.count`, so `Rate` in the metric name is redundant and somewhat confusing.
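   
   For illustration, a standalone Dropwizard Metrics sketch (assuming the V2 metrics are backed by `com.codahale.metrics.Meter`, as the imports in this diff suggest) showing the rate suffixes a meter already reports:
   
   ```java
   import com.codahale.metrics.ConsoleReporter;
   import com.codahale.metrics.Meter;
   import com.codahale.metrics.MetricRegistry;
   import java.util.concurrent.TimeUnit;
   
   public class MeterNamingDemo {
       public static void main(String[] args) throws InterruptedException {
           MetricRegistry registry = new MetricRegistry();
           Meter eventEmit = registry.meter("eventEmit");   // no "Rate" in the name
   
           for (int i = 0; i < 100; i++) {
               eventEmit.mark();   // one mark per "emitted" event
               Thread.sleep(10);
           }
   
           // The report prints eventEmit's count plus mean/m1/m5/m15 rates,
           // which is why appending "Rate" to the metric name would be redundant.
           ConsoleReporter.forRegistry(registry)
                   .convertRatesTo(TimeUnit.SECONDS)
                   .build()
                   .report();
       }
   }
   ```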

##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -60,6 +65,14 @@
     private static final long serialVersionUID = -7343927794834130435L;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
 
+    // Metrics
+    public static final String UNDERSCORE = "_";
+    public static final String INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC = "records-lag-max";
+    public static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";
+    protected transient Gauge<Double> kafkaClientMaxLag;
+    public static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";
+    protected transient Meter eventEmitRate;

Review comment:
       Is the builtin emitted metric not sufficient for this purpose? https://github.com/apache/storm/blob/master/docs/Metrics.md#__emit-count

##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -97,13 +110,48 @@ public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig,
         this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
         this.startTimeStamp = kafkaSpoutConfig.getStartTimeStamp();
         LOG.debug("Created {}", this.toString());
+
+        registerMetric();
+    }
+
+    /**
+     * Acquires metric instances through registration with the TopologyContext.
+     */
+    private void registerMetric() {
+        LOG.info("Registering Spout Metrics");
+
+        String configGroupId = "";
+        if (kafkaSpoutConfig.getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG) != null) {
+            configGroupId = kafkaSpoutConfig.getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG).toString() + UNDERSCORE;
+        }
+
+        eventEmitRate = topologyContext.registerMeter(
+            configGroupId + EVENT_EMIT_METRIC_NAME);
+        kafkaClientMaxLag = topologyContext.registerGauge(
+            configGroupId + KAFKA_CLIENT_MAX_LAG_METRIC_NAME,
+            new Gauge<Double>() {
+                @Override
+                public Double getValue() {
+                    if (consumer == null) {
+                        return 0.0;
+                    }
+                    // Extract spout lag from consumer's internal metrics
+                    for (Map.Entry<MetricName, ? extends Metric> metricKeyVal : consumer.metrics().entrySet()) {
+                        Metric metric = metricKeyVal.getValue();
+                        if (metric.metricName().name().equals(INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC)) {

Review comment:
       If I understand correctly, there are actually two kinds of "records-lag-max" metrics:
   
   one is partition level 
   https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L133-L134
   
   and the other one is consumer level
   https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L98-L99
   
   They share the name "records-lag-max", but their tags differ. As written, it is hard to tell which "records-lag-max" metric comes back first from `consumer.metrics()`.
   
   I noticed that older Kafka versions do this differently:
   https://github.com/apache/kafka/blob/0.11.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L108
   
   We will need to take care of compatibility issues here.
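   
   One way to resolve the ambiguity (a hedged sketch, not this PR's code, assuming a recent kafka-clients where partition-level lag metrics are tag-based per KIP-225) is to accept only a `records-lag-max` entry that carries no `topic`/`partition` tags, i.e. the consumer-level value:
   
   ```java
   import java.util.Map;
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.common.Metric;
   import org.apache.kafka.common.MetricName;
   
   final class ConsumerLagMetrics {
       /** Consumer-level records-lag-max, or 0.0 if it is not reported yet. */
       static double consumerLevelMaxLag(Consumer<?, ?> consumer) {
           for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
               MetricName name = entry.getKey();
               // Partition-level "records-lag-max" carries topic/partition tags;
               // the consumer-level metric does not, so filter on the tags.
               if ("records-lag-max".equals(name.name())
                       && !name.tags().containsKey("topic")
                       && !name.tags().containsKey("partition")) {
                   Object value = entry.getValue().metricValue();
                   return value instanceof Number ? ((Number) value).doubleValue() : 0.0;
               }
           }
           return 0.0;
       }
   }
   ```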
   



