Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/20 18:01:31 UTC

[kafka] branch trunk updated: MINOR: Further reduce runtime for metrics integration tests (#8514)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fcf45e1  MINOR: Further reduce runtime for metrics integration tests (#8514)
fcf45e1 is described below

commit fcf45e1fac88238b1d3dbcfa1f324674939706f3
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon Apr 20 11:00:58 2020 -0700

    MINOR: Further reduce runtime for metrics integration tests (#8514)
    
    1. In both the RocksDBMetrics and Metrics integration tests, we do not need to wait for the consumer to consume records from the output topics, since the sensors and metrics are registered upon task creation.
    
    2. Merged the two RocksDB test cases into a single application that creates both state stores (one non-segmented, one segmented).
    
    With these two changes, the local runtime of these two tests drops from 2+ and 3+ minutes, respectively, to under a minute.
    
    Reviewers: Bruno Cadonna <br...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
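    
    Because the sensors are created together with the tasks, the RocksDB metrics are already queryable through KafkaStreams#metrics() once the application reaches RUNNING. A minimal sketch of such a check follows; the class and helper names are hypothetical and not part of this commit:
    
        import java.util.ArrayList;
        import java.util.List;
        import java.util.stream.Collectors;
    
        import org.apache.kafka.common.Metric;
        import org.apache.kafka.streams.KafkaStreams;
    
        public final class RocksDBMetricsSketch {
    
            // Collect all metrics whose group contains the given RocksDB metrics scope,
            // e.g. "rocksdb-state-id" or "rocksdb-window-state-id".
            static List<Metric> rocksDBMetrics(final KafkaStreams streams, final String metricsScope) {
                return new ArrayList<Metric>(streams.metrics().values()).stream()
                    .filter(metric -> metric.metricName().group().contains(metricsScope))
                    .collect(Collectors.toList());
            }
    
            // Once the application is in RUNNING state the store sensors already exist,
            // so this check passes without consuming anything from the output topics.
            static void verifyMetricsRegistered(final KafkaStreams streams, final String metricsScope) {
                if (rocksDBMetrics(streams, metricsScope).isEmpty()) {
                    throw new AssertionError("expected RocksDB metrics for scope " + metricsScope);
                }
            }
        }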
---
 .../integration/MetricsIntegrationTest.java        |  53 ++------
 .../integration/RocksDBMetricsIntegrationTest.java | 151 ++++++---------------
 2 files changed, 57 insertions(+), 147 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 053ba53..f8389fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.integration;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -264,66 +263,46 @@ public class MetricsIntegrationTest {
 
     private void produceRecordsForTwoSegments(final Duration segmentInterval) throws Exception {
         final MockTime mockTime = new MockTime(Math.max(segmentInterval.toMillis(), 60_000L));
+        final Properties props = TestUtils.producerConfig(
+            CLUSTER.bootstrapServers(),
+            IntegerSerializer.class,
+            StringSerializer.class,
+            new Properties());
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             STREAM_INPUT,
             Collections.singletonList(new KeyValue<>(1, "A")),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                IntegerSerializer.class,
-                StringSerializer.class,
-                new Properties()),
+            props,
             mockTime.milliseconds()
         );
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             STREAM_INPUT,
             Collections.singletonList(new KeyValue<>(1, "B")),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                IntegerSerializer.class,
-                StringSerializer.class,
-                new Properties()),
+            props,
             mockTime.milliseconds()
         );
     }
 
     private void produceRecordsForClosingWindow(final Duration windowSize) throws Exception {
         final MockTime mockTime = new MockTime(windowSize.toMillis() + 1);
+        final Properties props = TestUtils.producerConfig(
+            CLUSTER.bootstrapServers(),
+            IntegerSerializer.class,
+            StringSerializer.class,
+            new Properties());
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             STREAM_INPUT,
             Collections.singletonList(new KeyValue<>(1, "A")),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                IntegerSerializer.class,
-                StringSerializer.class,
-                new Properties()),
+            props,
             mockTime.milliseconds()
         );
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             STREAM_INPUT,
             Collections.singletonList(new KeyValue<>(1, "B")),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                IntegerSerializer.class,
-                StringSerializer.class,
-                new Properties()),
+            props,
             mockTime.milliseconds()
         );
     }
 
-    private void waitUntilAllRecordsAreConsumed(final int numberOfExpectedRecords) throws Exception {
-        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            TestUtils.consumerConfig(
-                CLUSTER.bootstrapServers(),
-                "consumerApp",
-                LongDeserializer.class,
-                LongDeserializer.class,
-                new Properties()
-            ),
-            STREAM_OUTPUT_1,
-            numberOfExpectedRecords
-        );
-    }
-
     private void closeApplication() throws Exception {
         kafkaStreams.close();
         kafkaStreams.cleanUp();
@@ -423,8 +402,6 @@ public class MetricsIntegrationTest {
 
         verifyStateMetric(State.RUNNING);
 
-        waitUntilAllRecordsAreConsumed(1);
-
         checkWindowStoreAndSuppressionBufferMetrics(builtInMetricsVersion);
 
         closeApplication();
@@ -465,8 +442,6 @@ public class MetricsIntegrationTest {
 
         verifyStateMetric(State.RUNNING);
 
-        waitUntilAllRecordsAreConsumed(2);
-
         checkSessionStoreMetrics(builtInMetricsVersion);
 
         closeApplication();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index ed4758e..e9ac2f4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -18,14 +18,12 @@ package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -73,8 +71,10 @@ public class RocksDBMetricsIntegrationTest {
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
-    private static final String STREAM_INPUT = "STREAM_INPUT";
-    private static final String STREAM_OUTPUT = "STREAM_OUTPUT";
+    private static final String STREAM_INPUT_ONE = "STREAM_INPUT_ONE";
+    private static final String STREAM_OUTPUT_ONE = "STREAM_OUTPUT_ONE";
+    private static final String STREAM_INPUT_TWO = "STREAM_INPUT_TWO";
+    private static final String STREAM_OUTPUT_TWO = "STREAM_OUTPUT_TWO";
     private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
     private static final Duration WINDOW_SIZE = Duration.ofMillis(50);
     private static final long TIMEOUT = 60000;
@@ -112,12 +112,13 @@ public class RocksDBMetricsIntegrationTest {
 
     @Before
     public void before() throws Exception {
-        CLUSTER.createTopic(STREAM_INPUT, 1, 3);
+        CLUSTER.createTopic(STREAM_INPUT_ONE, 1, 3);
+        CLUSTER.createTopic(STREAM_INPUT_TWO, 1, 3);
     }
 
     @After
     public void after() throws Exception {
-        CLUSTER.deleteTopicsAndWait(STREAM_INPUT, STREAM_OUTPUT);
+        CLUSTER.deleteTopicsAndWait(STREAM_INPUT_ONE, STREAM_INPUT_TWO, STREAM_OUTPUT_ONE, STREAM_OUTPUT_TWO);
     }
 
     @FunctionalInterface
@@ -126,19 +127,15 @@ public class RocksDBMetricsIntegrationTest {
     }
 
     @Test
-    public void shouldExposeRocksDBMetricsForNonSegmentedStateStoreBeforeAndAfterFailureWithEmptyStateDir() throws Exception {
+    public void shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir() throws Exception {
         final Properties streamsConfiguration = streamsConfig();
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-        final StreamsBuilder builder = builderForNonSegmentedStateStore();
-        final String metricsScope = "rocksdb-state-id";
+        final StreamsBuilder builder = builderForStateStores();
 
         cleanUpStateRunVerifyAndClose(
             builder,
             streamsConfiguration,
-            IntegerDeserializer.class,
-            StringDeserializer.class,
-            this::verifyThatRocksDBMetricsAreExposed,
-            metricsScope
+            this::verifyThatRocksDBMetricsAreExposed
         );
 
         // simulated failure
@@ -146,38 +143,7 @@ public class RocksDBMetricsIntegrationTest {
         cleanUpStateRunVerifyAndClose(
             builder,
             streamsConfiguration,
-            IntegerDeserializer.class,
-            StringDeserializer.class,
-            this::verifyThatRocksDBMetricsAreExposed,
-            metricsScope
-        );
-    }
-
-    @Test
-    public void shouldExposeRocksDBMetricsForSegmentedStateStoreBeforeAndAfterFailureWithEmptyStateDir() throws Exception {
-        final Properties streamsConfiguration = streamsConfig();
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-        final StreamsBuilder builder = builderForSegmentedStateStore();
-        final String metricsScope = "rocksdb-window-state-id";
-
-        cleanUpStateRunVerifyAndClose(
-            builder,
-            streamsConfiguration,
-            LongDeserializer.class,
-            LongDeserializer.class,
-            this::verifyThatRocksDBMetricsAreExposed,
-            metricsScope
-        );
-
-        // simulated failure
-
-        cleanUpStateRunVerifyAndClose(
-            builder,
-            streamsConfiguration,
-            LongDeserializer.class,
-            LongDeserializer.class,
-            this::verifyThatRocksDBMetricsAreExposed,
-            metricsScope
+            this::verifyThatRocksDBMetricsAreExposed
         );
     }
 
@@ -193,18 +159,14 @@ public class RocksDBMetricsIntegrationTest {
         return streamsConfiguration;
     }
 
-    private StreamsBuilder builderForNonSegmentedStateStore() {
+    private StreamsBuilder builderForStateStores() {
         final StreamsBuilder builder = new StreamsBuilder();
+        // create two state stores, one non-segmented and one segmented
         builder.table(
-            STREAM_INPUT,
+            STREAM_INPUT_ONE,
             Materialized.as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled()
-        ).toStream().to(STREAM_OUTPUT);
-        return builder;
-    }
-
-    private StreamsBuilder builderForSegmentedStateStore() {
-        final StreamsBuilder builder = new StreamsBuilder();
-        builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
+        ).toStream().to(STREAM_OUTPUT_ONE);
+        builder.stream(STREAM_INPUT_TWO, Consumed.with(Serdes.Integer(), Serdes.String()))
             .groupByKey()
             .windowedBy(TimeWindows.of(WINDOW_SIZE).grace(Duration.ZERO))
             .aggregate(() -> 0L,
@@ -214,70 +176,55 @@ public class RocksDBMetricsIntegrationTest {
                     .withRetention(WINDOW_SIZE))
             .toStream()
             .map((key, value) -> KeyValue.pair(value, value))
-            .to(STREAM_OUTPUT, Produced.with(Serdes.Long(), Serdes.Long()));
+            .to(STREAM_OUTPUT_TWO, Produced.with(Serdes.Long(), Serdes.Long()));
         return builder;
     }
 
     private void cleanUpStateRunVerifyAndClose(final StreamsBuilder builder,
                                                final Properties streamsConfiguration,
-                                               final Class outputKeyDeserializer,
-                                               final Class outputValueDeserializer,
-                                               final MetricsVerifier metricsVerifier,
-                                               final String metricsScope) throws Exception {
+                                               final MetricsVerifier metricsVerifier) throws Exception {
         final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         kafkaStreams.cleanUp();
         produceRecords();
 
         StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams, TIMEOUT);
 
-        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            TestUtils.consumerConfig(
-                CLUSTER.bootstrapServers(),
-                "consumerApp",
-                outputKeyDeserializer,
-                outputValueDeserializer,
-                new Properties()
-            ),
-            STREAM_OUTPUT,
-            1
-        );
-        metricsVerifier.verify(kafkaStreams, metricsScope);
+        metricsVerifier.verify(kafkaStreams, "rocksdb-state-id");
+        metricsVerifier.verify(kafkaStreams, "rocksdb-window-state-id");
         kafkaStreams.close();
     }
 
     private void produceRecords() throws Exception {
         final MockTime mockTime = new MockTime(WINDOW_SIZE.toMillis());
+        final Properties prop = TestUtils.producerConfig(
+            CLUSTER.bootstrapServers(),
+            IntegerSerializer.class,
+            StringSerializer.class,
+            new Properties()
+        );
+        // the non-segmented store does not need records with different timestamps
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-            STREAM_INPUT,
-            Collections.singletonList(new KeyValue<>(1, "A")),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                IntegerSerializer.class,
-                StringSerializer.class,
-                new Properties()
-            ),
+            STREAM_INPUT_ONE,
+            Utils.mkSet(new KeyValue<>(1, "A"), new KeyValue<>(1, "B"), new KeyValue<>(1, "C")),
+            prop,
             mockTime.milliseconds()
         );
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-            STREAM_INPUT,
-            Collections.singletonList(new KeyValue<>(1, "B")),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                IntegerSerializer.class,
-                StringSerializer.class,
-                new Properties()
-            ),
+            STREAM_INPUT_TWO,
+            Collections.singleton(new KeyValue<>(1, "A")),
+            prop,
             mockTime.milliseconds()
         );
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-            STREAM_INPUT,
-            Collections.singletonList(new KeyValue<>(1, "C")),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                IntegerSerializer.class,
-                StringSerializer.class,
-                new Properties()
-            ),
+            STREAM_INPUT_TWO,
+            Collections.singleton(new KeyValue<>(1, "B")),
+            prop,
+            mockTime.milliseconds()
+        );
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            STREAM_INPUT_TWO,
+            Collections.singleton(new KeyValue<>(1, "C")),
+            prop,
             mockTime.milliseconds()
         );
     }
@@ -319,18 +266,6 @@ public class RocksDBMetricsIntegrationTest {
         }
     }
 
-    private void verifyThatBytesWrittenTotalIncreases(final KafkaStreams kafkaStreams,
-                                                      final String metricsScope) throws InterruptedException {
-        final List<Metric> metric = getRocksDBMetrics(kafkaStreams, metricsScope).stream()
-            .filter(m -> BYTES_WRITTEN_TOTAL.equals(m.metricName().name()))
-            .collect(Collectors.toList());
-        TestUtils.waitForCondition(
-            () -> (double) metric.get(0).metricValue() > 0,
-            TIMEOUT,
-            () -> "RocksDB metric bytes.written.total did not increase in " + TIMEOUT + " ms"
-        );
-    }
-
     private List<Metric> getRocksDBMetrics(final KafkaStreams kafkaStreams,
                                            final String metricsScope) {
         return new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()