You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/08/06 15:51:37 UTC
[kafka] branch trunk updated: Minor: Refactor methods to add metrics to sensor in `StreamsMetricsImpl` (#7161)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7663a6c Minor: Refactor methods to add metrics to sensor in `StreamsMetricsImpl` (#7161)
7663a6c is described below
commit 7663a6c44daae5d72f38cbba79d728416e11167d
Author: cadonna <br...@confluent.io>
AuthorDate: Tue Aug 6 17:51:08 2019 +0200
Minor: Refactor methods to add metrics to sensor in `StreamsMetricsImpl` (#7161)
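Renames the static helper methods in `StreamsMetricsImpl` that add metrics to a sensor so that their names are consistent: each now ends in `ToSensor` (e.g. `addAvgMaxLatency` becomes `addAvgAndMaxLatencyToSensor`), and all callers are updated accordingly.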
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../streams/kstream/internals/metrics/Sensors.java | 2 +-
.../streams/processor/internals/ProcessorNode.java | 13 +++---
.../internals/metrics/StreamsMetricsImpl.java | 50 +++++++++++-----------
.../processor/internals/metrics/ThreadMetrics.java | 30 ++++++-------
.../AbstractRocksDBSegmentedBytesStore.java | 4 +-
.../state/internals/InMemorySessionStore.java | 8 ++--
.../state/internals/InMemoryWindowStore.java | 4 +-
.../streams/state/internals/metrics/Sensors.java | 12 +++---
.../internals/metrics/StreamsMetricsImplTest.java | 20 ++++-----
.../internals/metrics/ThreadMetricsTest.java | 26 +++++------
10 files changed, 85 insertions(+), 84 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 038b8ac..363ec6e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -44,7 +44,7 @@ public class Sensors {
LATE_RECORD_DROP,
Sensor.RecordingLevel.INFO
);
- StreamsMetricsImpl.addInvocationRateAndCount(
+ StreamsMetricsImpl.addInvocationRateAndCountToSensor(
sensor,
PROCESSOR_NODE_METRICS_GROUP,
metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, context.currentNode().name()),
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 01e3e56..bc66ede 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -33,8 +33,7 @@ import java.util.Set;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor;
public class ProcessorNode<K, V> {
@@ -232,12 +231,14 @@ public class ProcessorNode<K, V> {
final Map<String, String> taskTags,
final Map<String, String> nodeTags) {
final Sensor parent = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
- addAvgMaxLatency(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
- addInvocationRateAndCount(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+ addAvgAndMaxLatencyToSensor(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+ StreamsMetricsImpl
+ .addInvocationRateAndCountToSensor(parent, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
final Sensor sensor = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent);
- addAvgMaxLatency(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
- addInvocationRateAndCount(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+ addAvgAndMaxLatencyToSensor(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+ StreamsMetricsImpl
+ .addInvocationRateAndCountToSensor(sensor, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
return sensor;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index ae3d953..5ac2f33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -344,13 +344,13 @@ public class StreamsMetricsImpl implements StreamsMetrics {
// first add the global operation metrics if not yet, with the global tags only
final Sensor parent = metrics.sensor(externalParentSensorName(operationName), recordingLevel);
- addAvgMaxLatency(parent, group, allTagMap, operationName);
- addInvocationRateAndCount(parent, group, allTagMap, operationName);
+ addAvgAndMaxLatencyToSensor(parent, group, allTagMap, operationName);
+ addInvocationRateAndCountToSensor(parent, group, allTagMap, operationName);
// add the operation metrics with additional tags
final Sensor sensor = metrics.sensor(externalChildSensorName(operationName, entityName), recordingLevel, parent);
- addAvgMaxLatency(sensor, group, tagMap, operationName);
- addInvocationRateAndCount(sensor, group, tagMap, operationName);
+ addAvgAndMaxLatencyToSensor(sensor, group, tagMap, operationName);
+ addInvocationRateAndCountToSensor(sensor, group, tagMap, operationName);
parentSensors.put(sensor, parent);
@@ -374,11 +374,11 @@ public class StreamsMetricsImpl implements StreamsMetrics {
// first add the global operation metrics if not yet, with the global tags only
final Sensor parent = metrics.sensor(externalParentSensorName(operationName), recordingLevel);
- addInvocationRateAndCount(parent, group, allTagMap, operationName);
+ addInvocationRateAndCountToSensor(parent, group, allTagMap, operationName);
// add the operation metrics with additional tags
final Sensor sensor = metrics.sensor(externalChildSensorName(operationName, entityName), recordingLevel, parent);
- addInvocationRateAndCount(sensor, group, tagMap, operationName);
+ addInvocationRateAndCountToSensor(sensor, group, tagMap, operationName);
parentSensors.put(sensor, parent);
@@ -397,10 +397,10 @@ public class StreamsMetricsImpl implements StreamsMetrics {
}
- public static void addAvgAndMax(final Sensor sensor,
- final String group,
- final Map<String, String> tags,
- final String operation) {
+ public static void addAvgAndMaxToSensor(final Sensor sensor,
+ final String group,
+ final Map<String, String> tags,
+ final String operation) {
sensor.add(
new MetricName(
operation + AVG_SUFFIX,
@@ -419,10 +419,10 @@ public class StreamsMetricsImpl implements StreamsMetrics {
);
}
- public static void addAvgMaxLatency(final Sensor sensor,
- final String group,
- final Map<String, String> tags,
- final String operation) {
+ public static void addAvgAndMaxLatencyToSensor(final Sensor sensor,
+ final String group,
+ final Map<String, String> tags,
+ final String operation) {
sensor.add(
new MetricName(
operation + "-latency-avg",
@@ -441,12 +441,12 @@ public class StreamsMetricsImpl implements StreamsMetrics {
);
}
- public static void addInvocationRateAndCount(final Sensor sensor,
- final String group,
- final Map<String, String> tags,
- final String operation,
- final String descriptionOfInvocation,
- final String descriptionOfRate) {
+ public static void addInvocationRateAndCountToSensor(final Sensor sensor,
+ final String group,
+ final Map<String, String> tags,
+ final String operation,
+ final String descriptionOfInvocation,
+ final String descriptionOfRate) {
sensor.add(
new MetricName(
operation + TOTAL_SUFFIX,
@@ -467,11 +467,11 @@ public class StreamsMetricsImpl implements StreamsMetrics {
);
}
- public static void addInvocationRateAndCount(final Sensor sensor,
- final String group,
- final Map<String, String> tags,
- final String operation) {
- addInvocationRateAndCount(
+ public static void addInvocationRateAndCountToSensor(final Sensor sensor,
+ final String group,
+ final Map<String, String> tags,
+ final String operation) {
+ addInvocationRateAndCountToSensor(
sensor,
group,
tags,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
index e177667..f8b7836 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java
@@ -26,8 +26,8 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_ID_TAG;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMax;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
public class ThreadMetrics {
private ThreadMetrics() {}
@@ -74,7 +74,7 @@ public class ThreadMetrics {
public static Sensor createTaskSensor(final StreamsMetricsImpl streamsMetrics) {
final Sensor createTaskSensor = streamsMetrics.threadLevelSensor(CREATE_TASK, RecordingLevel.INFO);
- addInvocationRateAndCount(createTaskSensor,
+ addInvocationRateAndCountToSensor(createTaskSensor,
THREAD_LEVEL_GROUP,
streamsMetrics.threadLevelTagMap(),
CREATE_TASK,
@@ -85,7 +85,7 @@ public class ThreadMetrics {
public static Sensor closeTaskSensor(final StreamsMetricsImpl streamsMetrics) {
final Sensor closeTaskSensor = streamsMetrics.threadLevelSensor(CLOSE_TASK, RecordingLevel.INFO);
- addInvocationRateAndCount(closeTaskSensor,
+ addInvocationRateAndCountToSensor(closeTaskSensor,
THREAD_LEVEL_GROUP,
streamsMetrics.threadLevelTagMap(),
CLOSE_TASK,
@@ -97,8 +97,8 @@ public class ThreadMetrics {
public static Sensor commitSensor(final StreamsMetricsImpl streamsMetrics) {
final Sensor commitSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
- addAvgAndMax(commitSensor, THREAD_LEVEL_GROUP, tagMap, COMMIT_LATENCY);
- addInvocationRateAndCount(commitSensor,
+ addAvgAndMaxToSensor(commitSensor, THREAD_LEVEL_GROUP, tagMap, COMMIT_LATENCY);
+ addInvocationRateAndCountToSensor(commitSensor,
THREAD_LEVEL_GROUP,
tagMap,
COMMIT,
@@ -110,8 +110,8 @@ public class ThreadMetrics {
public static Sensor pollSensor(final StreamsMetricsImpl streamsMetrics) {
final Sensor pollSensor = streamsMetrics.threadLevelSensor(POLL, Sensor.RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
- addAvgAndMax(pollSensor, THREAD_LEVEL_GROUP, tagMap, POLL_LATENCY);
- addInvocationRateAndCount(pollSensor,
+ addAvgAndMaxToSensor(pollSensor, THREAD_LEVEL_GROUP, tagMap, POLL_LATENCY);
+ addInvocationRateAndCountToSensor(pollSensor,
THREAD_LEVEL_GROUP,
tagMap,
POLL,
@@ -123,8 +123,8 @@ public class ThreadMetrics {
public static Sensor processSensor(final StreamsMetricsImpl streamsMetrics) {
final Sensor processSensor = streamsMetrics.threadLevelSensor(PROCESS, Sensor.RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
- addAvgAndMax(processSensor, THREAD_LEVEL_GROUP, tagMap, PROCESS_LATENCY);
- addInvocationRateAndCount(processSensor,
+ addAvgAndMaxToSensor(processSensor, THREAD_LEVEL_GROUP, tagMap, PROCESS_LATENCY);
+ addInvocationRateAndCountToSensor(processSensor,
THREAD_LEVEL_GROUP,
tagMap,
PROCESS,
@@ -137,8 +137,8 @@ public class ThreadMetrics {
public static Sensor punctuateSensor(final StreamsMetricsImpl streamsMetrics) {
final Sensor punctuateSensor = streamsMetrics.threadLevelSensor(PUNCTUATE, Sensor.RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap();
- addAvgAndMax(punctuateSensor, THREAD_LEVEL_GROUP, tagMap, PUNCTUATE_LATENCY);
- addInvocationRateAndCount(punctuateSensor,
+ addAvgAndMaxToSensor(punctuateSensor, THREAD_LEVEL_GROUP, tagMap, PUNCTUATE_LATENCY);
+ addInvocationRateAndCountToSensor(punctuateSensor,
THREAD_LEVEL_GROUP,
tagMap,
PUNCTUATE,
@@ -150,7 +150,7 @@ public class ThreadMetrics {
public static Sensor skipRecordSensor(final StreamsMetricsImpl streamsMetrics) {
final Sensor skippedRecordsSensor = streamsMetrics.threadLevelSensor(SKIP_RECORD, Sensor.RecordingLevel.INFO);
- addInvocationRateAndCount(skippedRecordsSensor,
+ addInvocationRateAndCountToSensor(skippedRecordsSensor,
THREAD_LEVEL_GROUP,
streamsMetrics.threadLevelTagMap(),
SKIP_RECORD,
@@ -163,11 +163,11 @@ public class ThreadMetrics {
public static Sensor commitOverTasksSensor(final StreamsMetricsImpl streamsMetrics) {
final Sensor commitOverTasksSensor = streamsMetrics.threadLevelSensor(COMMIT, Sensor.RecordingLevel.DEBUG);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS);
- addAvgAndMax(commitOverTasksSensor,
+ addAvgAndMaxToSensor(commitOverTasksSensor,
TASK_LEVEL_GROUP,
tagMap,
COMMIT_LATENCY);
- addInvocationRateAndCount(commitOverTasksSensor,
+ addInvocationRateAndCountToSensor(commitOverTasksSensor,
TASK_LEVEL_GROUP,
tagMap,
COMMIT,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index ef18d3c..97dc8d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -41,7 +41,7 @@ import java.util.Map;
import java.util.Set;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);
@@ -182,7 +182,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
EXPIRED_WINDOW_RECORD_DROP,
Sensor.RecordingLevel.INFO
);
- addInvocationRateAndCount(
+ addInvocationRateAndCountToSensor(
expiredRecordSensor,
"stream-" + metricScope + "-metrics",
metrics.tagMap("task-id", taskName, metricScope + "-id", name()),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index ebe9878..6c64b04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.streams.state.internals;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
-
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
@@ -43,6 +40,9 @@ import org.apache.kafka.streams.state.SessionStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
+
public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
private static final Logger LOG = LoggerFactory.getLogger(InMemorySessionStore.class);
@@ -82,7 +82,7 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
EXPIRED_WINDOW_RECORD_DROP,
Sensor.RecordingLevel.INFO
);
- addInvocationRateAndCount(
+ addInvocationRateAndCountToSensor(
expiredRecordSensor,
"stream-" + metricScope + "-metrics",
metrics.tagMap("task-id", taskName, metricScope + "-id", name()),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 8063410..1a3e26b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -43,7 +43,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKeyBytes;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
@@ -98,7 +98,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
EXPIRED_WINDOW_RECORD_DROP,
Sensor.RecordingLevel.INFO
);
- addInvocationRateAndCount(
+ addInvocationRateAndCountToSensor(
expiredRecordSensor,
"stream-" + metricScope + "-metrics",
metrics.tagMap("task-id", taskName, metricScope + "-id", name()),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
index 13a39c6..8ed4d47 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/Sensors.java
@@ -27,8 +27,8 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import java.util.Map;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
public final class Sensors {
private Sensors() {}
@@ -42,11 +42,11 @@ public final class Sensors {
final Map<String, String> taskTags,
final Map<String, String> storeTags) {
final Sensor taskSensor = metrics.taskLevelSensor(taskName, operation, level);
- addAvgMaxLatency(taskSensor, metricsGroup, taskTags, operation);
- addInvocationRateAndCount(taskSensor, metricsGroup, taskTags, operation);
+ addAvgAndMaxLatencyToSensor(taskSensor, metricsGroup, taskTags, operation);
+ addInvocationRateAndCountToSensor(taskSensor, metricsGroup, taskTags, operation);
final Sensor sensor = metrics.storeLevelSensor(taskName, storeName, operation, level, taskSensor);
- addAvgMaxLatency(sensor, metricsGroup, storeTags, operation);
- addInvocationRateAndCount(sensor, metricsGroup, storeTags, operation);
+ addAvgAndMaxLatencyToSensor(sensor, metricsGroup, storeTags, operation);
+ addInvocationRateAndCountToSensor(sensor, metricsGroup, storeTags, operation);
return sensor;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
index 678d9f3..4fd6f88 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java
@@ -35,8 +35,8 @@ import java.util.concurrent.TimeUnit;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.PROCESSOR_NODE_METRICS_GROUP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgMaxLatency;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxLatencyToSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -131,14 +131,14 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
final Map<String, String> nodeTags = mkMap(mkEntry("nkey", "value"));
final Sensor parent1 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
- addAvgMaxLatency(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
- addInvocationRateAndCount(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", "");
+ addAvgAndMaxLatencyToSensor(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+ addInvocationRateAndCountToSensor(parent1, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", "");
final int numberOfTaskMetrics = registry.metrics().size();
final Sensor sensor1 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent1);
- addAvgMaxLatency(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
- addInvocationRateAndCount(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", "");
+ addAvgAndMaxLatencyToSensor(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+ addInvocationRateAndCountToSensor(sensor1, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", "");
assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
@@ -147,14 +147,14 @@ public class StreamsMetricsImplTest extends EasyMockSupport {
assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
final Sensor parent2 = metrics.taskLevelSensor(taskName, operation, Sensor.RecordingLevel.DEBUG);
- addAvgMaxLatency(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
- addInvocationRateAndCount(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", "");
+ addAvgAndMaxLatencyToSensor(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation);
+ addInvocationRateAndCountToSensor(parent2, PROCESSOR_NODE_METRICS_GROUP, taskTags, operation, "", "");
assertThat(registry.metrics().size(), equalTo(numberOfTaskMetrics));
final Sensor sensor2 = metrics.nodeLevelSensor(taskName, processorNodeName, operation, Sensor.RecordingLevel.DEBUG, parent2);
- addAvgMaxLatency(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
- addInvocationRateAndCount(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", "");
+ addAvgAndMaxLatencyToSensor(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation);
+ addInvocationRateAndCountToSensor(sensor2, PROCESSOR_NODE_METRICS_GROUP, nodeTags, operation, "", "");
assertThat(registry.metrics().size(), greaterThan(numberOfTaskMetrics));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
index 89395d9..739f028 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
@@ -59,7 +59,7 @@ public class ThreadMetricsTest {
mockStatic(StreamsMetricsImpl.class);
expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
- StreamsMetricsImpl.addInvocationRateAndCount(
+ StreamsMetricsImpl.addInvocationRateAndCountToSensor(
dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
replayAll();
@@ -81,7 +81,7 @@ public class ThreadMetricsTest {
mockStatic(StreamsMetricsImpl.class);
expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
- StreamsMetricsImpl.addInvocationRateAndCount(
+ StreamsMetricsImpl.addInvocationRateAndCountToSensor(
dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
replayAll();
@@ -104,9 +104,9 @@ public class ThreadMetricsTest {
mockStatic(StreamsMetricsImpl.class);
expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
- StreamsMetricsImpl.addInvocationRateAndCount(
+ StreamsMetricsImpl.addInvocationRateAndCountToSensor(
dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
- StreamsMetricsImpl.addAvgAndMax(
+ StreamsMetricsImpl.addAvgAndMaxToSensor(
dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);
replayAll();
@@ -129,9 +129,9 @@ public class ThreadMetricsTest {
mockStatic(StreamsMetricsImpl.class);
expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
- StreamsMetricsImpl.addInvocationRateAndCount(
+ StreamsMetricsImpl.addInvocationRateAndCountToSensor(
dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
- StreamsMetricsImpl.addAvgAndMax(
+ StreamsMetricsImpl.addAvgAndMaxToSensor(
dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);
replayAll();
@@ -154,9 +154,9 @@ public class ThreadMetricsTest {
mockStatic(StreamsMetricsImpl.class);
expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
- StreamsMetricsImpl.addInvocationRateAndCount(
+ StreamsMetricsImpl.addInvocationRateAndCountToSensor(
dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
- StreamsMetricsImpl.addAvgAndMax(
+ StreamsMetricsImpl.addAvgAndMaxToSensor(
dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);
replayAll();
@@ -179,9 +179,9 @@ public class ThreadMetricsTest {
mockStatic(StreamsMetricsImpl.class);
expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
- StreamsMetricsImpl.addInvocationRateAndCount(
+ StreamsMetricsImpl.addInvocationRateAndCountToSensor(
dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
- StreamsMetricsImpl.addAvgAndMax(
+ StreamsMetricsImpl.addAvgAndMaxToSensor(
dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operationLatency);
replayAll();
@@ -203,7 +203,7 @@ public class ThreadMetricsTest {
mockStatic(StreamsMetricsImpl.class);
expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.INFO)).andReturn(dummySensor);
expect(streamsMetrics.threadLevelTagMap()).andReturn(dummyTagMap);
- StreamsMetricsImpl.addInvocationRateAndCount(
+ StreamsMetricsImpl.addInvocationRateAndCountToSensor(
dummySensor, THREAD_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
replayAll();
@@ -226,9 +226,9 @@ public class ThreadMetricsTest {
mockStatic(StreamsMetricsImpl.class);
expect(streamsMetrics.threadLevelSensor(operation, RecordingLevel.DEBUG)).andReturn(dummySensor);
expect(streamsMetrics.threadLevelTagMap(TASK_ID_TAG, ALL_TASKS)).andReturn(dummyTagMap);
- StreamsMetricsImpl.addInvocationRateAndCount(
+ StreamsMetricsImpl.addInvocationRateAndCountToSensor(
dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operation, totalDescription, rateDescription);
- StreamsMetricsImpl.addAvgAndMax(
+ StreamsMetricsImpl.addAvgAndMaxToSensor(
dummySensor, TASK_LEVEL_GROUP, dummyTagMap, operationLatency);
replayAll();