You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/07/28 20:32:55 UTC
[kafka] branch trunk updated: MINOR: Fix static mock usage in ThreadMetricsTest (#12454)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a5d71e1550 MINOR: Fix static mock usage in ThreadMetricsTest (#12454)
a5d71e1550 is described below
commit a5d71e1550c5a2b670347c0c3bafb0b195bf916c
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Thu Jul 28 22:32:46 2022 +0200
MINOR: Fix static mock usage in ThreadMetricsTest (#12454)
Before this PR the calls to the static methods on StreamsMetricsImpl were just calls and not a verification on the mock. This miss happened during the switch from EasyMock to Mockito.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../internals/metrics/ThreadMetricsTest.java | 422 +++++++++++----------
1 file changed, 224 insertions(+), 198 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
index 6ed97ebf7c..3d2aaa20c8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java
@@ -24,12 +24,14 @@ import org.junit.Test;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.Map;
import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
@@ -54,17 +56,20 @@ public class ThreadMetricsTest {
final String ratioDescription = "The fraction of time the thread spent on processing active tasks";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addValueMetricToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operation,
- ratioDescription
- );
-
- final Sensor sensor = ThreadMetrics.processRatioSensor(THREAD_ID, streamsMetrics);
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.processRatioSensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addValueMetricToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operation,
+ ratioDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test
@@ -74,18 +79,21 @@ public class ThreadMetricsTest {
final String maxDescription = "The maximum number of records processed within an iteration";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addAvgAndMaxToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operation,
- avgDescription,
- maxDescription
- );
- final Sensor sensor = ThreadMetrics.processRecordsSensor(THREAD_ID, streamsMetrics);
-
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.processRecordsSensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addAvgAndMaxToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operation,
+ avgDescription,
+ maxDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test
@@ -95,18 +103,21 @@ public class ThreadMetricsTest {
final String maxLatencyDescription = "The maximum process latency";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operationLatency, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addAvgAndMaxToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operationLatency,
- avgLatencyDescription,
- maxLatencyDescription
- );
- final Sensor sensor = ThreadMetrics.processLatencySensor(THREAD_ID, streamsMetrics);
-
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.processLatencySensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addAvgAndMaxToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operationLatency,
+ avgLatencyDescription,
+ maxLatencyDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test
@@ -117,18 +128,21 @@ public class ThreadMetricsTest {
final String rateDescription = "The average per-second number of calls to process";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operationRate, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operation,
- rateDescription,
- totalDescription
- );
-
- final Sensor sensor = ThreadMetrics.processRateSensor(THREAD_ID, streamsMetrics);
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.processRateSensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operation,
+ rateDescription,
+ totalDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test
@@ -137,17 +151,20 @@ public class ThreadMetricsTest {
final String ratioDescription = "The fraction of time the thread spent on polling records from consumer";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addValueMetricToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operation,
- ratioDescription
- );
-
- final Sensor sensor = ThreadMetrics.pollRatioSensor(THREAD_ID, streamsMetrics);
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.pollRatioSensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addValueMetricToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operation,
+ ratioDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test
@@ -157,18 +174,21 @@ public class ThreadMetricsTest {
final String maxDescription = "The maximum number of records polled from consumer within an iteration";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addAvgAndMaxToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operation,
- avgDescription,
- maxDescription
- );
-
- final Sensor sensor = ThreadMetrics.pollRecordsSensor(THREAD_ID, streamsMetrics);
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.pollRecordsSensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addAvgAndMaxToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operation,
+ avgDescription,
+ maxDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test
@@ -181,26 +201,31 @@ public class ThreadMetricsTest {
final String maxLatencyDescription = "The maximum poll latency";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addInvocationRateAndCountToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operation,
- rateDescription,
- totalDescription
- );
- StreamsMetricsImpl.addAvgAndMaxToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operationLatency,
- avgLatencyDescription,
- maxLatencyDescription
- );
-
- final Sensor sensor = ThreadMetrics.pollSensor(THREAD_ID, streamsMetrics);
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.pollSensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operation,
+ rateDescription,
+ totalDescription
+ )
+ );
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addAvgAndMaxToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operationLatency,
+ avgLatencyDescription,
+ maxLatencyDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test
@@ -213,25 +238,31 @@ public class ThreadMetricsTest {
final String maxLatencyDescription = "The maximum commit latency";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addInvocationRateAndCountToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operation,
- rateDescription,
- totalDescription
- );
- StreamsMetricsImpl.addAvgAndMaxToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operationLatency,
- avgLatencyDescription,
- maxLatencyDescription);
-
- final Sensor sensor = ThreadMetrics.commitSensor(THREAD_ID, streamsMetrics);
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.commitSensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operation,
+ rateDescription,
+ totalDescription
+ )
+ );
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addAvgAndMaxToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operationLatency,
+ avgLatencyDescription,
+ maxLatencyDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test
@@ -240,17 +271,20 @@ public class ThreadMetricsTest {
final String ratioDescription = "The fraction of time the thread spent on committing all tasks";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addValueMetricToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operation,
- ratioDescription
- );
- final Sensor sensor = ThreadMetrics.commitRatioSensor(THREAD_ID, streamsMetrics);
-
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.commitRatioSensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addValueMetricToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operation,
+ ratioDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test
@@ -262,18 +296,21 @@ public class ThreadMetricsTest {
"The average per-second number of calls to commit over all tasks assigned to one stream thread";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.DEBUG)).thenReturn(expectedSensor);
when(streamsMetrics.taskLevelTagMap(THREAD_ID, ROLLUP_VALUE)).thenReturn(tagMap);
- StreamsMetricsImpl.addInvocationRateAndCountToSensor(
- expectedSensor,
- TASK_LEVEL_GROUP,
- tagMap,
- operation,
- rateDescription,
- totalDescription
- );
- final Sensor sensor = ThreadMetrics.commitOverTasksSensor(THREAD_ID, streamsMetrics);
-
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.commitOverTasksSensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor(
+ expectedSensor,
+ TASK_LEVEL_GROUP,
+ tagMap,
+ operation,
+ rateDescription,
+ totalDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test
@@ -286,26 +323,31 @@ public class ThreadMetricsTest {
final String maxLatencyDescription = "The maximum punctuate latency";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addInvocationRateAndCountToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operation,
- rateDescription,
- totalDescription
- );
- StreamsMetricsImpl.addAvgAndMaxToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operationLatency,
- avgLatencyDescription,
- maxLatencyDescription
- );
- final Sensor sensor = ThreadMetrics.punctuateSensor(THREAD_ID, streamsMetrics);
-
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.punctuateSensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operation,
+ rateDescription,
+ totalDescription
+ )
+ );
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addAvgAndMaxToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operationLatency,
+ avgLatencyDescription,
+ maxLatencyDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test
@@ -314,39 +356,20 @@ public class ThreadMetricsTest {
final String ratioDescription = "The fraction of time the thread spent on punctuating active tasks";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addValueMetricToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operation,
- ratioDescription
- );
-
- final Sensor sensor = ThreadMetrics.punctuateRatioSensor(THREAD_ID, streamsMetrics);
-
- assertThat(sensor, is(expectedSensor));
- }
-
- @Test
- public void shouldGetSkipRecordSensor() {
- final String operation = "skipped-records";
- final String totalDescription = "The total number of skipped records";
- final String rateDescription = "The average per-second number of skipped records";
- when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO))
- .thenReturn(expectedSensor);
- when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addInvocationRateAndCountToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operation,
- rateDescription,
- totalDescription
- );
-
- final Sensor sensor = ThreadMetrics.skipRecordSensor(THREAD_ID, streamsMetrics);
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.punctuateRatioSensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addValueMetricToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operation,
+ ratioDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test
@@ -357,19 +380,20 @@ public class ThreadMetricsTest {
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addInvocationRateAndCountToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operation,
- rateDescription,
- totalDescription
- );
-
-
- final Sensor sensor = ThreadMetrics.createTaskSensor(THREAD_ID, streamsMetrics);
-
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.createTaskSensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operation,
+ rateDescription,
+ totalDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test
@@ -379,19 +403,21 @@ public class ThreadMetricsTest {
final String rateDescription = "The average per-second number of closed tasks";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).thenReturn(expectedSensor);
when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap);
- StreamsMetricsImpl.addInvocationRateAndCountToSensor(
- expectedSensor,
- THREAD_LEVEL_GROUP,
- tagMap,
- operation,
- rateDescription,
- totalDescription
- );
-
-
- final Sensor sensor = ThreadMetrics.closeTaskSensor(THREAD_ID, streamsMetrics);
- assertThat(sensor, is(expectedSensor));
+ try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
+ final Sensor sensor = ThreadMetrics.closeTaskSensor(THREAD_ID, streamsMetrics);
+ streamsMetricsStaticMock.verify(
+ () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor(
+ expectedSensor,
+ THREAD_LEVEL_GROUP,
+ tagMap,
+ operation,
+ rateDescription,
+ totalDescription
+ )
+ );
+ assertThat(sensor, is(expectedSensor));
+ }
}
@Test