You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/06/13 21:57:48 UTC

[kafka] branch trunk updated: MINOR: Refactor `MetricsIntegrationTest` (#6930)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 48b65ac  MINOR: Refactor `MetricsIntegrationTest` (#6930)
48b65ac is described below

commit 48b65ac617a17cb5570e152256c1073619cfaabe
Author: cadonna <br...@confluent.io>
AuthorDate: Thu Jun 13 23:57:26 2019 +0200

    MINOR: Refactor `MetricsIntegrationTest` (#6930)
    
    Reviewers: Sophie Blee-Goldman <so...@confluent.io>, Bill Bejeck <bb...@gmail.com>
---
 .../integration/MetricsIntegrationTest.java        | 433 ++++++++++-----------
 1 file changed, 216 insertions(+), 217 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index a3c5b87..0f727d3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -27,8 +27,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.KGroupedStream;
-import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.SessionWindows;
@@ -61,8 +59,7 @@ public class MetricsIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-            new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
     // Metric group
     private static final String STREAM_THREAD_NODE_METRICS = "stream-metrics";
@@ -156,10 +153,6 @@ public class MetricsIntegrationTest {
     private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
     private static final String MY_STORE_LRU_MAP = "myStoreLruMap";
 
-    private StreamsBuilder builder;
-    private Properties streamsConfiguration;
-    private KafkaStreams kafkaStreams;
-
     // topic names
     private static final String STREAM_INPUT = "STREAM_INPUT";
     private static final String STREAM_OUTPUT_1 = "STREAM_OUTPUT_1";
@@ -167,17 +160,16 @@ public class MetricsIntegrationTest {
     private static final String STREAM_OUTPUT_3 = "STREAM_OUTPUT_3";
     private static final String STREAM_OUTPUT_4 = "STREAM_OUTPUT_4";
 
-    private KStream<Integer, String> stream;
-    private KStream<Integer, String> stream2;
-
-    private final String appId = "stream-metrics-test";
+    private StreamsBuilder builder;
+    private Properties streamsConfiguration;
+    private KafkaStreams kafkaStreams;
 
     @Before
     public void before() throws InterruptedException {
         builder = new StreamsBuilder();
         CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
         streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-metrics-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
@@ -212,26 +204,22 @@ public class MetricsIntegrationTest {
             () -> "Kafka Streams application did not reach state NOT_RUNNING in " + timeout + " ms");
     }
 
-    private void checkMetricDeregistration() {
-        final List<Metric> listMetricAfterClosingApp = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
-            .filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList());
-        assertThat(listMetricAfterClosingApp.size(), is(0));
-    }
-
     @Test
-    public void testStreamMetric() throws Exception {
-        stream = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
-        stream.to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String()));
-        builder.table(STREAM_OUTPUT_1, Materialized.as(Stores.inMemoryKeyValueStore(MY_STORE_IN_MEMORY)).withCachingEnabled())
-                .toStream()
-                .to(STREAM_OUTPUT_2);
-        builder.table(STREAM_OUTPUT_2, Materialized.as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled())
-                .toStream()
-                .to(STREAM_OUTPUT_3);
-        builder.table(STREAM_OUTPUT_3, Materialized.as(Stores.lruMap(MY_STORE_LRU_MAP, 10000)).withCachingEnabled())
-                .toStream()
-                .to(STREAM_OUTPUT_4);
-
+    public void shouldAddMetricsOnAllLevels() throws Exception {
+        builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String()));
+        builder.table(STREAM_OUTPUT_1,
+                      Materialized.as(Stores.inMemoryKeyValueStore(MY_STORE_IN_MEMORY)).withCachingEnabled())
+            .toStream()
+            .to(STREAM_OUTPUT_2);
+        builder.table(STREAM_OUTPUT_2,
+                      Materialized.as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled())
+            .toStream()
+            .to(STREAM_OUTPUT_3);
+        builder.table(STREAM_OUTPUT_3,
+                      Materialized.as(Stores.lruMap(MY_STORE_LRU_MAP, 10000)).withCachingEnabled())
+            .toStream()
+            .to(STREAM_OUTPUT_4);
         startApplication();
 
         checkThreadLevelMetrics();
@@ -244,241 +232,252 @@ public class MetricsIntegrationTest {
 
         closeApplication();
 
-        // check all metrics de-registered
-        checkMetricDeregistration();
+        checkMetricsDeregistration();
     }
 
     @Test
-    public void testStreamMetricOfWindowStore() throws Exception {
-        stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
-        final KGroupedStream<Integer, String> groupedStream = stream2.groupByKey();
-        groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(50)))
-                .aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue,
-                        Materialized.<Integer, Long, WindowStore<Bytes, byte[]>>as(TIME_WINDOWED_AGGREGATED_STREAM_STORE)
-                                .withValueSerde(Serdes.Long()));
-
+    public void shouldAddMetricsForWindowStore() throws Exception {
+        builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .groupByKey()
+            .windowedBy(TimeWindows.of(Duration.ofMillis(50)))
+            .aggregate(() -> 0L,
+                (aggKey, newValue, aggValue) -> aggValue,
+                Materialized.<Integer, Long, WindowStore<Bytes, byte[]>>as(TIME_WINDOWED_AGGREGATED_STREAM_STORE)
+                    .withValueSerde(Serdes.Long()));
         startApplication();
 
         checkWindowStoreMetrics();
 
         closeApplication();
 
-        // check all metrics de-registered
-        checkMetricDeregistration();
+        checkMetricsDeregistration();
     }
 
     @Test
-    public void testStreamMetricOfSessionStore() throws Exception {
-        stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
-        final KGroupedStream<Integer, String> groupedStream = stream2.groupByKey();
-        groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(50)))
-                .aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue, (aggKey, leftAggValue, rightAggValue) -> leftAggValue,
-                        Materialized.<Integer, Long, SessionStore<Bytes, byte[]>>as(SESSION_AGGREGATED_STREAM_STORE)
-                                .withValueSerde(Serdes.Long()));
-
+    public void shouldAddMetricsForSessionStore() throws Exception {
+        builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .groupByKey()
+            .windowedBy(SessionWindows.with(Duration.ofMillis(50)))
+            .aggregate(() -> 0L,
+                (aggKey, newValue, aggValue) -> aggValue,
+                (aggKey, leftAggValue, rightAggValue) -> leftAggValue,
+                Materialized.<Integer, Long, SessionStore<Bytes, byte[]>>as(SESSION_AGGREGATED_STREAM_STORE)
+                    .withValueSerde(Serdes.Long()));
         startApplication();
 
         checkSessionStoreMetrics();
 
         closeApplication();
 
-        // check all metrics de-registered
-        checkMetricDeregistration();
-    }
-
-    private void checkTaskLevelMetrics() {
-        final List<Metric> listMetricTask = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
-            .filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS)).collect(Collectors.toList());
-        testMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5);
-        testMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5);
-        testMetricByName(listMetricTask, COMMIT_RATE, 5);
-        testMetricByName(listMetricTask, COMMIT_TOTAL, 5);
-        testMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4);
-        testMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4);
+        checkMetricsDeregistration();
     }
 
     private void checkThreadLevelMetrics() {
         final List<Metric> listMetricThread = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
-            .filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS)).collect(Collectors.toList());
-        testMetricByName(listMetricThread, COMMIT_LATENCY_AVG, 1);
-        testMetricByName(listMetricThread, COMMIT_LATENCY_MAX, 1);
-        testMetricByName(listMetricThread, POLL_LATENCY_AVG, 1);
-        testMetricByName(listMetricThread, POLL_LATENCY_MAX, 1);
-        testMetricByName(listMetricThread, PROCESS_LATENCY_AVG, 1);
-        testMetricByName(listMetricThread, PROCESS_LATENCY_MAX, 1);
-        testMetricByName(listMetricThread, PUNCTUATE_LATENCY_AVG, 1);
-        testMetricByName(listMetricThread, PUNCTUATE_LATENCY_MAX, 1);
-        testMetricByName(listMetricThread, COMMIT_RATE, 1);
-        testMetricByName(listMetricThread, COMMIT_TOTAL, 1);
-        testMetricByName(listMetricThread, POLL_RATE, 1);
-        testMetricByName(listMetricThread, POLL_TOTAL, 1);
-        testMetricByName(listMetricThread, PROCESS_RATE, 1);
-        testMetricByName(listMetricThread, PROCESS_TOTAL, 1);
-        testMetricByName(listMetricThread, PUNCTUATE_RATE, 1);
-        testMetricByName(listMetricThread, PUNCTUATE_TOTAL, 1);
-        testMetricByName(listMetricThread, TASK_CREATED_RATE, 1);
-        testMetricByName(listMetricThread, TASK_CREATED_TOTAL, 1);
-        testMetricByName(listMetricThread, TASK_CLOSED_RATE, 1);
-        testMetricByName(listMetricThread, TASK_CLOSED_TOTAL, 1);
-        testMetricByName(listMetricThread, SKIPPED_RECORDS_RATE, 1);
-        testMetricByName(listMetricThread, SKIPPED_RECORDS_TOTAL, 1);
+            .filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS))
+            .collect(Collectors.toList());
+        checkMetricByName(listMetricThread, COMMIT_LATENCY_AVG, 1);
+        checkMetricByName(listMetricThread, COMMIT_LATENCY_MAX, 1);
+        checkMetricByName(listMetricThread, POLL_LATENCY_AVG, 1);
+        checkMetricByName(listMetricThread, POLL_LATENCY_MAX, 1);
+        checkMetricByName(listMetricThread, PROCESS_LATENCY_AVG, 1);
+        checkMetricByName(listMetricThread, PROCESS_LATENCY_MAX, 1);
+        checkMetricByName(listMetricThread, PUNCTUATE_LATENCY_AVG, 1);
+        checkMetricByName(listMetricThread, PUNCTUATE_LATENCY_MAX, 1);
+        checkMetricByName(listMetricThread, COMMIT_RATE, 1);
+        checkMetricByName(listMetricThread, COMMIT_TOTAL, 1);
+        checkMetricByName(listMetricThread, POLL_RATE, 1);
+        checkMetricByName(listMetricThread, POLL_TOTAL, 1);
+        checkMetricByName(listMetricThread, PROCESS_RATE, 1);
+        checkMetricByName(listMetricThread, PROCESS_TOTAL, 1);
+        checkMetricByName(listMetricThread, PUNCTUATE_RATE, 1);
+        checkMetricByName(listMetricThread, PUNCTUATE_TOTAL, 1);
+        checkMetricByName(listMetricThread, TASK_CREATED_RATE, 1);
+        checkMetricByName(listMetricThread, TASK_CREATED_TOTAL, 1);
+        checkMetricByName(listMetricThread, TASK_CLOSED_RATE, 1);
+        checkMetricByName(listMetricThread, TASK_CLOSED_TOTAL, 1);
+        checkMetricByName(listMetricThread, SKIPPED_RECORDS_RATE, 1);
+        checkMetricByName(listMetricThread, SKIPPED_RECORDS_TOTAL, 1);
+    }
+
+    private void checkTaskLevelMetrics() {
+        final List<Metric> listMetricTask = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+            .filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS))
+            .collect(Collectors.toList());
+        checkMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5);
+        checkMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5);
+        checkMetricByName(listMetricTask, COMMIT_RATE, 5);
+        checkMetricByName(listMetricTask, COMMIT_TOTAL, 5);
+        checkMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4);
+        checkMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4);
     }
 
     private void checkProcessorLevelMetrics() {
         final List<Metric> listMetricProcessor = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
-            .filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS)).collect(Collectors.toList());
-        testMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, 18);
-        testMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, 18);
-        testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, 18);
-        testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_MAX, 18);
-        testMetricByName(listMetricProcessor, CREATE_LATENCY_AVG, 18);
-        testMetricByName(listMetricProcessor, CREATE_LATENCY_MAX, 18);
-        testMetricByName(listMetricProcessor, DESTROY_LATENCY_AVG, 18);
-        testMetricByName(listMetricProcessor, DESTROY_LATENCY_MAX, 18);
-        testMetricByName(listMetricProcessor, PROCESS_RATE, 18);
-        testMetricByName(listMetricProcessor, PROCESS_TOTAL, 18);
-        testMetricByName(listMetricProcessor, PUNCTUATE_RATE, 18);
-        testMetricByName(listMetricProcessor, PUNCTUATE_TOTAL, 18);
-        testMetricByName(listMetricProcessor, CREATE_RATE, 18);
-        testMetricByName(listMetricProcessor, CREATE_TOTAL, 18);
-        testMetricByName(listMetricProcessor, DESTROY_RATE, 18);
-        testMetricByName(listMetricProcessor, DESTROY_TOTAL, 18);
-        testMetricByName(listMetricProcessor, FORWARD_TOTAL, 18);
+            .filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS))
+            .collect(Collectors.toList());
+        checkMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, 18);
+        checkMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, 18);
+        checkMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, 18);
+        checkMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_MAX, 18);
+        checkMetricByName(listMetricProcessor, CREATE_LATENCY_AVG, 18);
+        checkMetricByName(listMetricProcessor, CREATE_LATENCY_MAX, 18);
+        checkMetricByName(listMetricProcessor, DESTROY_LATENCY_AVG, 18);
+        checkMetricByName(listMetricProcessor, DESTROY_LATENCY_MAX, 18);
+        checkMetricByName(listMetricProcessor, PROCESS_RATE, 18);
+        checkMetricByName(listMetricProcessor, PROCESS_TOTAL, 18);
+        checkMetricByName(listMetricProcessor, PUNCTUATE_RATE, 18);
+        checkMetricByName(listMetricProcessor, PUNCTUATE_TOTAL, 18);
+        checkMetricByName(listMetricProcessor, CREATE_RATE, 18);
+        checkMetricByName(listMetricProcessor, CREATE_TOTAL, 18);
+        checkMetricByName(listMetricProcessor, DESTROY_RATE, 18);
+        checkMetricByName(listMetricProcessor, DESTROY_TOTAL, 18);
+        checkMetricByName(listMetricProcessor, FORWARD_TOTAL, 18);
     }
 
     private void checkKeyValueStoreMetricsByType(final String storeType) {
         final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
             .filter(m -> m.metricName().group().equals(storeType))
             .collect(Collectors.toList());
-        testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, GET_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, GET_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, ALL_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, ALL_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, PUT_RATE, 2);
-        testMetricByName(listMetricStore, PUT_TOTAL, 2);
-        testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 2);
-        testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 2);
-        testMetricByName(listMetricStore, GET_RATE, 2);
-        testMetricByName(listMetricStore, DELETE_RATE, 2);
-        testMetricByName(listMetricStore, DELETE_TOTAL, 2);
-        testMetricByName(listMetricStore, PUT_ALL_RATE, 2);
-        testMetricByName(listMetricStore, PUT_ALL_TOTAL, 2);
-        testMetricByName(listMetricStore, ALL_RATE, 2);
-        testMetricByName(listMetricStore, ALL_TOTAL, 2);
-        testMetricByName(listMetricStore, RANGE_RATE, 2);
-        testMetricByName(listMetricStore, RANGE_TOTAL, 2);
-        testMetricByName(listMetricStore, FLUSH_RATE, 2);
-        testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
-        testMetricByName(listMetricStore, RESTORE_RATE, 2);
-        testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
+        checkMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, GET_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, GET_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, DELETE_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, DELETE_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, ALL_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, ALL_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, RANGE_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, RANGE_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, PUT_RATE, 2);
+        checkMetricByName(listMetricStore, PUT_TOTAL, 2);
+        checkMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 2);
+        checkMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 2);
+        checkMetricByName(listMetricStore, GET_RATE, 2);
+        checkMetricByName(listMetricStore, DELETE_RATE, 2);
+        checkMetricByName(listMetricStore, DELETE_TOTAL, 2);
+        checkMetricByName(listMetricStore, PUT_ALL_RATE, 2);
+        checkMetricByName(listMetricStore, PUT_ALL_TOTAL, 2);
+        checkMetricByName(listMetricStore, ALL_RATE, 2);
+        checkMetricByName(listMetricStore, ALL_TOTAL, 2);
+        checkMetricByName(listMetricStore, RANGE_RATE, 2);
+        checkMetricByName(listMetricStore, RANGE_TOTAL, 2);
+        checkMetricByName(listMetricStore, FLUSH_RATE, 2);
+        checkMetricByName(listMetricStore, FLUSH_TOTAL, 2);
+        checkMetricByName(listMetricStore, RESTORE_RATE, 2);
+        checkMetricByName(listMetricStore, RESTORE_TOTAL, 2);
+    }
+
+    private void checkMetricsDeregistration() {
+        final List<Metric> listMetricAfterClosingApp = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+            .filter(m -> m.metricName().group().contains(STREAM_STRING))
+            .collect(Collectors.toList());
+        assertThat(listMetricAfterClosingApp.size(), is(0));
     }
 
     private void checkCacheMetrics() {
         final List<Metric> listMetricCache = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
-            .filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS)).collect(Collectors.toList());
-        testMetricByName(listMetricCache, HIT_RATIO_AVG, 6);
-        testMetricByName(listMetricCache, HIT_RATIO_MIN, 6);
-        testMetricByName(listMetricCache, HIT_RATIO_MAX, 6);
+            .filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS))
+            .collect(Collectors.toList());
+        checkMetricByName(listMetricCache, HIT_RATIO_AVG, 6);
+        checkMetricByName(listMetricCache, HIT_RATIO_MIN, 6);
+        checkMetricByName(listMetricCache, HIT_RATIO_MAX, 6);
     }
 
     private void checkWindowStoreMetrics() {
         final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
             .filter(m -> m.metricName().group().equals(STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS))
             .collect(Collectors.toList());
-        testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
-        testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
-        testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
-        testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
-        testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
-        testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
-        testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
-        testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
-        testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
-        testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
-        testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
-        testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
-        testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, PUT_RATE, 2);
-        testMetricByName(listMetricStore, PUT_TOTAL, 2);
-        testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
-        testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
-        testMetricByName(listMetricStore, GET_RATE, 0);
-        testMetricByName(listMetricStore, DELETE_RATE, 0);
-        testMetricByName(listMetricStore, DELETE_TOTAL, 0);
-        testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
-        testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
-        testMetricByName(listMetricStore, ALL_RATE, 0);
-        testMetricByName(listMetricStore, ALL_TOTAL, 0);
-        testMetricByName(listMetricStore, RANGE_RATE, 0);
-        testMetricByName(listMetricStore, RANGE_TOTAL, 0);
-        testMetricByName(listMetricStore, FLUSH_RATE, 2);
-        testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
-        testMetricByName(listMetricStore, RESTORE_RATE, 2);
-        testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
+        checkMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
+        checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
+        checkMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
+        checkMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
+        checkMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
+        checkMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
+        checkMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
+        checkMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
+        checkMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
+        checkMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
+        checkMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
+        checkMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
+        checkMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, PUT_RATE, 2);
+        checkMetricByName(listMetricStore, PUT_TOTAL, 2);
+        checkMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
+        checkMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
+        checkMetricByName(listMetricStore, GET_RATE, 0);
+        checkMetricByName(listMetricStore, DELETE_RATE, 0);
+        checkMetricByName(listMetricStore, DELETE_TOTAL, 0);
+        checkMetricByName(listMetricStore, PUT_ALL_RATE, 0);
+        checkMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
+        checkMetricByName(listMetricStore, ALL_RATE, 0);
+        checkMetricByName(listMetricStore, ALL_TOTAL, 0);
+        checkMetricByName(listMetricStore, RANGE_RATE, 0);
+        checkMetricByName(listMetricStore, RANGE_TOTAL, 0);
+        checkMetricByName(listMetricStore, FLUSH_RATE, 2);
+        checkMetricByName(listMetricStore, FLUSH_TOTAL, 2);
+        checkMetricByName(listMetricStore, RESTORE_RATE, 2);
+        checkMetricByName(listMetricStore, RESTORE_TOTAL, 2);
     }
 
     private void checkSessionStoreMetrics() {
         final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
             .filter(m -> m.metricName().group().equals(STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS))
             .collect(Collectors.toList());
-        testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
-        testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
-        testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
-        testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
-        testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
-        testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
-        testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
-        testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
-        testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
-        testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
-        testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
-        testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
-        testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
-        testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
-        testMetricByName(listMetricStore, PUT_RATE, 2);
-        testMetricByName(listMetricStore, PUT_TOTAL, 2);
-        testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
-        testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
-        testMetricByName(listMetricStore, GET_RATE, 0);
-        testMetricByName(listMetricStore, DELETE_RATE, 0);
-        testMetricByName(listMetricStore, DELETE_TOTAL, 0);
-        testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
-        testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
-        testMetricByName(listMetricStore, ALL_RATE, 0);
-        testMetricByName(listMetricStore, ALL_TOTAL, 0);
-        testMetricByName(listMetricStore, RANGE_RATE, 0);
-        testMetricByName(listMetricStore, RANGE_TOTAL, 0);
-        testMetricByName(listMetricStore, FLUSH_RATE, 2);
-        testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
-        testMetricByName(listMetricStore, RESTORE_RATE, 2);
-        testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
+        checkMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
+        checkMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
+        checkMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
+        checkMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
+        checkMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
+        checkMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
+        checkMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
+        checkMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
+        checkMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
+        checkMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
+        checkMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
+        checkMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
+        checkMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
+        checkMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
+        checkMetricByName(listMetricStore, PUT_RATE, 2);
+        checkMetricByName(listMetricStore, PUT_TOTAL, 2);
+        checkMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
+        checkMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
+        checkMetricByName(listMetricStore, GET_RATE, 0);
+        checkMetricByName(listMetricStore, DELETE_RATE, 0);
+        checkMetricByName(listMetricStore, DELETE_TOTAL, 0);
+        checkMetricByName(listMetricStore, PUT_ALL_RATE, 0);
+        checkMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
+        checkMetricByName(listMetricStore, ALL_RATE, 0);
+        checkMetricByName(listMetricStore, ALL_TOTAL, 0);
+        checkMetricByName(listMetricStore, RANGE_RATE, 0);
+        checkMetricByName(listMetricStore, RANGE_TOTAL, 0);
+        checkMetricByName(listMetricStore, FLUSH_RATE, 2);
+        checkMetricByName(listMetricStore, FLUSH_TOTAL, 2);
+        checkMetricByName(listMetricStore, RESTORE_RATE, 2);
+        checkMetricByName(listMetricStore, RESTORE_TOTAL, 2);
     }
 
-    private void testMetricByName(final List<Metric> listMetric, final String metricName, final int numMetric) {
-        final List<Metric> metrics = listMetric.stream().filter(m -> m.metricName().name().equals(metricName)).collect(Collectors.toList());
+    private void checkMetricByName(final List<Metric> listMetric, final String metricName, final int numMetric) {
+        final List<Metric> metrics = listMetric.stream()
+            .filter(m -> m.metricName().name().equals(metricName))
+            .collect(Collectors.toList());
         Assert.assertEquals("Size of metrics of type:'" + metricName + "' must be equal to:" + numMetric + " but it's equal to " + metrics.size(), numMetric, metrics.size());
         for (final Metric m : metrics) {
             Assert.assertNotNull("Metric:'" + m.metricName() + "' must be not null", m.metricValue());