You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/08/06 15:51:37 UTC

[kafka] branch trunk updated: Minor: Refactor methods to add metrics to sensor in `StreamsMetricsImpl` (#7161)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7663a6c  Minor: Refactor methods to add metrics to sensor in `StreamsMetricsImpl` (#7161)
7663a6c is described below

commit 7663a6c44daae5d72f38cbba79d728416e11167d
Author: cadonna <br...@confluent.io>
AuthorDate: Tue Aug 6 17:51:08 2019 +0200

    Minor: Refactor methods to add metrics to sensor in `StreamsMetricsImpl` (#7161)
    
    Renames method names in StreamsMetricsImpl to make them consistent.
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../streams/kstream/internals/metrics/Sensors.java |  2 +-
 .../streams/processor/internals/ProcessorNode.java | 13 +++---
 .../internals/metrics/StreamsMetricsImpl.java      | 50 +++++++++++-----------
 .../processor/internals/metrics/ThreadMetrics.java | 30 ++++++-------
 .../AbstractRocksDBSegmentedBytesStore.java        |  4 +-
 .../state/internals/InMemorySessionStore.java      |  8 ++--
 .../state/internals/InMemoryWindowStore.java       |  4 +-
 .../streams/state/internals/metrics/Sensors.java   | 12 +++---
 .../internals/metrics/StreamsMetricsImplTest.java  | 20 ++++-----
 .../internals/metrics/ThreadMetricsTest.java       | 26 +++++------
 10 files changed, 85 insertions(+), 84 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 038b8ac..363ec6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -44,7 +44,7 @@ public class Sensors {
             LATE_RECORD_DROP,
             Sensor.RecordingLevel.INFO
         );
-        StreamsMetricsImpl.addInvocationRateAndCount(
+        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             sensor,
             PROCESSOR_NODE_METRICS_GROUP,
             metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, context.currentNode().name()),
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 01e3e56..bc66ede 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -33,8 +33,7 @@ import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor;
 
 public class ProcessorNode<K, V> {
 
@@ -232,12 +231,14 @@ public class ProcessorNode<K, V> {
                                                                            final Map<String, String> taskTags,
                                                                            final Map<String, String> nodeTags) {
             final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
-            addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
-            addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+            addAvgAndMaxLatencyToSensor(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+            StreamsMetricsImpl
+                .addInvocationRateAndCountToSensor(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
 
             final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
-            addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
-            addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+            addAvgAndMaxLatencyToSensor(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+            StreamsMetricsImpl
+                .addInvocationRateAndCountToSensor(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
 
             return sensor;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index ae3d953..5ac2f33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -344,13 +344,13 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 
         // first add the global operation metrics if not yet, with the global tags only
         final Sensor parent = metrics.sensor(externalParentSensorName(operationName), recordingLevel);
-        addAvgMaxLatency(parent, group, allTagMap, operationName);
-        addInvocationRateAndCount(parent, group, allTagMap, operationName);
+        addAvgAndMaxLatencyToSensor(parent, group, allTagMap, operationName);
+        addInvocationRateAndCountToSensor(parent, group, allTagMap, operationName);
 
         // add the operation metrics with additional tags
         final Sensor sensor = metrics.sensor(externalChildSensorName(operationName, entityName), recordingLevel, parent);
-        addAvgMaxLatency(sensor, group, tagMap, operationName);
-        addInvocationRateAndCount(sensor, group, tagMap, operationName);
+        addAvgAndMaxLatencyToSensor(sensor, group, tagMap, operationName);
+        addInvocationRateAndCountToSensor(sensor, group, tagMap, operationName);
 
         parentSensors.put(sensor, parent);
 
@@ -374,11 +374,11 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 
         // first add the global operation metrics if not yet, with the global tags only
         final Sensor parent = metrics.sensor(externalParentSensorName(operationName), recordingLevel);
-        addInvocationRateAndCount(parent, group, allTagMap, operationName);
+        addInvocationRateAndCountToSensor(parent, group, allTagMap, operationName);
 
         // add the operation metrics with additional tags
         final Sensor sensor = metrics.sensor(externalChildSensorName(operationName, entityName), recordingLevel, parent);
-        addInvocationRateAndCount(sensor, group, tagMap, operationName);
+        addInvocationRateAndCountToSensor(sensor, group, tagMap, operationName);
 
         parentSensors.put(sensor, parent);
 
@@ -397,10 +397,10 @@ public class StreamsMetricsImpl implements StreamsMetrics {
     }
 
 
-    public static void addAvgAndMax(final Sensor sensor,
-                                    final String group,
-                                    final Map<String, String> tags,
-                                    final String operation) {
+    public static void addAvgAndMaxToSensor(final Sensor sensor,
+                                            final String group,
+                                            final Map<String, String> tags,
+                                            final String operation) {
         sensor.add(
             new MetricName(
                 operation + AVG_SUFFIX,
@@ -419,10 +419,10 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         );
     }
 
-    public static void addAvgMaxLatency(final Sensor sensor,
-                                        final String group,
-                                        final Map<String, String> tags,
-                                        final String operation) {
+    public static void addAvgAndMaxLatencyToSensor(final Sensor sensor,
+                                                   final String group,
+                                                   final Map<String, String> tags,
+                                                   final String operation) {
         sensor.add(
             new MetricName(
                 operation + "-latency-avg",
@@ -441,12 +441,12 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         );
     }
 
-    public static void addInvocationRateAndCount(final Sensor sensor,
-                                                 final String group,
-                                                 final Map<String, String> tags,
-                                                 final String operation,
-                                                 final String descriptionOfInvocation,
-                                                 final String descriptionOfRate) {
+    public static void addInvocationRateAndCountToSensor(final Sensor sensor,
+                                                         final String group,
+                                                         final Map<String, String> tags,
+                                                         final String operation,
+                                                         final String descriptionOfInvocation,
+                                                         final String descriptionOfRate) {
         sensor.add(
             new MetricName(
                 operation + TOTAL_SUFFIX,
@@ -467,11 +467,11 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         );
     }
 
-    public static void addInvocationRateAndCount(final Sensor sensor,
-                                                 final String group,
-                                                 final Map<String, String> tags,
-                                                 final String operation) {
-        addInvocationRateAndCount(
+    public static void addInvocationRateAndCountToSensor(final Sensor sensor,
+                                                         final String group,
+                                                         final Map<String, String> tags,
+                                                         final String operation) {
+        addInvocationRateAndCountToSensor(
             sensor,
             group,
             tags,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
index e177667..f8b7836 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -26,8 +26,8 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMax;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
 
 public class ThreadMetrics {
     private ThreadMetrics() {}
@@ -74,7 +74,7 @@ public class ThreadMetrics {
 
     public static Sensor createTaskSensor(final StreamsMetricsImpl streamsMetrics) {
         final Sensor createTaskSensor = streamsMetrics.threadLevelSensor(CREATE_TASK, RecordingLevel.INFO);
-        addInvocationRateAndCount(createTaskSensor,
+        addInvocationRateAndCountToSensor(createTaskSensor,
                                   THREAD_LEVEL_GROUP,
                                   streamsMetrics.threadLevelTagMap(),
                                   CREATE_TASK,
@@ -85,7 +85,7 @@ public class ThreadMetrics {
 
     public static Sensor closeTaskSensor(final StreamsMetricsImpl streamsMetrics) {
         final Sensor closeTaskSensor = streamsMetrics.threadLevelSensor(CLOSE_TASK, RecordingLevel.INFO);
-        addInvocationRateAndCount(closeTaskSensor,
+        addInvocationRateAndCountToSensor(closeTaskSensor,
                                   THREAD_LEVEL_GROUP,
                                   streamsMetrics.threadLevelTagMap(),
                                   CLOSE_TASK,
@@ -97,8 +97,8 @@ public class ThreadMetrics {
     public static Sensor commitSensor(final StreamsMetricsImpl streamsMetrics) {
         final Sensor commitSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.INFO);
         final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
-        addAvgAndMax(commitSensor, THREAD_LEVEL_GROUP, tagMap, COMMIT_LATENCY);
-        addInvocationRateAndCount(commitSensor,
+        addAvgAndMaxToSensor(commitSensor, THREAD_LEVEL_GROUP, tagMap, COMMIT_LATENCY);
+        addInvocationRateAndCountToSensor(commitSensor,
                                   THREAD_LEVEL_GROUP,
                                   tagMap,
                                   COMMIT,
@@ -110,8 +110,8 @@ public class ThreadMetrics {
     public static Sensor pollSensor(final StreamsMetricsImpl streamsMetrics) {
         final Sensor pollSensor = streamsMetrics.threadLevelSensor(POLL, Sensor.RecordingLevel.INFO);
         final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
-        addAvgAndMax(pollSensor, THREAD_LEVEL_GROUP, tagMap, POLL_LATENCY);
-        addInvocationRateAndCount(pollSensor,
+        addAvgAndMaxToSensor(pollSensor, THREAD_LEVEL_GROUP, tagMap, POLL_LATENCY);
+        addInvocationRateAndCountToSensor(pollSensor,
                                   THREAD_LEVEL_GROUP,
                                   tagMap,
                                   POLL,
@@ -123,8 +123,8 @@ public class ThreadMetrics {
     public static Sensor processSensor(final StreamsMetricsImpl streamsMetrics) {
         final Sensor processSensor = streamsMetrics.threadLevelSensor(PROCESS, Sensor.RecordingLevel.INFO);
         final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
-        addAvgAndMax(processSensor, THREAD_LEVEL_GROUP, tagMap, PROCESS_LATENCY);
-        addInvocationRateAndCount(processSensor,
+        addAvgAndMaxToSensor(processSensor, THREAD_LEVEL_GROUP, tagMap, PROCESS_LATENCY);
+        addInvocationRateAndCountToSensor(processSensor,
                                   THREAD_LEVEL_GROUP,
                                   tagMap,
                                   PROCESS,
@@ -137,8 +137,8 @@ public class ThreadMetrics {
     public static Sensor punctuateSensor(final StreamsMetricsImpl streamsMetrics) {
         final Sensor punctuateSensor = streamsMetrics.threadLevelSensor(PUNCTUATE, Sensor.RecordingLevel.INFO);
         final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
-        addAvgAndMax(punctuateSensor, THREAD_LEVEL_GROUP, tagMap, PUNCTUATE_LATENCY);
-        addInvocationRateAndCount(punctuateSensor,
+        addAvgAndMaxToSensor(punctuateSensor, THREAD_LEVEL_GROUP, tagMap, PUNCTUATE_LATENCY);
+        addInvocationRateAndCountToSensor(punctuateSensor,
                                   THREAD_LEVEL_GROUP,
                                   tagMap,
                                   PUNCTUATE,
@@ -150,7 +150,7 @@ public class ThreadMetrics {
 
     public static Sensor skipRecordSensor(final StreamsMetricsImpl streamsMetrics) {
         final Sensor skippedRecordsSensor = streamsMetrics.threadLevelSensor(SKIP_RECORD, Sensor.RecordingLevel.INFO);
-        addInvocationRateAndCount(skippedRecordsSensor,
+        addInvocationRateAndCountToSensor(skippedRecordsSensor,
                                   THREAD_LEVEL_GROUP,
                                   streamsMetrics.threadLevelTagMap(),
                                   SKIP_RECORD,
@@ -163,11 +163,11 @@ public class ThreadMetrics {
     public static Sensor commitOverTasksSensor(final StreamsMetricsImpl streamsMetrics) {
         final Sensor commitOverTasksSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.DEBUG);
         final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS);
-        addAvgAndMax(commitOverTasksSensor,
+        addAvgAndMaxToSensor(commitOverTasksSensor,
                      TASK_LEVEL_GROUP,
                      tagMap,
                      COMMIT_LATENCY);
-        addInvocationRateAndCount(commitOverTasksSensor,
+        addInvocationRateAndCountToSensor(commitOverTasksSensor,
                                   TASK_LEVEL_GROUP,
                                   tagMap,
                                   COMMIT,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index ef18d3c..97dc8d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -41,7 +41,7 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
 
 public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);
@@ -182,7 +182,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
             EXPIRED_WINDOW_RECORD_DROP,
             Sensor.RecordingLevel.INFO
         );
-        addInvocationRateAndCount(
+        addInvocationRateAndCountToSensor(
             expiredRecordSensor,
             "stream-" + metricScope + "-metrics",
             metrics.tagMap("task-id", taskName, metricScope + "-id", name()),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index ebe9878..6c64b04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
-
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -43,6 +40,9 @@ import org.apache.kafka.streams.state.SessionStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
+
 public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
 
     private static final Logger LOG = LoggerFactory.getLogger(InMemorySessionStore.class);
@@ -82,7 +82,7 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
             EXPIRED_WINDOW_RECORD_DROP,
             Sensor.RecordingLevel.INFO
         );
-        addInvocationRateAndCount(
+        addInvocationRateAndCountToSensor(
             expiredRecordSensor,
             "stream-" + metricScope + "-metrics",
             metrics.tagMap("task-id", taskName, metricScope + "-id", name()),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 8063410..1a3e26b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -43,7 +43,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKeyBytes;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
 
@@ -98,7 +98,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
             EXPIRED_WINDOW_RECORD_DROP,
             Sensor.RecordingLevel.INFO
         );
-        addInvocationRateAndCount(
+        addInvocationRateAndCountToSensor(
             expiredRecordSensor,
             "stream-" + metricScope + "-metrics",
             metrics.tagMap("task-id", taskName, metricScope + "-id", name()),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
index 13a39c6..8ed4d47 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 import java.util.Map;
 
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
 
 public final class Sensors {
     private Sensors() {}
@@ -42,11 +42,11 @@ public final class Sensors {
                                                                        final Map<String, String> taskTags,
                                                                        final Map<String, String> storeTags) {
         final Sensor taskSensor = metrics.taskLevelSensor(taskName, operation, level);
-        addAvgMaxLatency(taskSensor, metricsGroup, taskTags, operation);
-        addInvocationRateAndCount(taskSensor, metricsGroup, taskTags, operation);
+        addAvgAndMaxLatencyToSensor(taskSensor, metricsGroup, taskTags, operation);
+        addInvocationRateAndCountToSensor(taskSensor, metricsGroup, taskTags, operation);
         final Sensor sensor = metrics.storeLevelSensor(taskName, storeName, operation, level, taskSensor);
-        addAvgMaxLatency(sensor, metricsGroup, storeTags, operation);
-        addInvocationRateAndCount(sensor, metricsGroup, storeTags, operation);
+        addAvgAndMaxLatencyToSensor(sensor, metricsGroup, storeTags, operation);
+        addInvocationRateAndCountToSensor(sensor, metricsGroup, storeTags, operation);
         return sensor;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 678d9f3..4fd6f88 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -35,8 +35,8 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -131,14 +131,14 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
         final Map<String, String> nodeTags = mkMap(mkEntry("nkey", "value"));
 
         final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
-        addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
-        addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", "");
+        addAvgAndMaxLatencyToSensor(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+        addInvocationRateAndCountToSensor(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", "");
 
         final int numberOfTaskMetrics = registry.metrics().size();
 
         final Sensor sensor1 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1);
-        addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
-        addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", "");
+        addAvgAndMaxLatencyToSensor(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+        addInvocationRateAndCountToSensor(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", "");
 
         assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
 
@@ -147,14 +147,14 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
         assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
 
         final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
-        addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
-        addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", "");
+        addAvgAndMaxLatencyToSensor(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+        addInvocationRateAndCountToSensor(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", "");
 
         assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
 
         final Sensor sensor2 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2);
-        addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
-        addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", "");
+        addAvgAndMaxLatencyToSensor(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+        addInvocationRateAndCountToSensor(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", "");
 
         assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
index 89395d9..739f028 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
@@ -59,7 +59,7 @@ public class ThreadMetricsTest {
         mockStatic(StreamsMetricsImpl.class);
         expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
         expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
-        StreamsMetricsImpl.addInvocationRateAndCount(
+        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
 
         replayAll();
@@ -81,7 +81,7 @@ public class ThreadMetricsTest {
         mockStatic(StreamsMetricsImpl.class);
         expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
         expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
-        StreamsMetricsImpl.addInvocationRateAndCount(
+        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
 
         replayAll();
@@ -104,9 +104,9 @@ public class ThreadMetricsTest {
         mockStatic(StreamsMetricsImpl.class);
         expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
         expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
-        StreamsMetricsImpl.addInvocationRateAndCount(
+        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
-        StreamsMetricsImpl.addAvgAndMax(
+        StreamsMetricsImpl.addAvgAndMaxToSensor(
             dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);
 
         replayAll();
@@ -129,9 +129,9 @@ public class ThreadMetricsTest {
         mockStatic(StreamsMetricsImpl.class);
         expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
         expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
-        StreamsMetricsImpl.addInvocationRateAndCount(
+        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
-        StreamsMetricsImpl.addAvgAndMax(
+        StreamsMetricsImpl.addAvgAndMaxToSensor(
             dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);
 
         replayAll();
@@ -154,9 +154,9 @@ public class ThreadMetricsTest {
         mockStatic(StreamsMetricsImpl.class);
         expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
         expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
-        StreamsMetricsImpl.addInvocationRateAndCount(
+        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
-        StreamsMetricsImpl.addAvgAndMax(
+        StreamsMetricsImpl.addAvgAndMaxToSensor(
             dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);
 
         replayAll();
@@ -179,9 +179,9 @@ public class ThreadMetricsTest {
         mockStatic(StreamsMetricsImpl.class);
         expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
         expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
-        StreamsMetricsImpl.addInvocationRateAndCount(
+        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
-        StreamsMetricsImpl.addAvgAndMax(
+        StreamsMetricsImpl.addAvgAndMaxToSensor(
             dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);
 
         replayAll();
@@ -203,7 +203,7 @@ public class ThreadMetricsTest {
         mockStatic(StreamsMetricsImpl.class);
         expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
         expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
-        StreamsMetricsImpl.addInvocationRateAndCount(
+        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
 
         replayAll();
@@ -226,9 +226,9 @@ public class ThreadMetricsTest {
         mockStatic(StreamsMetricsImpl.class);
         expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.DEBUG)).andReturn(dummySensor);
         expect(streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS)).andReturn(dummyTagMap);
-        StreamsMetricsImpl.addInvocationRateAndCount(
+        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
             dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
-        StreamsMetricsImpl.addAvgAndMax(
+        StreamsMetricsImpl.addAvgAndMaxToSensor(
             dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operationLatency);
 
         replayAll();