You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/12 21:11:00 UTC

[kafka] branch trunk updated: KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new df9ea61  KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922)
df9ea61 is described below

commit df9ea618a397955806fc7f1fba7003492046c77a
Author: cadonna <br...@confluent.io>
AuthorDate: Wed Jun 12 23:10:39 2019 +0200

    KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922)
    
    - The timeout occurred due to slow initial rebalancing.
    - Added code to wait until the `KafkaStreams` instance is in state RUNNING before checking registration of metrics, and in state NOT_RUNNING before checking deregistration of metrics.
    - Removed all other wait conditions, because they are not needed once the `KafkaStreams` instance is in the right state.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../integration/MetricsIntegrationTest.java        | 456 +++++++++------------
 1 file changed, 205 insertions(+), 251 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index bc4d895..a3c5b87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -50,6 +51,9 @@ import java.util.List;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
 @SuppressWarnings("unchecked")
 @Category({IntegrationTest.class})
 public class MetricsIntegrationTest {
@@ -187,27 +191,35 @@ public class MetricsIntegrationTest {
         CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
     }
 
-    private void startApplication() {
+    private void startApplication() throws InterruptedException {
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.start();
+        final long timeout = 60000;
+        TestUtils.waitForCondition(
+            () -> kafkaStreams.state() == State.RUNNING,
+            timeout,
+            () -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms");
     }
 
     private void closeApplication() throws Exception {
         kafkaStreams.close();
         kafkaStreams.cleanUp();
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+        final long timeout = 60000;
+        TestUtils.waitForCondition(
+            () -> kafkaStreams.state() == State.NOT_RUNNING,
+            timeout,
+            () -> "Kafka Streams application did not reach state NOT_RUNNING in " + timeout + " ms");
     }
 
-    private void checkMetricDeregistration() throws InterruptedException {
-        TestUtils.waitForCondition(() -> {
-            final List<Metric> listMetricAfterClosingApp = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList());
-            return listMetricAfterClosingApp.size() == 0;
-        }, 10000, "de-registration of metrics");
+    private void checkMetricDeregistration() {
+        final List<Metric> listMetricAfterClosingApp = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+            .filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList());
+        assertThat(listMetricAfterClosingApp.size(), is(0));
     }
 
     @Test
     public void testStreamMetric() throws Exception {
-        final StringBuilder errorMessage = new StringBuilder();
         stream = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String()));
         builder.table(STREAM_OUTPUT_1, Materialized.as(Stores.inMemoryKeyValueStore(MY_STORE_IN_MEMORY)).withCachingEnabled())
@@ -222,22 +234,13 @@ public class MetricsIntegrationTest {
 
         startApplication();
 
-        // metric level : Thread
-        TestUtils.waitForCondition(() -> testThreadMetric(errorMessage), 10000, () -> "testThreadMetric -> " + errorMessage.toString());
-
-        // metric level : Task
-        TestUtils.waitForCondition(() -> testTaskMetric(errorMessage), 10000, () -> "testTaskMetric -> " + errorMessage.toString());
-
-        // metric level : Processor
-        TestUtils.waitForCondition(() -> testProcessorMetric(errorMessage), 10000, () -> "testProcessorMetric -> " + errorMessage.toString());
-
-        // metric level : Store (in-memory-state, in-memory-lru-state, rocksdb-state)
-        TestUtils.waitForCondition(() -> testStoreMetricKeyValueByType(STREAM_STORE_IN_MEMORY_STATE_METRICS, errorMessage), 10000, () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_IN_MEMORY_STATE_METRICS + " -> " + errorMessage.toString());
-        TestUtils.waitForCondition(() -> testStoreMetricKeyValueByType(STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS, errorMessage), 10000, () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS + " -> " + errorMessage.toString());
-        TestUtils.waitForCondition(() -> testStoreMetricKeyValueByType(STREAM_STORE_ROCKSDB_STATE_METRICS, errorMessage), 10000, () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_ROCKSDB_STATE_METRICS + " -> " + errorMessage.toString());
-
-        //metric level : Cache
-        TestUtils.waitForCondition(() -> testCacheMetric(errorMessage), 10000, () -> "testCacheMetric -> " + errorMessage.toString());
+        checkThreadLevelMetrics();
+        checkTaskLevelMetrics();
+        checkProcessorLevelMetrics();
+        checkKeyValueStoreMetricsByType(STREAM_STORE_IN_MEMORY_STATE_METRICS);
+        checkKeyValueStoreMetricsByType(STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS);
+        checkKeyValueStoreMetricsByType(STREAM_STORE_ROCKSDB_STATE_METRICS);
+        checkCacheMetrics();
 
         closeApplication();
 
@@ -247,7 +250,6 @@ public class MetricsIntegrationTest {
 
     @Test
     public void testStreamMetricOfWindowStore() throws Exception {
-        final StringBuilder errorMessage = new StringBuilder();
         stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
         final KGroupedStream<Integer, String> groupedStream = stream2.groupByKey();
         groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(50)))
@@ -257,8 +259,7 @@ public class MetricsIntegrationTest {
 
         startApplication();
 
-        // metric level : Store (window)
-        TestUtils.waitForCondition(() -> testStoreMetricWindow(errorMessage), 10000, () -> "testStoreMetricWindow -> " + errorMessage.toString());
+        checkWindowStoreMetrics();
 
         closeApplication();
 
@@ -268,7 +269,6 @@ public class MetricsIntegrationTest {
 
     @Test
     public void testStreamMetricOfSessionStore() throws Exception {
-        final StringBuilder errorMessage = new StringBuilder();
         stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
         final KGroupedStream<Integer, String> groupedStream = stream2.groupByKey();
         groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(50)))
@@ -278,8 +278,7 @@ public class MetricsIntegrationTest {
 
         startApplication();
 
-        // metric level : Store (session)
-        TestUtils.waitForCondition(() -> testStoreMetricSession(errorMessage), 10000, () -> "testStoreMetricSession -> " + errorMessage.toString());
+        checkSessionStoreMetrics();
 
         closeApplication();
 
@@ -287,240 +286,195 @@ public class MetricsIntegrationTest {
         checkMetricDeregistration();
     }
 
-    private boolean testThreadMetric(final StringBuilder errorMessage) {
-        errorMessage.setLength(0);
-        try {
-            final List<Metric> listMetricThread = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS)).collect(Collectors.toList());
-            testMetricByName(listMetricThread, COMMIT_LATENCY_AVG, 1);
-            testMetricByName(listMetricThread, COMMIT_LATENCY_MAX, 1);
-            testMetricByName(listMetricThread, POLL_LATENCY_AVG, 1);
-            testMetricByName(listMetricThread, POLL_LATENCY_MAX, 1);
-            testMetricByName(listMetricThread, PROCESS_LATENCY_AVG, 1);
-            testMetricByName(listMetricThread, PROCESS_LATENCY_MAX, 1);
-            testMetricByName(listMetricThread, PUNCTUATE_LATENCY_AVG, 1);
-            testMetricByName(listMetricThread, PUNCTUATE_LATENCY_MAX, 1);
-            testMetricByName(listMetricThread, COMMIT_RATE, 1);
-            testMetricByName(listMetricThread, COMMIT_TOTAL, 1);
-            testMetricByName(listMetricThread, POLL_RATE, 1);
-            testMetricByName(listMetricThread, POLL_TOTAL, 1);
-            testMetricByName(listMetricThread, PROCESS_RATE, 1);
-            testMetricByName(listMetricThread, PROCESS_TOTAL, 1);
-            testMetricByName(listMetricThread, PUNCTUATE_RATE, 1);
-            testMetricByName(listMetricThread, PUNCTUATE_TOTAL, 1);
-            testMetricByName(listMetricThread, TASK_CREATED_RATE, 1);
-            testMetricByName(listMetricThread, TASK_CREATED_TOTAL, 1);
-            testMetricByName(listMetricThread, TASK_CLOSED_RATE, 1);
-            testMetricByName(listMetricThread, TASK_CLOSED_TOTAL, 1);
-            testMetricByName(listMetricThread, SKIPPED_RECORDS_RATE, 1);
-            testMetricByName(listMetricThread, SKIPPED_RECORDS_TOTAL, 1);
-            return true;
-        } catch (final Throwable e) {
-            errorMessage.append(e.getMessage());
-            return false;
-        }
+    private void checkTaskLevelMetrics() {
+        final List<Metric> listMetricTask = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+            .filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS)).collect(Collectors.toList());
+        testMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5);
+        testMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5);
+        testMetricByName(listMetricTask, COMMIT_RATE, 5);
+        testMetricByName(listMetricTask, COMMIT_TOTAL, 5);
+        testMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4);
+        testMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4);
     }
 
-    private boolean testTaskMetric(final StringBuilder errorMessage) {
-        errorMessage.setLength(0);
-        try {
-            final List<Metric> listMetricTask = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS)).collect(Collectors.toList());
-            testMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5);
-            testMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5);
-            testMetricByName(listMetricTask, COMMIT_RATE, 5);
-            testMetricByName(listMetricTask, COMMIT_TOTAL, 5);
-            testMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4);
-            testMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4);
-            return true;
-        } catch (final Throwable e) {
-            errorMessage.append(e.getMessage());
-            return false;
-        }
+    private void checkThreadLevelMetrics() {
+        final List<Metric> listMetricThread = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+            .filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS)).collect(Collectors.toList());
+        testMetricByName(listMetricThread, COMMIT_LATENCY_AVG, 1);
+        testMetricByName(listMetricThread, COMMIT_LATENCY_MAX, 1);
+        testMetricByName(listMetricThread, POLL_LATENCY_AVG, 1);
+        testMetricByName(listMetricThread, POLL_LATENCY_MAX, 1);
+        testMetricByName(listMetricThread, PROCESS_LATENCY_AVG, 1);
+        testMetricByName(listMetricThread, PROCESS_LATENCY_MAX, 1);
+        testMetricByName(listMetricThread, PUNCTUATE_LATENCY_AVG, 1);
+        testMetricByName(listMetricThread, PUNCTUATE_LATENCY_MAX, 1);
+        testMetricByName(listMetricThread, COMMIT_RATE, 1);
+        testMetricByName(listMetricThread, COMMIT_TOTAL, 1);
+        testMetricByName(listMetricThread, POLL_RATE, 1);
+        testMetricByName(listMetricThread, POLL_TOTAL, 1);
+        testMetricByName(listMetricThread, PROCESS_RATE, 1);
+        testMetricByName(listMetricThread, PROCESS_TOTAL, 1);
+        testMetricByName(listMetricThread, PUNCTUATE_RATE, 1);
+        testMetricByName(listMetricThread, PUNCTUATE_TOTAL, 1);
+        testMetricByName(listMetricThread, TASK_CREATED_RATE, 1);
+        testMetricByName(listMetricThread, TASK_CREATED_TOTAL, 1);
+        testMetricByName(listMetricThread, TASK_CLOSED_RATE, 1);
+        testMetricByName(listMetricThread, TASK_CLOSED_TOTAL, 1);
+        testMetricByName(listMetricThread, SKIPPED_RECORDS_RATE, 1);
+        testMetricByName(listMetricThread, SKIPPED_RECORDS_TOTAL, 1);
     }
 
-    private boolean testProcessorMetric(final StringBuilder errorMessage) {
-        errorMessage.setLength(0);
-        try {
-            final List<Metric> listMetricProcessor = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS)).collect(Collectors.toList());
-            testMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, 18);
-            testMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, 18);
-            testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, 18);
-            testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_MAX, 18);
-            testMetricByName(listMetricProcessor, CREATE_LATENCY_AVG, 18);
-            testMetricByName(listMetricProcessor, CREATE_LATENCY_MAX, 18);
-            testMetricByName(listMetricProcessor, DESTROY_LATENCY_AVG, 18);
-            testMetricByName(listMetricProcessor, DESTROY_LATENCY_MAX, 18);
-            testMetricByName(listMetricProcessor, PROCESS_RATE, 18);
-            testMetricByName(listMetricProcessor, PROCESS_TOTAL, 18);
-            testMetricByName(listMetricProcessor, PUNCTUATE_RATE, 18);
-            testMetricByName(listMetricProcessor, PUNCTUATE_TOTAL, 18);
-            testMetricByName(listMetricProcessor, CREATE_RATE, 18);
-            testMetricByName(listMetricProcessor, CREATE_TOTAL, 18);
-            testMetricByName(listMetricProcessor, DESTROY_RATE, 18);
-            testMetricByName(listMetricProcessor, DESTROY_TOTAL, 18);
-            testMetricByName(listMetricProcessor, FORWARD_TOTAL, 18);
-            return true;
-        } catch (final Throwable e) {
-            errorMessage.append(e.getMessage());
-            return false;
-        }
+    private void checkProcessorLevelMetrics() {
+        final List<Metric> listMetricProcessor = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+            .filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS)).collect(Collectors.toList());
+        testMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, 18);
+        testMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, 18);
+        testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, 18);
+        testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_MAX, 18);
+        testMetricByName(listMetricProcessor, CREATE_LATENCY_AVG, 18);
+        testMetricByName(listMetricProcessor, CREATE_LATENCY_MAX, 18);
+        testMetricByName(listMetricProcessor, DESTROY_LATENCY_AVG, 18);
+        testMetricByName(listMetricProcessor, DESTROY_LATENCY_MAX, 18);
+        testMetricByName(listMetricProcessor, PROCESS_RATE, 18);
+        testMetricByName(listMetricProcessor, PROCESS_TOTAL, 18);
+        testMetricByName(listMetricProcessor, PUNCTUATE_RATE, 18);
+        testMetricByName(listMetricProcessor, PUNCTUATE_TOTAL, 18);
+        testMetricByName(listMetricProcessor, CREATE_RATE, 18);
+        testMetricByName(listMetricProcessor, CREATE_TOTAL, 18);
+        testMetricByName(listMetricProcessor, DESTROY_RATE, 18);
+        testMetricByName(listMetricProcessor, DESTROY_TOTAL, 18);
+        testMetricByName(listMetricProcessor, FORWARD_TOTAL, 18);
     }
 
-    private boolean testStoreMetricWindow(final StringBuilder errorMessage) {
-        errorMessage.setLength(0);
-        try {
-            final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
-                    .filter(m -> m.metricName().group().equals(STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS))
-                    .collect(Collectors.toList());
-            testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
-            testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
-            testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
-            testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
-            testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
-            testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
-            testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
-            testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
-            testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
-            testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
-            testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
-            testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
-            testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, PUT_RATE, 2);
-            testMetricByName(listMetricStore, PUT_TOTAL, 2);
-            testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
-            testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
-            testMetricByName(listMetricStore, GET_RATE, 0);
-            testMetricByName(listMetricStore, DELETE_RATE, 0);
-            testMetricByName(listMetricStore, DELETE_TOTAL, 0);
-            testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
-            testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
-            testMetricByName(listMetricStore, ALL_RATE, 0);
-            testMetricByName(listMetricStore, ALL_TOTAL, 0);
-            testMetricByName(listMetricStore, RANGE_RATE, 0);
-            testMetricByName(listMetricStore, RANGE_TOTAL, 0);
-            testMetricByName(listMetricStore, FLUSH_RATE, 2);
-            testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
-            testMetricByName(listMetricStore, RESTORE_RATE, 2);
-            testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
-            return true;
-        } catch (final Throwable e) {
-            errorMessage.append(e.getMessage());
-            return false;
-        }
+    private void checkKeyValueStoreMetricsByType(final String storeType) {
+        final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+            .filter(m -> m.metricName().group().equals(storeType))
+            .collect(Collectors.toList());
+        testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, GET_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, GET_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, ALL_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, ALL_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, PUT_RATE, 2);
+        testMetricByName(listMetricStore, PUT_TOTAL, 2);
+        testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 2);
+        testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 2);
+        testMetricByName(listMetricStore, GET_RATE, 2);
+        testMetricByName(listMetricStore, DELETE_RATE, 2);
+        testMetricByName(listMetricStore, DELETE_TOTAL, 2);
+        testMetricByName(listMetricStore, PUT_ALL_RATE, 2);
+        testMetricByName(listMetricStore, PUT_ALL_TOTAL, 2);
+        testMetricByName(listMetricStore, ALL_RATE, 2);
+        testMetricByName(listMetricStore, ALL_TOTAL, 2);
+        testMetricByName(listMetricStore, RANGE_RATE, 2);
+        testMetricByName(listMetricStore, RANGE_TOTAL, 2);
+        testMetricByName(listMetricStore, FLUSH_RATE, 2);
+        testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
+        testMetricByName(listMetricStore, RESTORE_RATE, 2);
+        testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
     }
 
-    private boolean testStoreMetricSession(final StringBuilder errorMessage) {
-        errorMessage.setLength(0);
-        try {
-            final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
-                    .filter(m -> m.metricName().group().equals(STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS))
-                    .collect(Collectors.toList());
-            testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
-            testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
-            testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
-            testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
-            testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
-            testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
-            testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
-            testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
-            testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
-            testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
-            testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
-            testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
-            testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, PUT_RATE, 2);
-            testMetricByName(listMetricStore, PUT_TOTAL, 2);
-            testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
-            testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
-            testMetricByName(listMetricStore, GET_RATE, 0);
-            testMetricByName(listMetricStore, DELETE_RATE, 0);
-            testMetricByName(listMetricStore, DELETE_TOTAL, 0);
-            testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
-            testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
-            testMetricByName(listMetricStore, ALL_RATE, 0);
-            testMetricByName(listMetricStore, ALL_TOTAL, 0);
-            testMetricByName(listMetricStore, RANGE_RATE, 0);
-            testMetricByName(listMetricStore, RANGE_TOTAL, 0);
-            testMetricByName(listMetricStore, FLUSH_RATE, 2);
-            testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
-            testMetricByName(listMetricStore, RESTORE_RATE, 2);
-            testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
-            return true;
-        } catch (final Throwable e) {
-            errorMessage.append(e.getMessage());
-            return false;
-        }
+    private void checkCacheMetrics() {
+        final List<Metric> listMetricCache = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+            .filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS)).collect(Collectors.toList());
+        testMetricByName(listMetricCache, HIT_RATIO_AVG, 6);
+        testMetricByName(listMetricCache, HIT_RATIO_MIN, 6);
+        testMetricByName(listMetricCache, HIT_RATIO_MAX, 6);
     }
 
-    private boolean testStoreMetricKeyValueByType(final String storeType, final StringBuilder errorMessage) {
-        errorMessage.setLength(0);
-        try {
-            final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
-                    .filter(m -> m.metricName().group().equals(storeType))
-                    .collect(Collectors.toList());
-            testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, GET_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, GET_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, ALL_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, ALL_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
-            testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
-            testMetricByName(listMetricStore, PUT_RATE, 2);
-            testMetricByName(listMetricStore, PUT_TOTAL, 2);
-            testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 2);
-            testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 2);
-            testMetricByName(listMetricStore, GET_RATE, 2);
-            testMetricByName(listMetricStore, DELETE_RATE, 2);
-            testMetricByName(listMetricStore, DELETE_TOTAL, 2);
-            testMetricByName(listMetricStore, PUT_ALL_RATE, 2);
-            testMetricByName(listMetricStore, PUT_ALL_TOTAL, 2);
-            testMetricByName(listMetricStore, ALL_RATE, 2);
-            testMetricByName(listMetricStore, ALL_TOTAL, 2);
-            testMetricByName(listMetricStore, RANGE_RATE, 2);
-            testMetricByName(listMetricStore, RANGE_TOTAL, 2);
-            testMetricByName(listMetricStore, FLUSH_RATE, 2);
-            testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
-            testMetricByName(listMetricStore, RESTORE_RATE, 2);
-            testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
-            return true;
-        } catch (final Throwable e) {
-            errorMessage.append(e.getMessage());
-            return false;
-        }
+    private void checkWindowStoreMetrics() {
+        final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+            .filter(m -> m.metricName().group().equals(STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS))
+            .collect(Collectors.toList());
+        testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
+        testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
+        testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
+        testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
+        testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
+        testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
+        testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
+        testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
+        testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
+        testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
+        testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
+        testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
+        testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, PUT_RATE, 2);
+        testMetricByName(listMetricStore, PUT_TOTAL, 2);
+        testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
+        testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
+        testMetricByName(listMetricStore, GET_RATE, 0);
+        testMetricByName(listMetricStore, DELETE_RATE, 0);
+        testMetricByName(listMetricStore, DELETE_TOTAL, 0);
+        testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
+        testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
+        testMetricByName(listMetricStore, ALL_RATE, 0);
+        testMetricByName(listMetricStore, ALL_TOTAL, 0);
+        testMetricByName(listMetricStore, RANGE_RATE, 0);
+        testMetricByName(listMetricStore, RANGE_TOTAL, 0);
+        testMetricByName(listMetricStore, FLUSH_RATE, 2);
+        testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
+        testMetricByName(listMetricStore, RESTORE_RATE, 2);
+        testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
     }
 
-    private boolean testCacheMetric(final StringBuilder errorMessage) {
-        errorMessage.setLength(0);
-        try {
-            final List<Metric> listMetricCache = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS)).collect(Collectors.toList());
-            testMetricByName(listMetricCache, HIT_RATIO_AVG, 6);
-            testMetricByName(listMetricCache, HIT_RATIO_MIN, 6);
-            testMetricByName(listMetricCache, HIT_RATIO_MAX, 6);
-            return true;
-        } catch (final Throwable e) {
-            errorMessage.append(e.getMessage());
-            return false;
-        }
+    private void checkSessionStoreMetrics() {
+        final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+            .filter(m -> m.metricName().group().equals(STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS))
+            .collect(Collectors.toList());
+        testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
+        testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
+        testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
+        testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
+        testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
+        testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
+        testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
+        testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
+        testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
+        testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
+        testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
+        testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
+        testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
+        testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
+        testMetricByName(listMetricStore, PUT_RATE, 2);
+        testMetricByName(listMetricStore, PUT_TOTAL, 2);
+        testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
+        testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
+        testMetricByName(listMetricStore, GET_RATE, 0);
+        testMetricByName(listMetricStore, DELETE_RATE, 0);
+        testMetricByName(listMetricStore, DELETE_TOTAL, 0);
+        testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
+        testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
+        testMetricByName(listMetricStore, ALL_RATE, 0);
+        testMetricByName(listMetricStore, ALL_TOTAL, 0);
+        testMetricByName(listMetricStore, RANGE_RATE, 0);
+        testMetricByName(listMetricStore, RANGE_TOTAL, 0);
+        testMetricByName(listMetricStore, FLUSH_RATE, 2);
+        testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
+        testMetricByName(listMetricStore, RESTORE_RATE, 2);
+        testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
     }
 
     private void testMetricByName(final List<Metric> listMetric, final String metricName, final int numMetric) {