You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/10 07:15:40 UTC

kafka git commit: KAFKA-2733: Standardize metric name for Kafka Streams

Repository: kafka
Updated Branches:
  refs/heads/trunk 6e5bd2497 -> ec466d358


KAFKA-2733: Standardize metric name for Kafka Streams

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda, Jun Rao

Closes #643 from guozhangwang/K2733


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ec466d35
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ec466d35
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ec466d35

Branch: refs/heads/trunk
Commit: ec466d358dcd5ff9e035a6d5de82ada12d187ae0
Parents: 6e5bd24
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Dec 9 22:15:34 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Dec 9 22:15:34 2015 -0800

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       | 16 +++++++-------
 .../streams/state/MeteredKeyValueStore.java     | 22 ++++++++++----------
 2 files changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ec466d35/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 38333a2..37d24bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -721,23 +721,25 @@ public class StreamThread extends Thread {
             for (int i = 0; i < tags.length; i += 2)
                 tagMap.put(tags[i], tags[i + 1]);
 
+            String metricGroupName = "streaming-" + scopeName + "-metrics";
+
             // first add the global operation metrics if not yet, with the global tags only
-            Sensor parent = metrics.sensor(operationName);
-            addLatencyMetrics(this.metricGrpName, parent, "all", operationName, this.metricTags);
+            Sensor parent = metrics.sensor(scopeName + "-" + operationName);
+            addLatencyMetrics(metricGroupName, parent, "all", operationName, this.metricTags);
 
             // add the store operation metrics with additional tags
-            Sensor sensor = metrics.sensor(entityName + "-" + operationName, parent);
-            addLatencyMetrics("streaming-" + scopeName + "-metrics", sensor, entityName, operationName, tagMap);
+            Sensor sensor = metrics.sensor(scopeName + "-" + entityName + "-" + operationName, parent);
+            addLatencyMetrics(metricGroupName, sensor, entityName, operationName, tagMap);
 
             return sensor;
         }
 
         private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
-            maybeAddMetric(sensor, metrics.metricName(opName + "-avg-latency-ms", metricGrpName,
+            maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-avg-latency-ms", metricGrpName,
                 "The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg());
-            maybeAddMetric(sensor, metrics.metricName(opName + "-max-latency-ms", metricGrpName,
+            maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-max-latency-ms", metricGrpName,
                 "The max latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Max());
-            maybeAddMetric(sensor, metrics.metricName(opName + "-qps", metricGrpName,
+            maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-qps", metricGrpName,
                 "The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec466d35/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index d75e7e6..16f57a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -31,7 +31,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     protected final KeyValueStore<K, V> inner;
     protected final Serdes<K, V> serialization;
-    protected final String metricGrp;
+    protected final String metricScope;
     protected final Time time;
 
     private Sensor putTime;
@@ -48,10 +48,10 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private KeyValueStoreChangeLogger<K, V> changeLogger = null;
 
     // always wrap the store with the metered store
-    public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricGrp, Time time) {
+    public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) {
         this.inner = inner;
         this.serialization = serialization;
-        this.metricGrp = metricGrp;
+        this.metricScope = metricScope;
         this.time = time != null ? time : new SystemTime();
     }
 
@@ -69,14 +69,14 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     public void init(ProcessorContext context) {
         final String name = name();
         this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
-        this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
-        this.deleteTime = this.metrics.addLatencySensor(metricGrp, name, "delete", "store-name", name);
-        this.putAllTime = this.metrics.addLatencySensor(metricGrp, name, "put-all", "store-name", name);
-        this.allTime = this.metrics.addLatencySensor(metricGrp, name, "all", "store-name", name);
-        this.rangeTime = this.metrics.addLatencySensor(metricGrp, name, "range", "store-name", name);
-        this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name", name);
-        this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
+        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
+        this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
+        this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete");
+        this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all");
+        this.allTime = this.metrics.addLatencySensor(metricScope, name, "all");
+        this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
+        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
+        this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
 
         serialization.init(context);
         this.changeLogger = this.loggingEnabled ? new KeyValueStoreChangeLogger<>(name, context, serialization) : null;