You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/23 00:15:12 UTC
[kafka] branch 1.0 updated: MINOR: bugfix streams total metrics
(#5277)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new 9450a24 MINOR: bugfix streams total metrics (#5277)
9450a24 is described below
commit 9450a241e4fbaa29af7791fa714ac701952e01c7
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Fri Jun 22 19:14:34 2018 -0500
MINOR: bugfix streams total metrics (#5277)
Previously, we erroneously summed the invocation metrics instead of counting them.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../streams/processor/internals/StreamThread.java | 37 +++++++++++-----------
.../processor/internals/StreamsMetricsImpl.java | 13 +++++---
2 files changed, 26 insertions(+), 24 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 08dfdd1..f633395 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -25,16 +25,15 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Meter;
-import org.apache.kafka.common.metrics.stats.SampledStat;
+import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
@@ -62,6 +61,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.singleton;
@@ -502,40 +502,39 @@ public class StreamThread extends Thread implements ThreadDataProvider {
commitTimeSensor = metrics.sensor(prefix + ".commit-latency", Sensor.RecordingLevel.INFO);
commitTimeSensor.add(metrics.metricName("commit-latency-avg", this.groupName, "The average commit time in ms", this.tags), new Avg());
commitTimeSensor.add(metrics.metricName("commit-latency-max", this.groupName, "The maximum commit time in ms", this.tags), new Max());
- commitTimeSensor.add(createMeter(metrics, new Count(), "commit", "commit calls"));
+ commitTimeSensor.add(metrics.metricName("commit-rate", this.groupName, "The average per-second number of commit calls", this.tags), new Rate(TimeUnit.SECONDS, new Count()));
+ commitTimeSensor.add(metrics.metricName("commit-total", this.groupName, "The total number of commit calls", this.tags), new Count());
+
pollTimeSensor = metrics.sensor(prefix + ".poll-latency", Sensor.RecordingLevel.INFO);
pollTimeSensor.add(metrics.metricName("poll-latency-avg", this.groupName, "The average poll time in ms", this.tags), new Avg());
pollTimeSensor.add(metrics.metricName("poll-latency-max", this.groupName, "The maximum poll time in ms", this.tags), new Max());
- pollTimeSensor.add(createMeter(metrics, new Count(), "poll", "record-poll calls"));
+ pollTimeSensor.add(metrics.metricName("poll-rate", this.groupName, "The average per-second number of record-poll calls", this.tags), new Rate(TimeUnit.SECONDS, new Count()));
+ pollTimeSensor.add(metrics.metricName("poll-total", this.groupName, "The total number of record-poll calls", this.tags), new Count());
processTimeSensor = metrics.sensor(prefix + ".process-latency", Sensor.RecordingLevel.INFO);
processTimeSensor.add(metrics.metricName("process-latency-avg", this.groupName, "The average process time in ms", this.tags), new Avg());
processTimeSensor.add(metrics.metricName("process-latency-max", this.groupName, "The maximum process time in ms", this.tags), new Max());
- processTimeSensor.add(createMeter(metrics, new Count(), "process", "process calls"));
+ processTimeSensor.add(metrics.metricName("process-rate", this.groupName, "The average per-second number of process calls", this.tags), new Rate(TimeUnit.SECONDS, new Count()));
+ processTimeSensor.add(metrics.metricName("process-total", this.groupName, "The total number of process calls", this.tags), new Count());
punctuateTimeSensor = metrics.sensor(prefix + ".punctuate-latency", Sensor.RecordingLevel.INFO);
punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", this.groupName, "The average punctuate time in ms", this.tags), new Avg());
punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", this.groupName, "The maximum punctuate time in ms", this.tags), new Max());
- punctuateTimeSensor.add(createMeter(metrics, new Count(), "punctuate", "punctuate calls"));
+ punctuateTimeSensor.add(metrics.metricName("punctuate-rate", this.groupName, "The average per-second number of punctuate calls", this.tags), new Rate(TimeUnit.SECONDS, new Count()));
+ punctuateTimeSensor.add(metrics.metricName("punctuate-total", this.groupName, "The total number of punctuate calls", this.tags), new Count());
taskCreatedSensor = metrics.sensor(prefix + ".task-created", Sensor.RecordingLevel.INFO);
- taskCreatedSensor.add(createMeter(metrics, new Count(), "task-created", "newly created tasks"));
+ taskCreatedSensor.add(metrics.metricName("task-created-rate", "stream-metrics", "The average per-second number of newly created tasks", this.tags), new Rate(TimeUnit.SECONDS, new Count()));
+ taskCreatedSensor.add(metrics.metricName("task-created-total", "stream-metrics", "The total number of newly created tasks", this.tags), new Total());
tasksClosedSensor = metrics.sensor(prefix + ".task-closed", Sensor.RecordingLevel.INFO);
- tasksClosedSensor.add(createMeter(metrics, new Count(), "task-closed", "closed tasks"));
+ tasksClosedSensor.add(metrics.metricName("task-closed-rate", this.groupName, "The average per-second number of closed tasks", this.tags), new Rate(TimeUnit.SECONDS, new Count()));
+ tasksClosedSensor.add(metrics.metricName("task-closed-total", this.groupName, "The total number of closed tasks", this.tags), new Total());
skippedRecordsSensor = metrics.sensor(prefix + ".skipped-records");
- skippedRecordsSensor.add(createMeter(metrics, new Sum(), "skipped-records", "skipped records"));
-
- }
-
- private Meter createMeter(Metrics metrics, SampledStat stat, String baseName, String descriptiveName) {
- MetricName rateMetricName = metrics.metricName(baseName + "-rate", groupName,
- String.format("The average per-second number of %s", descriptiveName), tags);
- MetricName totalMetricName = metrics.metricName(baseName + "-total", groupName,
- String.format("The total number of %s", descriptiveName), tags);
- return new Meter(stat, rateMetricName, totalMetricName);
+ skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", this.groupName, "The average per-second number of skipped records", this.tags), new Rate(TimeUnit.SECONDS, new Sum()));
+ skippedRecordsSensor.add(metrics.metricName("skipped-records-total", this.groupName, "The total number of skipped records", this.tags), new Total());
}
void removeAllSensors() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index 03a4819..cf25dd1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -24,19 +24,20 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
public class StreamsMetricsImpl implements StreamsMetrics {
private static final Logger log = LoggerFactory.getLogger(StreamsMetricsImpl.class);
@@ -176,7 +177,9 @@ public class StreamsMetricsImpl implements StreamsMetrics {
MetricName totalMetricName = metrics.metricName(opName + "-total", groupNameFromScope(scopeName),
"The total number of occurrence of " + opName + " operations.", tags);
if (!metrics.metrics().containsKey(rateMetricName) && !metrics.metrics().containsKey(totalMetricName)) {
- sensor.add(new Meter(new Count(), rateMetricName, totalMetricName));
+ sensor.add(rateMetricName, new Rate(TimeUnit.SECONDS, new Count()));
+ sensor.add(totalMetricName, new Count());
+
} else {
log.trace("Trying to add metric twice: {} {}", rateMetricName, totalMetricName);
}