Posted to dev@storm.apache.org by GitBox <gi...@apache.org> on 2021/03/16 19:43:20 UTC
[GitHub] [storm] Ethanlm commented on a change in pull request #3385: Add Metrics for Trident Kafka Spout/Emitter
Ethanlm commented on a change in pull request #3385:
URL: https://github.com/apache/storm/pull/3385#discussion_r595447806
##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -60,6 +65,14 @@
private static final long serialVersionUID = -7343927794834130435L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
+ // Metrics
Review comment:
Can all these variables be `private`? If some are used in unit tests, use the modifier with the smallest possible scope and add the `@VisibleForTesting` annotation to indicate that.
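A minimal sketch of the scoping suggested above, not the PR's actual code. The class name `EmitterScopeSketch` and the helper `metricName` are hypothetical, and the nested `@VisibleForTesting` annotation is a local stand-in for a Guava-style marker so that the example compiles on its own.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class EmitterScopeSketch {

    /** Local stand-in for a Guava-style marker annotation (hypothetical, documentation only). */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.FIELD, ElementType.METHOD})
    @interface VisibleForTesting { }

    // Used only inside this class, so it can be private.
    private static final String UNDERSCORE = "_";

    // Read by a same-package unit test: package-private instead of public or protected.
    @VisibleForTesting
    static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";

    /** Builds the metric name as the diff does: optional group id prefix plus the base name. */
    static String metricName(String groupId) {
        String prefix = (groupId == null) ? "" : groupId + UNDERSCORE;
        return prefix + EVENT_EMIT_METRIC_NAME;
    }

    public static void main(String[] args) {
        System.out.println(metricName("my-group"));
        System.out.println(metricName(null));
    }
}
```

A test in the same package can still read `EVENT_EMIT_METRIC_NAME`, while code outside the package cannot, which is the narrowing the comment asks for.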
##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -60,6 +65,14 @@
private static final long serialVersionUID = -7343927794834130435L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
+ // Metrics
+ public static final String UNDERSCORE = "_";
+ public static final String INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC = "records-lag-max";
+ public static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";
+ protected transient Gauge<Double> kafkaClientMaxLag;
+ public static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";
Review comment:
I would remove "Rate" from the metric name and the variable name.
A meter already reports metrics such as `xx.m1_rate`, `xx.m15_rate`, and `xx.count`, so `Rate` in the metric name is redundant and somewhat confusing.
##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -60,6 +65,14 @@
private static final long serialVersionUID = -7343927794834130435L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
+ // Metrics
+ public static final String UNDERSCORE = "_";
+ public static final String INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC = "records-lag-max";
+ public static final String KAFKA_CLIENT_MAX_LAG_METRIC_NAME = "kafkaClientMaxLag";
+ protected transient Gauge<Double> kafkaClientMaxLag;
+ public static final String EVENT_EMIT_METRIC_NAME = "eventEmitRate";
+ protected transient Meter eventEmitRate;
Review comment:
Is the built-in `__emit-count` metric not sufficient for this purpose? https://github.com/apache/storm/blob/master/docs/Metrics.md#__emit-count
##########
File path: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
##########
@@ -97,13 +110,48 @@ public KafkaTridentSpoutEmitter(KafkaTridentSpoutConfig<K, V> kafkaSpoutConfig,
this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
this.startTimeStamp = kafkaSpoutConfig.getStartTimeStamp();
LOG.debug("Created {}", this.toString());
+
+ registerMetric();
+ }
+
+ /**
+ * Acquires metric instances through registration with the TopologyContext.
+ */
+ private void registerMetric() {
+ LOG.info("Registering Spout Metrics");
+
+ String configGroupId = "";
+ if (kafkaSpoutConfig.getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG) != null) {
+ configGroupId = kafkaSpoutConfig.getKafkaProps().get(ConsumerConfig.GROUP_ID_CONFIG).toString() + UNDERSCORE;
+ }
+
+ eventEmitRate = topologyContext.registerMeter(
+ configGroupId + EVENT_EMIT_METRIC_NAME);
+ kafkaClientMaxLag = topologyContext.registerGauge(
+ configGroupId + KAFKA_CLIENT_MAX_LAG_METRIC_NAME,
+ new Gauge<Double>() {
+ @Override
+ public Double getValue() {
+ if (consumer == null) {
+ return 0.0;
+ }
+ // Extract spout lag from consumer's internal metrics
+ for (Map.Entry<MetricName, ? extends Metric> metricKeyVal : consumer.metrics().entrySet()) {
+ Metric metric = metricKeyVal.getValue();
+ if (metric.metricName().name().equals(INTERNAL_KAFKA_RECORDS_LAG_MAX_METRIC)) {
Review comment:
If I understand correctly, there are actually two kinds of "records-lag-max" metric.
One is partition-level:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L133-L134
and the other is consumer-level:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L98-L99
Both use the name "records-lag-max"; only their tags differ. Because the loop matches on the name alone, it is hard to tell which of the two the first "records-lag-max" entry returned by `consumer.metrics()` is.
I also noticed that older Kafka versions do this differently:
https://github.com/apache/kafka/blob/0.11.0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java#L108
We will need to take care of compatibility issues here.
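One possible way to disambiguate is to match on the tags as well as the name. The sketch below is only an illustration under stated assumptions: `MetricId` is a hypothetical stand-in for Kafka's `MetricName` (modeling just the name and tags), and it assumes that partition-level entries carry a `topic` or `partition` tag while the consumer-level entry does not. Whether that assumption holds for every Kafka client version would need to be checked.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class ConsumerLagSelector {

    /** Hypothetical stand-in for Kafka's MetricName; only name and tags are modeled. */
    static final class MetricId {
        final String name;
        final Map<String, String> tags;

        MetricId(String name, Map<String, String> tags) {
            this.name = name;
            this.tags = Collections.unmodifiableMap(new HashMap<>(tags));
        }
    }

    static final String RECORDS_LAG_MAX = "records-lag-max";

    /** Builds a tag map from alternating key/value arguments. */
    static Map<String, String> tags(String... keyValues) {
        Map<String, String> result = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            result.put(keyValues[i], keyValues[i + 1]);
        }
        return result;
    }

    /**
     * True only for the consumer-level metric. Assumption: partition-level
     * entries have a "topic" or "partition" tag, consumer-level entries do not.
     */
    static boolean isConsumerLevelLag(MetricId id) {
        return RECORDS_LAG_MAX.equals(id.name)
            && !id.tags.containsKey("topic")
            && !id.tags.containsKey("partition");
    }

    /** Returns the consumer-level lag, or 0.0 when no matching metric is present. */
    static double consumerLevelLag(Map<MetricId, Double> metrics) {
        for (Map.Entry<MetricId, Double> entry : metrics.entrySet()) {
            if (isConsumerLevelLag(entry.getKey())) {
                return entry.getValue();
            }
        }
        return 0.0;
    }

    public static void main(String[] args) {
        // The partition-level entry is inserted first to mimic an unlucky iteration order.
        Map<MetricId, Double> metrics = new LinkedHashMap<>();
        metrics.put(new MetricId(RECORDS_LAG_MAX,
            tags("client-id", "c1", "topic", "events", "partition", "0")), 7.0);
        metrics.put(new MetricId(RECORDS_LAG_MAX, tags("client-id", "c1")), 42.0);
        System.out.println(consumerLevelLag(metrics));
    }
}
```

Matching on name alone would return the partition-level value here, because it comes first in iteration order; filtering on tags makes the choice independent of that order.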