You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/23 00:15:12 UTC

[kafka] branch 1.0 updated: MINOR: bugfix streams total metrics (#5277)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 9450a24  MINOR: bugfix streams total metrics (#5277)
9450a24 is described below

commit 9450a241e4fbaa29af7791fa714ac701952e01c7
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Jun 22 19:14:34 2018 -0500

    MINOR: bugfix streams total metrics (#5277)
    
    Previously, we erroneously summed the invocation metrics instead of counting them.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../streams/processor/internals/StreamThread.java  | 37 +++++++++++-----------
 .../processor/internals/StreamsMetricsImpl.java    | 13 +++++---
 2 files changed, 26 insertions(+), 24 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 08dfdd1..f633395 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -25,16 +25,15 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Meter;
-import org.apache.kafka.common.metrics.stats.SampledStat;
+import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
@@ -62,6 +61,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.singleton;
@@ -502,40 +502,39 @@ public class StreamThread extends Thread implements ThreadDataProvider {
             commitTimeSensor = metrics.sensor(prefix + ".commit-latency", Sensor.RecordingLevel.INFO);
             commitTimeSensor.add(metrics.metricName("commit-latency-avg", this.groupName, "The average commit time in ms", this.tags), new Avg());
             commitTimeSensor.add(metrics.metricName("commit-latency-max", this.groupName, "The maximum commit time in ms", this.tags), new Max());
-            commitTimeSensor.add(createMeter(metrics, new Count(), "commit", "commit calls"));
+            commitTimeSensor.add(metrics.metricName("commit-rate", this.groupName, "The average per-second number of commit calls", this.tags), new Rate(TimeUnit.SECONDS, new Count()));
+            commitTimeSensor.add(metrics.metricName("commit-total", this.groupName, "The total number of commit calls", this.tags), new Count());
+
 
             pollTimeSensor = metrics.sensor(prefix + ".poll-latency", Sensor.RecordingLevel.INFO);
             pollTimeSensor.add(metrics.metricName("poll-latency-avg", this.groupName, "The average poll time in ms", this.tags), new Avg());
             pollTimeSensor.add(metrics.metricName("poll-latency-max", this.groupName, "The maximum poll time in ms", this.tags), new Max());
-            pollTimeSensor.add(createMeter(metrics, new Count(), "poll", "record-poll calls"));
+            pollTimeSensor.add(metrics.metricName("poll-rate", this.groupName, "The average per-second number of record-poll calls", this.tags), new Rate(TimeUnit.SECONDS, new Count()));
+            pollTimeSensor.add(metrics.metricName("poll-total", this.groupName, "The total number of record-poll calls", this.tags), new Count());
 
             processTimeSensor = metrics.sensor(prefix + ".process-latency", Sensor.RecordingLevel.INFO);
             processTimeSensor.add(metrics.metricName("process-latency-avg", this.groupName, "The average process time in ms", this.tags), new Avg());
             processTimeSensor.add(metrics.metricName("process-latency-max", this.groupName, "The maximum process time in ms", this.tags), new Max());
-            processTimeSensor.add(createMeter(metrics, new Count(), "process", "process calls"));
+            processTimeSensor.add(metrics.metricName("process-rate", this.groupName, "The average per-second number of process calls", this.tags), new Rate(TimeUnit.SECONDS, new Count()));
+            processTimeSensor.add(metrics.metricName("process-total", this.groupName, "The total number of process calls", this.tags), new Count());
 
             punctuateTimeSensor = metrics.sensor(prefix + ".punctuate-latency", Sensor.RecordingLevel.INFO);
             punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", this.groupName, "The average punctuate time in ms", this.tags), new Avg());
             punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", this.groupName, "The maximum punctuate time in ms", this.tags), new Max());
-            punctuateTimeSensor.add(createMeter(metrics, new Count(), "punctuate", "punctuate calls"));
+            punctuateTimeSensor.add(metrics.metricName("punctuate-rate", this.groupName, "The average per-second number of punctuate calls", this.tags), new Rate(TimeUnit.SECONDS, new Count()));
+            punctuateTimeSensor.add(metrics.metricName("punctuate-total", this.groupName, "The total number of punctuate calls", this.tags), new Count());
 
             taskCreatedSensor = metrics.sensor(prefix + ".task-created", Sensor.RecordingLevel.INFO);
-            taskCreatedSensor.add(createMeter(metrics, new Count(), "task-created", "newly created tasks"));
+            taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", this.tags), new Rate(TimeUnit.SECONDS, new Count()));
+            taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", this.tags), new Total());
 
             tasksClosedSensor = metrics.sensor(prefix + ".task-closed", Sensor.RecordingLevel.INFO);
-            tasksClosedSensor.add(createMeter(metrics, new Count(), "task-closed", "closed tasks"));
+            tasksClosedSensor.add(metrics.metricName("task-closed-rate", this.groupName, "The average per-second number of closed tasks", this.tags), new Rate(TimeUnit.SECONDS, new Count()));
+            tasksClosedSensor.add(metrics.metricName("task-closed-total", this.groupName, "The total number of closed tasks", this.tags), new Total());
 
             skippedRecordsSensor = metrics.sensor(prefix + ".skipped-records");
-            skippedRecordsSensor.add(createMeter(metrics, new Sum(), "skipped-records", "skipped records"));
-
-        }
-
-        private Meter createMeter(Metrics metrics, SampledStat stat, String baseName, String descriptiveName) {
-            MetricName rateMetricName = metrics.metricName(baseName + "-rate", groupName,
-                    String.format("The average per-second number of %s", descriptiveName), tags);
-            MetricName totalMetricName = metrics.metricName(baseName + "-total", groupName,
-                    String.format("The total number of %s", descriptiveName), tags);
-            return new Meter(stat, rateMetricName, totalMetricName);
+            skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", this.groupName, "The average per-second number of skipped records", this.tags), new Rate(TimeUnit.SECONDS, new Sum()));
+            skippedRecordsSensor.add(metrics.metricName("skipped-records-total", this.groupName, "The total number of skipped records", this.tags), new Total());
         }
 
         void removeAllSensors() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index 03a4819..cf25dd1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -24,19 +24,20 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
 public class StreamsMetricsImpl implements StreamsMetrics {
     private static final Logger log = LoggerFactory.getLogger(StreamsMetricsImpl.class);
@@ -176,7 +177,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         MetricName totalMetricName = metrics.metricName(opName + "-total", groupNameFromScope(scopeName),
                 "The total number of occurrence of " + opName + " operations.", tags);
         if (!metrics.metrics().containsKey(rateMetricName) && !metrics.metrics().containsKey(totalMetricName)) {
-            sensor.add(new Meter(new Count(), rateMetricName, totalMetricName));
+            sensor.add(rateMetricName, new Rate(TimeUnit.SECONDS, new Count()));
+            sensor.add(totalMetricName, new Count());
+
         } else {
             log.trace("Trying to add metric twice: {} {}", rateMetricName, totalMetricName);
         }