Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2021/03/16 19:43:20 UTC

[GitHub] [storm] Ethanlm commented on a change in pull request #3385: Add Metrics for Trident Kafka Spout/Emitter

Ethanlm commented on a change in pull request #3385:
URL: https://github.com/apache/storm/pull/3385#discussion_r595447806



##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -60,6 +65,14 @@
     private static final long serialVersionUID = -7343927794834130435L;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
 
+    // Metrics

Review comment:
       Can all these variables be `private`? If any of them are used in a unit test, use the most restrictive modifier possible and add the `@VisibleForTesting` annotation to indicate that.
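
   A minimal sketch of the suggested visibility, assuming a unit test needs to check the lag metric name. The annotation is declared locally as a hypothetical stand-in for Guava's `@VisibleForTesting` (which copy the module actually uses is not stated in this thread), and the class and method names are illustrative only.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class KafkaEmitterMetricsSketch {

    // Hypothetical stand-in for a marker annotation such as Guava's
    // com.google.common.annotations.VisibleForTesting. It only documents
    // intent; the compiler does not enforce anything.
    @Retention(RetentionPolicy.CLASS)
    @Target({ElementType.FIELD, ElementType.METHOD})
    @interface VisibleForTesting {
    }

    // Only used inside this class, so it stays private.
    private static final String UNDERSCORE = "_";

    // Assumed to be referenced by a unit test: package-private (not public)
    // and flagged so readers know the wider scope exists only for tests.
    @VisibleForTesting
    static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";

    private final String groupIdPrefix;

    public KafkaEmitterMetricsSketch(String groupId) {
        // Mirrors the diff: prefix metric names with "<group.id>_" when a group id is configured.
        this.groupIdPrefix = groupId == null ? "" : groupId + UNDERSCORE;
    }

    // Private helper: no test needs to call it directly.
    private String prefixed(String baseName) {
        return groupIdPrefix + baseName;
    }

    // Package-private so a test in the same package can check the registered name.
    @VisibleForTesting
    String maxLagMetricName() {
        return prefixed(KAFKA_CLIENT_MAX_LAG_METRIC_NAME);
    }
}
```

   Package-private is usually enough for tests that live in the same package; `protected` or `public` would expose the fields to subclasses or all callers for no production benefit.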

##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -60,6 +65,14 @@
     private static final long serialVersionUID = -7343927794834130435L;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
 
+    // Metrics
+    public static final String UNDERSCORE = "_";
+    public static final String INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC = "records-lag-max";
+    public static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";
+    protected transient Gauge<Double> kafkaClientMaxLag;
+    public static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";

Review comment:
       I would remove "Rate" from both the metric name and the variable name.
   
   A meter already reports series such as `xx.m1_rate`, `xx.m15_rate`, and `xx.count`, so `Rate` in the metric name is redundant and somewhat confusing.
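
   To illustrate the redundancy, here is a sketch that expands a meter's base name into the series a reporter might publish. The suffix list is an assumption modeled on the names above (plus `m5_rate` and `mean_rate`, which Dropwizard-style reporters commonly emit); the actual set and spelling depend on the configured reporter, and `MeterNameSketch` is a hypothetical name.

```java
import java.util.List;
import java.util.stream.Collectors;

public class MeterNameSketch {

    // Assumed reporter suffixes for a meter; not taken from Storm's API.
    static final List<String> METER_SUFFIXES =
        List.of("count", "m1_rate", "m5_rate", "m15_rate", "mean_rate");

    // Expands a meter's base name into the series a reporter would publish.
    static List<String> reportedNames(String baseName) {
        return METER_SUFFIXES.stream()
            .map(suffix -> baseName + "." + suffix)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "eventEmitRate.m1_rate" says "rate" twice, and "eventEmitRate.count"
        // is not a rate at all; the base name "eventEmit" avoids both problems.
        System.out.println(reportedNames("eventEmitRate"));
        System.out.println(reportedNames("eventEmit"));
    }
}
```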

##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -60,6 +65,14 @@
     private static final long serialVersionUID = -7343927794834130435L;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
 
+    // Metrics
+    public static final String UNDERSCORE = "_";
+    public static final String INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC = "records-lag-max";
+    public static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";
+    protected transient Gauge<Double> kafkaClientMaxLag;
+    public static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";
+    protected transient Meter eventEmitRate;

Review comment:
       Is the built-in `__emit-count` metric not sufficient for this purpose? https://github.com/apache/storm/blob/master/docs/Metrics.md#__emit-count

##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -97,13 +110,48 @@ public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig,
         this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
         this.startTimeStamp = kafkaSpoutConfig.getStartTimeStamp();
         LOG.debug("Created {}", this.toString());
+
+        registerMetric();
+    }
+
+    /**
+     * Acquires metric instances through registration with the TopologyContext.
+     */
+    private void registerMetric() {
+        LOG.info("Registering Spout Metrics");
+
+        String configGroupId = "";
+        if (kafkaSpoutConfig.getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG) != null) {
+            configGroupId = kafkaSpoutConfig.getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG).toString() + UNDERSCORE;
+        }
+
+        eventEmitRate = topologyContext.registerMeter(
+            configGroupId + EVENT_EMIT_METRIC_NAME);
+        kafkaClientMaxLag = topologyContext.registerGauge(
+            configGroupId + KAFKA_CLIENT_MAX_LAG_METRIC_NAME,
+            new Gauge<Double>() {
+                @Override
+                public Double getValue() {
+                    if (consumer == null) {
+                        return 0.0;
+                    }
+                    // Extract spout lag from consumer's internal metrics
+                    for (Map.Entry<MetricName, ? extends Metric> metricKeyVal : consumer.metrics().entrySet()) {
+                        Metric metric = metricKeyVal.getValue();
+                        if (metric.metricName().name().equals(INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC)) {

Review comment:
       If I understand correctly, there are actually two "records-lag-max" metrics:
   
   one is partition-level
   https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L133-L134
   
   and the other is consumer-level
   https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L98-L99
   
   Both use the name "records-lag-max" but carry different tags, so it is hard to tell which of them the first "records-lag-max" entry returned by `consumer.metrics()` actually is.
   
   I noticed that older Kafka versions do this differently:
   https://github.com/apache/kafka/blob/0.11.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L108
   
   We will need to take care of compatibility issues here.
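
   One possible way to disambiguate is to ignore any `records-lag-max` entry that carries `topic` or `partition` tags, as the partition-level metric in the linked trunk registry does. The sketch below is stdlib-only: `MetricKey` is a hypothetical stand-in for `org.apache.kafka.common.MetricName` (whose `name()` and `tags()` would be used in real code), and values are plain doubles rather than Kafka `Metric` objects. It also assumes, based on the 0.11.0 link, that older clients embed topic and partition in the partition-level metric name itself, so the exact-name match already skips it there; this should be verified against every supported client version.

```java
import java.util.Map;
import java.util.Optional;

public class LagMetricSelector {

    static final String RECORDS_LAG_MAX = "records-lag-max";

    // Hypothetical stand-in for org.apache.kafka.common.MetricName; only the
    // name and tags matter for this check.
    static final class MetricKey {
        final String name;
        final Map<String, String> tags;

        MetricKey(String name, Map<String, String> tags) {
            this.name = name;
            this.tags = tags;
        }
    }

    // Returns the consumer-level "records-lag-max" value, if present.
    static Optional<Double> consumerLevelLagMax(Map<MetricKey, Double> metrics) {
        for (Map.Entry<MetricKey, Double> entry : metrics.entrySet()) {
            MetricKey key = entry.getKey();
            // Partition-level metrics share the name but add topic/partition tags.
            boolean partitionLevel = key.tags.containsKey("topic") || key.tags.containsKey("partition");
            // The exact name match also skips old-style "<topic>-<partition>.records-lag-max"
            // names (assumed from the 0.11.0 registry).
            if (RECORDS_LAG_MAX.equals(key.name) && !partitionLevel) {
                return Optional.of(entry.getValue());
            }
        }
        return Optional.empty();
    }
}
```

   A gauge built on this could return something like `consumerLevelLagMax(...).orElse(0.0)`, which keeps the diff's behavior of reporting 0.0 when the metric is unavailable.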
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org