You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/06/12 21:11:00 UTC
[kafka] branch trunk updated: KAFKA-8262,
KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new df9ea61 KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922)
df9ea61 is described below
commit df9ea618a397955806fc7f1fba7003492046c77a
Author: cadonna <br...@confluent.io>
AuthorDate: Wed Jun 12 23:10:39 2019 +0200
KAFKA-8262, KAFKA-8263: Fix flaky test `MetricsIntegrationTest` (#6922)
- Timeout occurred due to initial slow rebalancing.
- Added code to wait until `KafkaStreams` instance is in state RUNNING to check registration of metrics and in state NOT_RUNNING to check deregistration of metrics.
- I removed all other wait conditions, because they are not needed if `KafkaStreams` instance is in the right state.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../integration/MetricsIntegrationTest.java | 456 +++++++++------------
1 file changed, 205 insertions(+), 251 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index bc4d895..a3c5b87 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -50,6 +51,9 @@ import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
@SuppressWarnings("unchecked")
@Category({IntegrationTest.class})
public class MetricsIntegrationTest {
@@ -187,27 +191,35 @@ public class MetricsIntegrationTest {
CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4);
}
- private void startApplication() {
+ private void startApplication() throws InterruptedException {
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
+ final long timeout = 60000;
+ TestUtils.waitForCondition(
+ () -> kafkaStreams.state() == State.RUNNING,
+ timeout,
+ () -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms");
}
private void closeApplication() throws Exception {
kafkaStreams.close();
kafkaStreams.cleanUp();
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+ final long timeout = 60000;
+ TestUtils.waitForCondition(
+ () -> kafkaStreams.state() == State.NOT_RUNNING,
+ timeout,
+ () -> "Kafka Streams application did not reach state NOT_RUNNING in " + timeout + " ms");
}
- private void checkMetricDeregistration() throws InterruptedException {
- TestUtils.waitForCondition(() -> {
- final List<Metric> listMetricAfterClosingApp = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList());
- return listMetricAfterClosingApp.size() == 0;
- }, 10000, "de-registration of metrics");
+ private void checkMetricDeregistration() {
+ final List<Metric> listMetricAfterClosingApp = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+ .filter(m -> m.metricName().group().contains(STREAM_STRING)).collect(Collectors.toList());
+ assertThat(listMetricAfterClosingApp.size(), is(0));
}
@Test
public void testStreamMetric() throws Exception {
- final StringBuilder errorMessage = new StringBuilder();
stream = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
stream.to(STREAM_OUTPUT_1, Produced.with(Serdes.Integer(), Serdes.String()));
builder.table(STREAM_OUTPUT_1, Materialized.as(Stores.inMemoryKeyValueStore(MY_STORE_IN_MEMORY)).withCachingEnabled())
@@ -222,22 +234,13 @@ public class MetricsIntegrationTest {
startApplication();
- // metric level : Thread
- TestUtils.waitForCondition(() -> testThreadMetric(errorMessage), 10000, () -> "testThreadMetric -> " + errorMessage.toString());
-
- // metric level : Task
- TestUtils.waitForCondition(() -> testTaskMetric(errorMessage), 10000, () -> "testTaskMetric -> " + errorMessage.toString());
-
- // metric level : Processor
- TestUtils.waitForCondition(() -> testProcessorMetric(errorMessage), 10000, () -> "testProcessorMetric -> " + errorMessage.toString());
-
- // metric level : Store (in-memory-state, in-memory-lru-state, rocksdb-state)
- TestUtils.waitForCondition(() -> testStoreMetricKeyValueByType(STREAM_STORE_IN_MEMORY_STATE_METRICS, errorMessage), 10000, () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_IN_MEMORY_STATE_METRICS + " -> " + errorMessage.toString());
- TestUtils.waitForCondition(() -> testStoreMetricKeyValueByType(STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS, errorMessage), 10000, () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS + " -> " + errorMessage.toString());
- TestUtils.waitForCondition(() -> testStoreMetricKeyValueByType(STREAM_STORE_ROCKSDB_STATE_METRICS, errorMessage), 10000, () -> "testStoreMetricKeyValueByType:" + STREAM_STORE_ROCKSDB_STATE_METRICS + " -> " + errorMessage.toString());
-
- //metric level : Cache
- TestUtils.waitForCondition(() -> testCacheMetric(errorMessage), 10000, () -> "testCacheMetric -> " + errorMessage.toString());
+ checkThreadLevelMetrics();
+ checkTaskLevelMetrics();
+ checkProcessorLevelMetrics();
+ checkKeyValueStoreMetricsByType(STREAM_STORE_IN_MEMORY_STATE_METRICS);
+ checkKeyValueStoreMetricsByType(STREAM_STORE_IN_MEMORY_LRU_STATE_METRICS);
+ checkKeyValueStoreMetricsByType(STREAM_STORE_ROCKSDB_STATE_METRICS);
+ checkCacheMetrics();
closeApplication();
@@ -247,7 +250,6 @@ public class MetricsIntegrationTest {
@Test
public void testStreamMetricOfWindowStore() throws Exception {
- final StringBuilder errorMessage = new StringBuilder();
stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
final KGroupedStream<Integer, String> groupedStream = stream2.groupByKey();
groupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(50)))
@@ -257,8 +259,7 @@ public class MetricsIntegrationTest {
startApplication();
- // metric level : Store (window)
- TestUtils.waitForCondition(() -> testStoreMetricWindow(errorMessage), 10000, () -> "testStoreMetricWindow -> " + errorMessage.toString());
+ checkWindowStoreMetrics();
closeApplication();
@@ -268,7 +269,6 @@ public class MetricsIntegrationTest {
@Test
public void testStreamMetricOfSessionStore() throws Exception {
- final StringBuilder errorMessage = new StringBuilder();
stream2 = builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()));
final KGroupedStream<Integer, String> groupedStream = stream2.groupByKey();
groupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(50)))
@@ -278,8 +278,7 @@ public class MetricsIntegrationTest {
startApplication();
- // metric level : Store (session)
- TestUtils.waitForCondition(() -> testStoreMetricSession(errorMessage), 10000, () -> "testStoreMetricSession -> " + errorMessage.toString());
+ checkSessionStoreMetrics();
closeApplication();
@@ -287,240 +286,195 @@ public class MetricsIntegrationTest {
checkMetricDeregistration();
}
- private boolean testThreadMetric(final StringBuilder errorMessage) {
- errorMessage.setLength(0);
- try {
- final List<Metric> listMetricThread = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS)).collect(Collectors.toList());
- testMetricByName(listMetricThread, COMMIT_LATENCY_AVG, 1);
- testMetricByName(listMetricThread, COMMIT_LATENCY_MAX, 1);
- testMetricByName(listMetricThread, POLL_LATENCY_AVG, 1);
- testMetricByName(listMetricThread, POLL_LATENCY_MAX, 1);
- testMetricByName(listMetricThread, PROCESS_LATENCY_AVG, 1);
- testMetricByName(listMetricThread, PROCESS_LATENCY_MAX, 1);
- testMetricByName(listMetricThread, PUNCTUATE_LATENCY_AVG, 1);
- testMetricByName(listMetricThread, PUNCTUATE_LATENCY_MAX, 1);
- testMetricByName(listMetricThread, COMMIT_RATE, 1);
- testMetricByName(listMetricThread, COMMIT_TOTAL, 1);
- testMetricByName(listMetricThread, POLL_RATE, 1);
- testMetricByName(listMetricThread, POLL_TOTAL, 1);
- testMetricByName(listMetricThread, PROCESS_RATE, 1);
- testMetricByName(listMetricThread, PROCESS_TOTAL, 1);
- testMetricByName(listMetricThread, PUNCTUATE_RATE, 1);
- testMetricByName(listMetricThread, PUNCTUATE_TOTAL, 1);
- testMetricByName(listMetricThread, TASK_CREATED_RATE, 1);
- testMetricByName(listMetricThread, TASK_CREATED_TOTAL, 1);
- testMetricByName(listMetricThread, TASK_CLOSED_RATE, 1);
- testMetricByName(listMetricThread, TASK_CLOSED_TOTAL, 1);
- testMetricByName(listMetricThread, SKIPPED_RECORDS_RATE, 1);
- testMetricByName(listMetricThread, SKIPPED_RECORDS_TOTAL, 1);
- return true;
- } catch (final Throwable e) {
- errorMessage.append(e.getMessage());
- return false;
- }
+ private void checkTaskLevelMetrics() {
+ final List<Metric> listMetricTask = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+ .filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS)).collect(Collectors.toList());
+ testMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5);
+ testMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5);
+ testMetricByName(listMetricTask, COMMIT_RATE, 5);
+ testMetricByName(listMetricTask, COMMIT_TOTAL, 5);
+ testMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4);
+ testMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4);
}
- private boolean testTaskMetric(final StringBuilder errorMessage) {
- errorMessage.setLength(0);
- try {
- final List<Metric> listMetricTask = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_TASK_NODE_METRICS)).collect(Collectors.toList());
- testMetricByName(listMetricTask, COMMIT_LATENCY_AVG, 5);
- testMetricByName(listMetricTask, COMMIT_LATENCY_MAX, 5);
- testMetricByName(listMetricTask, COMMIT_RATE, 5);
- testMetricByName(listMetricTask, COMMIT_TOTAL, 5);
- testMetricByName(listMetricTask, RECORD_LATENESS_AVG, 4);
- testMetricByName(listMetricTask, RECORD_LATENESS_MAX, 4);
- return true;
- } catch (final Throwable e) {
- errorMessage.append(e.getMessage());
- return false;
- }
+ private void checkThreadLevelMetrics() {
+ final List<Metric> listMetricThread = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+ .filter(m -> m.metricName().group().equals(STREAM_THREAD_NODE_METRICS)).collect(Collectors.toList());
+ testMetricByName(listMetricThread, COMMIT_LATENCY_AVG, 1);
+ testMetricByName(listMetricThread, COMMIT_LATENCY_MAX, 1);
+ testMetricByName(listMetricThread, POLL_LATENCY_AVG, 1);
+ testMetricByName(listMetricThread, POLL_LATENCY_MAX, 1);
+ testMetricByName(listMetricThread, PROCESS_LATENCY_AVG, 1);
+ testMetricByName(listMetricThread, PROCESS_LATENCY_MAX, 1);
+ testMetricByName(listMetricThread, PUNCTUATE_LATENCY_AVG, 1);
+ testMetricByName(listMetricThread, PUNCTUATE_LATENCY_MAX, 1);
+ testMetricByName(listMetricThread, COMMIT_RATE, 1);
+ testMetricByName(listMetricThread, COMMIT_TOTAL, 1);
+ testMetricByName(listMetricThread, POLL_RATE, 1);
+ testMetricByName(listMetricThread, POLL_TOTAL, 1);
+ testMetricByName(listMetricThread, PROCESS_RATE, 1);
+ testMetricByName(listMetricThread, PROCESS_TOTAL, 1);
+ testMetricByName(listMetricThread, PUNCTUATE_RATE, 1);
+ testMetricByName(listMetricThread, PUNCTUATE_TOTAL, 1);
+ testMetricByName(listMetricThread, TASK_CREATED_RATE, 1);
+ testMetricByName(listMetricThread, TASK_CREATED_TOTAL, 1);
+ testMetricByName(listMetricThread, TASK_CLOSED_RATE, 1);
+ testMetricByName(listMetricThread, TASK_CLOSED_TOTAL, 1);
+ testMetricByName(listMetricThread, SKIPPED_RECORDS_RATE, 1);
+ testMetricByName(listMetricThread, SKIPPED_RECORDS_TOTAL, 1);
}
- private boolean testProcessorMetric(final StringBuilder errorMessage) {
- errorMessage.setLength(0);
- try {
- final List<Metric> listMetricProcessor = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS)).collect(Collectors.toList());
- testMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, 18);
- testMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, 18);
- testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, 18);
- testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_MAX, 18);
- testMetricByName(listMetricProcessor, CREATE_LATENCY_AVG, 18);
- testMetricByName(listMetricProcessor, CREATE_LATENCY_MAX, 18);
- testMetricByName(listMetricProcessor, DESTROY_LATENCY_AVG, 18);
- testMetricByName(listMetricProcessor, DESTROY_LATENCY_MAX, 18);
- testMetricByName(listMetricProcessor, PROCESS_RATE, 18);
- testMetricByName(listMetricProcessor, PROCESS_TOTAL, 18);
- testMetricByName(listMetricProcessor, PUNCTUATE_RATE, 18);
- testMetricByName(listMetricProcessor, PUNCTUATE_TOTAL, 18);
- testMetricByName(listMetricProcessor, CREATE_RATE, 18);
- testMetricByName(listMetricProcessor, CREATE_TOTAL, 18);
- testMetricByName(listMetricProcessor, DESTROY_RATE, 18);
- testMetricByName(listMetricProcessor, DESTROY_TOTAL, 18);
- testMetricByName(listMetricProcessor, FORWARD_TOTAL, 18);
- return true;
- } catch (final Throwable e) {
- errorMessage.append(e.getMessage());
- return false;
- }
+ private void checkProcessorLevelMetrics() {
+ final List<Metric> listMetricProcessor = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+ .filter(m -> m.metricName().group().equals(STREAM_PROCESSOR_NODE_METRICS)).collect(Collectors.toList());
+ testMetricByName(listMetricProcessor, PROCESS_LATENCY_AVG, 18);
+ testMetricByName(listMetricProcessor, PROCESS_LATENCY_MAX, 18);
+ testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_AVG, 18);
+ testMetricByName(listMetricProcessor, PUNCTUATE_LATENCY_MAX, 18);
+ testMetricByName(listMetricProcessor, CREATE_LATENCY_AVG, 18);
+ testMetricByName(listMetricProcessor, CREATE_LATENCY_MAX, 18);
+ testMetricByName(listMetricProcessor, DESTROY_LATENCY_AVG, 18);
+ testMetricByName(listMetricProcessor, DESTROY_LATENCY_MAX, 18);
+ testMetricByName(listMetricProcessor, PROCESS_RATE, 18);
+ testMetricByName(listMetricProcessor, PROCESS_TOTAL, 18);
+ testMetricByName(listMetricProcessor, PUNCTUATE_RATE, 18);
+ testMetricByName(listMetricProcessor, PUNCTUATE_TOTAL, 18);
+ testMetricByName(listMetricProcessor, CREATE_RATE, 18);
+ testMetricByName(listMetricProcessor, CREATE_TOTAL, 18);
+ testMetricByName(listMetricProcessor, DESTROY_RATE, 18);
+ testMetricByName(listMetricProcessor, DESTROY_TOTAL, 18);
+ testMetricByName(listMetricProcessor, FORWARD_TOTAL, 18);
}
- private boolean testStoreMetricWindow(final StringBuilder errorMessage) {
- errorMessage.setLength(0);
- try {
- final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
- .filter(m -> m.metricName().group().equals(STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS))
- .collect(Collectors.toList());
- testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_RATE, 2);
- testMetricByName(listMetricStore, PUT_TOTAL, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
- testMetricByName(listMetricStore, GET_RATE, 0);
- testMetricByName(listMetricStore, DELETE_RATE, 0);
- testMetricByName(listMetricStore, DELETE_TOTAL, 0);
- testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
- testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
- testMetricByName(listMetricStore, ALL_RATE, 0);
- testMetricByName(listMetricStore, ALL_TOTAL, 0);
- testMetricByName(listMetricStore, RANGE_RATE, 0);
- testMetricByName(listMetricStore, RANGE_TOTAL, 0);
- testMetricByName(listMetricStore, FLUSH_RATE, 2);
- testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
- testMetricByName(listMetricStore, RESTORE_RATE, 2);
- testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
- return true;
- } catch (final Throwable e) {
- errorMessage.append(e.getMessage());
- return false;
- }
+ private void checkKeyValueStoreMetricsByType(final String storeType) {
+ final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+ .filter(m -> m.metricName().group().equals(storeType))
+ .collect(Collectors.toList());
+ testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, GET_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, GET_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, ALL_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, ALL_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, PUT_RATE, 2);
+ testMetricByName(listMetricStore, PUT_TOTAL, 2);
+ testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 2);
+ testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 2);
+ testMetricByName(listMetricStore, GET_RATE, 2);
+ testMetricByName(listMetricStore, DELETE_RATE, 2);
+ testMetricByName(listMetricStore, DELETE_TOTAL, 2);
+ testMetricByName(listMetricStore, PUT_ALL_RATE, 2);
+ testMetricByName(listMetricStore, PUT_ALL_TOTAL, 2);
+ testMetricByName(listMetricStore, ALL_RATE, 2);
+ testMetricByName(listMetricStore, ALL_TOTAL, 2);
+ testMetricByName(listMetricStore, RANGE_RATE, 2);
+ testMetricByName(listMetricStore, RANGE_TOTAL, 2);
+ testMetricByName(listMetricStore, FLUSH_RATE, 2);
+ testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
+ testMetricByName(listMetricStore, RESTORE_RATE, 2);
+ testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
}
- private boolean testStoreMetricSession(final StringBuilder errorMessage) {
- errorMessage.setLength(0);
- try {
- final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
- .filter(m -> m.metricName().group().equals(STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS))
- .collect(Collectors.toList());
- testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
- testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
- testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_RATE, 2);
- testMetricByName(listMetricStore, PUT_TOTAL, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
- testMetricByName(listMetricStore, GET_RATE, 0);
- testMetricByName(listMetricStore, DELETE_RATE, 0);
- testMetricByName(listMetricStore, DELETE_TOTAL, 0);
- testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
- testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
- testMetricByName(listMetricStore, ALL_RATE, 0);
- testMetricByName(listMetricStore, ALL_TOTAL, 0);
- testMetricByName(listMetricStore, RANGE_RATE, 0);
- testMetricByName(listMetricStore, RANGE_TOTAL, 0);
- testMetricByName(listMetricStore, FLUSH_RATE, 2);
- testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
- testMetricByName(listMetricStore, RESTORE_RATE, 2);
- testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
- return true;
- } catch (final Throwable e) {
- errorMessage.append(e.getMessage());
- return false;
- }
+ private void checkCacheMetrics() {
+ final List<Metric> listMetricCache = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+ .filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS)).collect(Collectors.toList());
+ testMetricByName(listMetricCache, HIT_RATIO_AVG, 6);
+ testMetricByName(listMetricCache, HIT_RATIO_MIN, 6);
+ testMetricByName(listMetricCache, HIT_RATIO_MAX, 6);
}
- private boolean testStoreMetricKeyValueByType(final String storeType, final StringBuilder errorMessage) {
- errorMessage.setLength(0);
- try {
- final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
- .filter(m -> m.metricName().group().equals(storeType))
- .collect(Collectors.toList());
- testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, GET_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, GET_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, ALL_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, ALL_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
- testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
- testMetricByName(listMetricStore, PUT_RATE, 2);
- testMetricByName(listMetricStore, PUT_TOTAL, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 2);
- testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 2);
- testMetricByName(listMetricStore, GET_RATE, 2);
- testMetricByName(listMetricStore, DELETE_RATE, 2);
- testMetricByName(listMetricStore, DELETE_TOTAL, 2);
- testMetricByName(listMetricStore, PUT_ALL_RATE, 2);
- testMetricByName(listMetricStore, PUT_ALL_TOTAL, 2);
- testMetricByName(listMetricStore, ALL_RATE, 2);
- testMetricByName(listMetricStore, ALL_TOTAL, 2);
- testMetricByName(listMetricStore, RANGE_RATE, 2);
- testMetricByName(listMetricStore, RANGE_TOTAL, 2);
- testMetricByName(listMetricStore, FLUSH_RATE, 2);
- testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
- testMetricByName(listMetricStore, RESTORE_RATE, 2);
- testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
- return true;
- } catch (final Throwable e) {
- errorMessage.append(e.getMessage());
- return false;
- }
+ private void checkWindowStoreMetrics() {
+ final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+ .filter(m -> m.metricName().group().equals(STREAM_STORE_WINDOW_ROCKSDB_STATE_METRICS))
+ .collect(Collectors.toList());
+ testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
+ testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
+ testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
+ testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
+ testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
+ testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
+ testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
+ testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
+ testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
+ testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
+ testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
+ testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
+ testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, PUT_RATE, 2);
+ testMetricByName(listMetricStore, PUT_TOTAL, 2);
+ testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
+ testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
+ testMetricByName(listMetricStore, GET_RATE, 0);
+ testMetricByName(listMetricStore, DELETE_RATE, 0);
+ testMetricByName(listMetricStore, DELETE_TOTAL, 0);
+ testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
+ testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
+ testMetricByName(listMetricStore, ALL_RATE, 0);
+ testMetricByName(listMetricStore, ALL_TOTAL, 0);
+ testMetricByName(listMetricStore, RANGE_RATE, 0);
+ testMetricByName(listMetricStore, RANGE_TOTAL, 0);
+ testMetricByName(listMetricStore, FLUSH_RATE, 2);
+ testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
+ testMetricByName(listMetricStore, RESTORE_RATE, 2);
+ testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
}
- private boolean testCacheMetric(final StringBuilder errorMessage) {
- errorMessage.setLength(0);
- try {
- final List<Metric> listMetricCache = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream().filter(m -> m.metricName().group().equals(STREAM_CACHE_NODE_METRICS)).collect(Collectors.toList());
- testMetricByName(listMetricCache, HIT_RATIO_AVG, 6);
- testMetricByName(listMetricCache, HIT_RATIO_MIN, 6);
- testMetricByName(listMetricCache, HIT_RATIO_MAX, 6);
- return true;
- } catch (final Throwable e) {
- errorMessage.append(e.getMessage());
- return false;
- }
+ private void checkSessionStoreMetrics() {
+ final List<Metric> listMetricStore = new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()
+ .filter(m -> m.metricName().group().equals(STREAM_STORE_SESSION_ROCKSDB_STATE_METRICS))
+ .collect(Collectors.toList());
+ testMetricByName(listMetricStore, PUT_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, PUT_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_AVG, 0);
+ testMetricByName(listMetricStore, PUT_IF_ABSENT_LATENCY_MAX, 0);
+ testMetricByName(listMetricStore, GET_LATENCY_AVG, 0);
+ testMetricByName(listMetricStore, GET_LATENCY_MAX, 0);
+ testMetricByName(listMetricStore, DELETE_LATENCY_AVG, 0);
+ testMetricByName(listMetricStore, DELETE_LATENCY_MAX, 0);
+ testMetricByName(listMetricStore, PUT_ALL_LATENCY_AVG, 0);
+ testMetricByName(listMetricStore, PUT_ALL_LATENCY_MAX, 0);
+ testMetricByName(listMetricStore, ALL_LATENCY_AVG, 0);
+ testMetricByName(listMetricStore, ALL_LATENCY_MAX, 0);
+ testMetricByName(listMetricStore, RANGE_LATENCY_AVG, 0);
+ testMetricByName(listMetricStore, RANGE_LATENCY_MAX, 0);
+ testMetricByName(listMetricStore, FLUSH_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, FLUSH_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, RESTORE_LATENCY_AVG, 2);
+ testMetricByName(listMetricStore, RESTORE_LATENCY_MAX, 2);
+ testMetricByName(listMetricStore, PUT_RATE, 2);
+ testMetricByName(listMetricStore, PUT_TOTAL, 2);
+ testMetricByName(listMetricStore, PUT_IF_ABSENT_RATE, 0);
+ testMetricByName(listMetricStore, PUT_IF_ABSENT_TOTAL, 0);
+ testMetricByName(listMetricStore, GET_RATE, 0);
+ testMetricByName(listMetricStore, DELETE_RATE, 0);
+ testMetricByName(listMetricStore, DELETE_TOTAL, 0);
+ testMetricByName(listMetricStore, PUT_ALL_RATE, 0);
+ testMetricByName(listMetricStore, PUT_ALL_TOTAL, 0);
+ testMetricByName(listMetricStore, ALL_RATE, 0);
+ testMetricByName(listMetricStore, ALL_TOTAL, 0);
+ testMetricByName(listMetricStore, RANGE_RATE, 0);
+ testMetricByName(listMetricStore, RANGE_TOTAL, 0);
+ testMetricByName(listMetricStore, FLUSH_RATE, 2);
+ testMetricByName(listMetricStore, FLUSH_TOTAL, 2);
+ testMetricByName(listMetricStore, RESTORE_RATE, 2);
+ testMetricByName(listMetricStore, RESTORE_TOTAL, 2);
}
private void testMetricByName(final List<Metric> listMetric, final String metricName, final int numMetric) {