You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/06/13 21:57:48 UTC
[kafka] branch trunk updated: MINOR: Refactor `MetricsIntegrationTest` (#6930)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 48b65ac MINOR: Refactor `MetricsIntegrationTest` (#6930)
48b65ac is described below
commit 48b65ac617a17cb5570e152256c1073619cfaabe
Author: cadonna <br...@confluent.io>
AuthorDate: Thu Jun 13 23:57:26 2019 +0200
MINOR: Refactor `MetricsIntegrationTest` (#6930)
Reviewers: Sophie Blee-Goldman <so...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
.../integration/MetricsIntegrationTest.java | 433 ++++++++++-----------
1 file changed, 216 insertions(+), 217 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index a3c5b87..0f727d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -27,8 +27,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.KGroupedStream;
-import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
@@ -61,8 +59,7 @@ public class MetricsIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
- public static final EmbeddedKafkaCluster CLUSTER =
- new EmbeddedKafkaCluster(NUM_BROKERS);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
// Metric group
private static final String STREAM_THREAD_NODE_METRICS = "stream-metrics";
@@ -156,10 +153,6 @@ public class MetricsIntegrationTest {
private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
private static final String MY_STORE_LRU_MAP = "myStoreLruMap";
- private StreamsBuilder builder;
- private Properties streamsConfiguration;
- private KafkaStreams kafkaStreams;
-
// topic names
private static final String STREAM_INPUT = "STREAM_INPUT";
private static final String STREAM_OUTPUT_1 = "STREAM_OUTPUT_1";
@@ -167,17 +160,16 @@ public class MetricsIntegrationTest {
private static final String STREAM_OUTPUT_3 = "STREAM_OUTPUT_3";
private static final String STREAM_OUTPUT_4 = "STREAM_OUTPUT_4";
- private KStream<Integer, String> stream;
- private KStream<Integer, String> stream2;
-
- private final String appId = "stream-metrics-test";
+ private StreamsBuilder builder;
+ private Properties streamsConfiguration;
+ private KafkaStreams kafkaStreams;
@Before
public void before() throws InterruptedException {
builder = new StreamsBuilder();
CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
streamsConfiguration = new Properties();
- streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-metrics-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
@@ -212,26 +204,22 @@ public class MetricsIntegrationTest {
() -> "Kafka Streams application did not reach state NOT_RUNNING in " + timeout + " ms");
}
- private void checkMetricDeregistration() {
- final List<Metric> listMetricAfterClosingApp = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
- .filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList());
- assertThat(listMetricAfterClosingApp.size(), is(0));
- }
-
@Test
- public void testStreamMetric() throws Exception {
- stream = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
- stream.to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String()));
- builder.table(STREAM_OUTPUT_1, Materialized.as(Stores.inMemoryKeyValueStore(MY_STORE_IN_MEMORY)).withCachingEnabled())
- .toStream()
- .to(STREAM_OUTPUT_2);
- builder.table(STREAM_OUTPUT_2, Materialized.as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled())
- .toStream()
- .to(STREAM_OUTPUT_3);
- builder.table(STREAM_OUTPUT_3, Materialized.as(Stores.lruMap(MY_STORE_LRU_MAP, 10000)).withCachingEnabled())
- .toStream()
- .to(STREAM_OUTPUT_4);
-
+ public void shouldAddMetricsOnAllLevels() throws Exception {
+ builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
+ .to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String()));
+ builder.table(STREAM_OUTPUT_1,
+ Materialized.as(Stores.inMemoryKeyValueStore(MY_STORE_IN_MEMORY)).withCachingEnabled())
+ .toStream()
+ .to(STREAM_OUTPUT_2);
+ builder.table(STREAM_OUTPUT_2,
+ Materialized.as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled())
+ .toStream()
+ .to(STREAM_OUTPUT_3);
+ builder.table(STREAM_OUTPUT_3,
+ Materialized.as(Stores.lruMap(MY_STORE_LRU_MAP, 10000)).withCachingEnabled())
+ .toStream()
+ .to(STREAM_OUTPUT_4);
startApplication();
checkThreadLevelMetrics();
@@ -244,241 +232,252 @@ public class MetricsIntegrationTest {
closeApplication();
- // check all metrics de-registered
- checkMetricDeregistration();
+ checkMetricsDeregistration();
}
@Test
- public void testStreamMetricOfWindowStore() throws Exception {
- stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
- final KGroupedStream<Integer, String> groupedStream = stream2.groupByKey();
- groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(50)))
- .aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue,
- Materialized.<Integer, Long, WindowStore<Bytes, byte[]>>as(TIME_WINDOWED_AGGREGATED_STREAM_STORE)
- .withValueSerde(Serdes.Long()));
-
+ public void shouldAddMetricsForWindowStore() throws Exception {
+ builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
+ .groupByKey()
+ .windowedBy(TimeWindows.of(Duration.ofMillis(50)))
+ .aggregate(() -> 0L,
+ (aggKey, newValue, aggValue) -> aggValue,
+ Materialized.<Integer, Long, WindowStore<Bytes, byte[]>>as(TIME_WINDOWED_AGGREGATED_STREAM_STORE)
+ .withValueSerde(Serdes.Long()));
startApplication();
checkWindowStoreMetrics();
closeApplication();
- // check all metrics de-registered
- checkMetricDeregistration();
+ checkMetricsDeregistration();
}
@Test
- public void testStreamMetricOfSessionStore() throws Exception {
- stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
- final KGroupedStream<Integer, String> groupedStream = stream2.groupByKey();
- groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(50)))
- .aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue, (aggKey, leftAggValue, rightAggValue) -> leftAggValue,
- Materialized.<Integer, Long, SessionStore<Bytes, byte[]>>as(SESSION_AGGREGATED_STREAM_STORE)
- .withValueSerde(Serdes.Long()));
-
+ public void shouldAddMetricsForSessionStore() throws Exception {
+ builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
+ .groupByKey()
+ .windowedBy(SessionWindows.with(Duration.ofMillis(50)))
+ .aggregate(() -> 0L,
+ (aggKey, newValue, aggValue) -> aggValue,
+ (aggKey, leftAggValue, rightAggValue) -> leftAggValue,
+ Materialized.<Integer, Long, SessionStore<Bytes, byte[]>>as(SESSION_AGGREGATED_STREAM_STORE)
+ .withValueSerde(Serdes.Long()));
startApplication();
checkSessionStoreMetrics();
closeApplication();
- // check all metrics de-registered
- checkMetricDeregistration();
- }
-
- private void checkTaskLevelMetrics() {
- final List<Metric> listMetricTask = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
- .filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS)).collect(Collectors.toList());
- testMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5);
- testMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5);
- testMetricByName(listMetricTask, COMMIT_RATE, 5);
- testMetricByName(listMetricTask, COMMIT_TOTAL, 5);
- testMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4);
- testMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4);
+ checkMetricsDeregistration();
}
private void checkThreadLevelMetrics() {
final List<Metric> listMetricThread = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
- .filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS)).collect(Collectors.toList());
- testMetricByName(listMetricThread, COMMIT_LATENCY_AVG, 1);
- testMetricByName(listMetricThread, COMMIT_LATENCY_MAX, 1);
- testMetricByName(listMetricThread, POLL_LATENCY_AVG, 1);
- testMetricByName(listMetricThread, POLL_LATENCY_MAX, 1);
- testMetricByName(listMetricThread, PROCESS_LATENCY_AVG, 1);
- testMetricByName(listMetricThread, PROCESS_LATENCY_MAX, 1);
- testMetricByName(listMetricThread, PUNCTUATE_LATENCY_AVG, 1);
- testMetricByName(listMetricThread, PUNCTUATE_LATENCY_MAX, 1);
- testMetricByName(listMetricThread, COMMIT_RATE, 1);
- testMetricByName(listMetricThread, COMMIT_TOTAL, 1);
- testMetricByName(listMetricThread, POLL_RATE, 1);
- testMetricByName(listMetricThread, POLL_TOTAL, 1);
- testMetricByName(listMetricThread, PROCESS_RATE, 1);
- testMetricByName(listMetricThread, PROCESS_TOTAL, 1);
- testMetricByName(listMetricThread, PUNCTUATE_RATE, 1);
- testMetricByName(listMetricThread, PUNCTUATE_TOTAL, 1);
- testMetricByName(listMetricThread, TASK_CREATED_RATE, 1);
- testMetricByName(listMetricThread, TASK_CREATED_TOTAL, 1);
- testMetricByName(listMetricThread, TASK_CLOSED_RATE, 1);
- testMetricByName(listMetricThread, TASK_CLOSED_TOTAL, 1);
- testMetricByName(listMetricThread, SKIPPED_RECORDS_RATE, 1);
- testMetricByName(listMetricThread, SKIPPED_RECORDS_TOTAL, 1);
+ .filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS))
+ .collect(Collectors.toList());
+ checkMetricByName(listMetricThread, COMMIT_LATENCY_AVG, 1);
+ checkMetricByName(listMetricThread, COMMIT_LATENCY_MAX, 1);
+ checkMetricByName(listMetricThread, POLL_LATENCY_AVG, 1);
+ checkMetricByName(listMetricThread, POLL_LATENCY_MAX, 1);
+ checkMetricByName(listMetricThread, PROCESS_LATENCY_AVG, 1);
+ checkMetricByName(listMetricThread, PROCESS_LATENCY_MAX, 1);
+ checkMetricByName(listMetricThread, PUNCTUATE_LATENCY_AVG, 1);
+ checkMetricByName(listMetricThread, PUNCTUATE_LATENCY_MAX, 1);
+ checkMetricByName(listMetricThread, COMMIT_RATE, 1);
+ checkMetricByName(listMetricThread, COMMIT_TOTAL, 1);
+ checkMetricByName(listMetricThread, POLL_RATE, 1);
+ checkMetricByName(listMetricThread, POLL_TOTAL, 1);
+ checkMetricByName(listMetricThread, PROCESS_RATE, 1);
+ checkMetricByName(listMetricThread, PROCESS_TOTAL, 1);
+ checkMetricByName(listMetricThread, PUNCTUATE_RATE, 1);
+ checkMetricByName(listMetricThread, PUNCTUATE_TOTAL, 1);
+ checkMetricByName(listMetricThread, TASK_CREATED_RATE, 1);
+ checkMetricByName(listMetricThread, TASK_CREATED_TOTAL, 1);
+ checkMetricByName(listMetricThread, TASK_CLOSED_RATE, 1);
+ checkMetricByName(listMetricThread, TASK_CLOSED_TOTAL, 1);
+ checkMetricByName(listMetricThread, SKIPPED_RECORDS_RATE, 1);
+ checkMetricByName(listMetricThread, SKIPPED_RECORDS_TOTAL, 1);
+ }
+
+ private void checkTaskLevelMetrics() {
+ final List<Metric> listMetricTask = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+ .filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS))
+ .collect(Collectors.toList());
+ checkMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5);
+ checkMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5);
+ checkMetricByName(listMetricTask, COMMIT_RATE, 5);
+ checkMetricByName(listMetricTask, COMMIT_TOTAL, 5);
+ checkMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4);
+ checkMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4);
}
private void checkProcessorLevelMetrics() {
final List<Metric> listMetricProcessor = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
- .filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS)).collect(Collectors.toList());
- testMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, 18);
- testMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, 18);
- testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, 18);
- testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_MAX, 18);
- testMetricByName(listMetricProcessor, CREATE_LATENCY_AVG, 18);
- testMetricByName(listMetricProcessor, CREATE_LATENCY_MAX, 18);
- testMetricByName(listMetricProcessor, DESTROY_LATENCY_AVG, 18);
- testMetricByName(listMetricProcessor, DESTROY_LATENCY_MAX, 18);
- testMetricByName(listMetricProcessor, PROCESS_RATE, 18);
- testMetricByName(listMetricProcessor, PROCESS_TOTAL, 18);
- testMetricByName(listMetricProcessor, PUNCTUATE_RATE, 18);
- testMetricByName(listMetricProcessor, PUNCTUATE_TOTAL, 18);
- testMetricByName(listMetricProcessor, CREATE_RATE, 18);
- testMetricByName(listMetricProcessor, CREATE_TOTAL, 18);
- testMetricByName(listMetricProcessor, DESTROY_RATE, 18);
- testMetricByName(listMetricProcessor, DESTROY_TOTAL, 18);
- testMetricByName(listMetricProcessor, FORWARD_TOTAL, 18);
+ .filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS))
+ .collect(Collectors.toList());
+ checkMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, 18);
+ checkMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, 18);
+ checkMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, 18);
+ checkMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_MAX, 18);
+ checkMetricByName(listMetricProcessor, CREATE_LATENCY_AVG, 18);
+ checkMetricByName(listMetricProcessor, CREATE_LATENCY_MAX, 18);
+ checkMetricByName(listMetricProcessor, DESTROY_LATENCY_AVG, 18);
+ checkMetricByName(listMetricProcessor, DESTROY_LATENCY_MAX, 18);
+ checkMetricByName(listMetricProcessor, PROCESS_RATE, 18);
+ checkMetricByName(listMetricProcessor, PROCESS_TOTAL, 18);
+ checkMetricByName(listMetricProcessor, PUNCTUATE_RATE, 18);
+ checkMetricByName(listMetricProcessor, PUNCTUATE_TOTAL, 18);
+ checkMetricByName(listMetricProcessor, CREATE_RATE, 18);
+ checkMetricByName(listMetricProcessor, CREATE_TOTAL, 18);
+ checkMetricByName(listMetricProcessor, DESTROY_RATE, 18);
+ checkMetricByName(listMetricProcessor, DESTROY_TOTAL, 18);
+ checkMetricByName(listMetricProcessor, FORWARD_TOTAL, 18);
}
private void checkKeyValueStoreMetricsByType(final String storeType) {
final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().equals(storeType))
.collect(Collectors.toList());
- testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, GET_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, GET_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, ALL_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, ALL_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_RATE, 2);
- testMetricByName(listMetricStore, PUT_TOTAL, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 2);
- testMetricByName(listMetricStore, GET_RATE, 2);
- testMetricByName(listMetricStore, DELETE_RATE, 2);
- testMetricByName(listMetricStore, DELETE_TOTAL, 2);
- testMetricByName(listMetricStore, PUT_ALL_RATE, 2);
- testMetricByName(listMetricStore, PUT_ALL_TOTAL, 2);
- testMetricByName(listMetricStore, ALL_RATE, 2);
- testMetricByName(listMetricStore, ALL_TOTAL, 2);
- testMetricByName(listMetricStore, RANGE_RATE, 2);
- testMetricByName(listMetricStore, RANGE_TOTAL, 2);
- testMetricByName(listMetricStore, FLUSH_RATE, 2);
- testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
- testMetricByName(listMetricStore, RESTORE_RATE, 2);
- testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
+ checkMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, GET_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, GET_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, DELETE_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, DELETE_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, ALL_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, ALL_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, RANGE_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, RANGE_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, PUT_RATE, 2);
+ checkMetricByName(listMetricStore, PUT_TOTAL, 2);
+ checkMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 2);
+ checkMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 2);
+ checkMetricByName(listMetricStore, GET_RATE, 2);
+ checkMetricByName(listMetricStore, DELETE_RATE, 2);
+ checkMetricByName(listMetricStore, DELETE_TOTAL, 2);
+ checkMetricByName(listMetricStore, PUT_ALL_RATE, 2);
+ checkMetricByName(listMetricStore, PUT_ALL_TOTAL, 2);
+ checkMetricByName(listMetricStore, ALL_RATE, 2);
+ checkMetricByName(listMetricStore, ALL_TOTAL, 2);
+ checkMetricByName(listMetricStore, RANGE_RATE, 2);
+ checkMetricByName(listMetricStore, RANGE_TOTAL, 2);
+ checkMetricByName(listMetricStore, FLUSH_RATE, 2);
+ checkMetricByName(listMetricStore, FLUSH_TOTAL, 2);
+ checkMetricByName(listMetricStore, RESTORE_RATE, 2);
+ checkMetricByName(listMetricStore, RESTORE_TOTAL, 2);
+ }
+
+ private void checkMetricsDeregistration() {
+ final List<Metric> listMetricAfterClosingApp = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+ .filter(m -> m.metricName().group().contains(STREAM_STRING))
+ .collect(Collectors.toList());
+ assertThat(listMetricAfterClosingApp.size(), is(0));
}
private void checkCacheMetrics() {
final List<Metric> listMetricCache = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
- .filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS)).collect(Collectors.toList());
- testMetricByName(listMetricCache, HIT_RATIO_AVG, 6);
- testMetricByName(listMetricCache, HIT_RATIO_MIN, 6);
- testMetricByName(listMetricCache, HIT_RATIO_MAX, 6);
+ .filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS))
+ .collect(Collectors.toList());
+ checkMetricByName(listMetricCache, HIT_RATIO_AVG, 6);
+ checkMetricByName(listMetricCache, HIT_RATIO_MIN, 6);
+ checkMetricByName(listMetricCache, HIT_RATIO_MAX, 6);
}
private void checkWindowStoreMetrics() {
final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().equals(STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS))
.collect(Collectors.toList());
- testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_RATE, 2);
- testMetricByName(listMetricStore, PUT_TOTAL, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
- testMetricByName(listMetricStore, GET_RATE, 0);
- testMetricByName(listMetricStore, DELETE_RATE, 0);
- testMetricByName(listMetricStore, DELETE_TOTAL, 0);
- testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
- testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
- testMetricByName(listMetricStore, ALL_RATE, 0);
- testMetricByName(listMetricStore, ALL_TOTAL, 0);
- testMetricByName(listMetricStore, RANGE_RATE, 0);
- testMetricByName(listMetricStore, RANGE_TOTAL, 0);
- testMetricByName(listMetricStore, FLUSH_RATE, 2);
- testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
- testMetricByName(listMetricStore, RESTORE_RATE, 2);
- testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
+ checkMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
+ checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
+ checkMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
+ checkMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
+ checkMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
+ checkMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
+ checkMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
+ checkMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
+ checkMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
+ checkMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
+ checkMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
+ checkMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
+ checkMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, PUT_RATE, 2);
+ checkMetricByName(listMetricStore, PUT_TOTAL, 2);
+ checkMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
+ checkMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
+ checkMetricByName(listMetricStore, GET_RATE, 0);
+ checkMetricByName(listMetricStore, DELETE_RATE, 0);
+ checkMetricByName(listMetricStore, DELETE_TOTAL, 0);
+ checkMetricByName(listMetricStore, PUT_ALL_RATE, 0);
+ checkMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
+ checkMetricByName(listMetricStore, ALL_RATE, 0);
+ checkMetricByName(listMetricStore, ALL_TOTAL, 0);
+ checkMetricByName(listMetricStore, RANGE_RATE, 0);
+ checkMetricByName(listMetricStore, RANGE_TOTAL, 0);
+ checkMetricByName(listMetricStore, FLUSH_RATE, 2);
+ checkMetricByName(listMetricStore, FLUSH_TOTAL, 2);
+ checkMetricByName(listMetricStore, RESTORE_RATE, 2);
+ checkMetricByName(listMetricStore, RESTORE_TOTAL, 2);
}
private void checkSessionStoreMetrics() {
final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
.filter(m -> m.metricName().group().equals(STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS))
.collect(Collectors.toList());
- testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_RATE, 2);
- testMetricByName(listMetricStore, PUT_TOTAL, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
- testMetricByName(listMetricStore, GET_RATE, 0);
- testMetricByName(listMetricStore, DELETE_RATE, 0);
- testMetricByName(listMetricStore, DELETE_TOTAL, 0);
- testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
- testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
- testMetricByName(listMetricStore, ALL_RATE, 0);
- testMetricByName(listMetricStore, ALL_TOTAL, 0);
- testMetricByName(listMetricStore, RANGE_RATE, 0);
- testMetricByName(listMetricStore, RANGE_TOTAL, 0);
- testMetricByName(listMetricStore, FLUSH_RATE, 2);
- testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
- testMetricByName(listMetricStore, RESTORE_RATE, 2);
- testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
+ checkMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
+ checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
+ checkMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
+ checkMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
+ checkMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
+ checkMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
+ checkMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
+ checkMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
+ checkMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
+ checkMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
+ checkMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
+ checkMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
+ checkMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
+ checkMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
+ checkMetricByName(listMetricStore, PUT_RATE, 2);
+ checkMetricByName(listMetricStore, PUT_TOTAL, 2);
+ checkMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
+ checkMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
+ checkMetricByName(listMetricStore, GET_RATE, 0);
+ checkMetricByName(listMetricStore, DELETE_RATE, 0);
+ checkMetricByName(listMetricStore, DELETE_TOTAL, 0);
+ checkMetricByName(listMetricStore, PUT_ALL_RATE, 0);
+ checkMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
+ checkMetricByName(listMetricStore, ALL_RATE, 0);
+ checkMetricByName(listMetricStore, ALL_TOTAL, 0);
+ checkMetricByName(listMetricStore, RANGE_RATE, 0);
+ checkMetricByName(listMetricStore, RANGE_TOTAL, 0);
+ checkMetricByName(listMetricStore, FLUSH_RATE, 2);
+ checkMetricByName(listMetricStore, FLUSH_TOTAL, 2);
+ checkMetricByName(listMetricStore, RESTORE_RATE, 2);
+ checkMetricByName(listMetricStore, RESTORE_TOTAL, 2);
}
- private void testMetricByName(final List<Metric> listMetric, final String metricName, final int numMetric) {
- final List<Metric> metrics = listMetric.stream().filter(m -> m.metricName().name().equals(metricName)).collect(Collectors.toList());
+ private void checkMetricByName(final List<Metric> listMetric, final String metricName, final int numMetric) {
+ final List<Metric> metrics = listMetric.stream()
+ .filter(m -> m.metricName().name().equals(metricName))
+ .collect(Collectors.toList());
Assert.assertEquals("Size of metrics of type:'" + metricName + "' must be equal to:" + numMetric + " but it's equal to " + metrics.size(), numMetric, metrics.size());
for (final Metric m : metrics) {
Assert.assertNotNull("Metric:'" + m.metricName() + "' must be not null", m.metricValue());