You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/04/20 18:01:31 UTC
[kafka] branch trunk updated: MINOR: Further reduce runtime for
metrics integration tests (#8514)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fcf45e1 MINOR: Further reduce runtime for metrics integration tests (#8514)
fcf45e1 is described below
commit fcf45e1fac88238b1d3dbcfa1f324674939706f3
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Mon Apr 20 11:00:58 2020 -0700
MINOR: Further reduce runtime for metrics integration tests (#8514)
1. In both the RocksDBMetrics and Metrics integration tests, we do not need to wait for a consumer to consume records from the output topics, since the sensors / metrics are registered upon task creation.
2. Merged the two RocksDB test cases into one that runs a single app creating two state stores (one non-segmented and one segmented).
With these two changes, the local runtime of these two tests is reduced from 2min+ and 3min+ to under a minute.
Reviewers: Bruno Cadonna <br...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../integration/MetricsIntegrationTest.java | 53 ++------
.../integration/RocksDBMetricsIntegrationTest.java | 151 ++++++---------------
2 files changed, 57 insertions(+), 147 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 053ba53..f8389fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
@@ -264,66 +263,46 @@ public class MetricsIntegrationTest {
private void produceRecordsForTwoSegments(final Duration segmentInterval) throws Exception {
final MockTime mockTime = new MockTime(Math.max(segmentInterval.toMillis(), 60_000L));
+ final Properties props = TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties());
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
STREAM_INPUT,
Collections.singletonList(new KeyValue<>(1, "A")),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties()),
+ props,
mockTime.milliseconds()
);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
STREAM_INPUT,
Collections.singletonList(new KeyValue<>(1, "B")),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties()),
+ props,
mockTime.milliseconds()
);
}
private void produceRecordsForClosingWindow(final Duration windowSize) throws Exception {
final MockTime mockTime = new MockTime(windowSize.toMillis() + 1);
+ final Properties props = TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties());
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
STREAM_INPUT,
Collections.singletonList(new KeyValue<>(1, "A")),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties()),
+ props,
mockTime.milliseconds()
);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
STREAM_INPUT,
Collections.singletonList(new KeyValue<>(1, "B")),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties()),
+ props,
mockTime.milliseconds()
);
}
- private void waitUntilAllRecordsAreConsumed(final int numberOfExpectedRecords) throws Exception {
- IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- TestUtils.consumerConfig(
- CLUSTER.bootstrapServers(),
- "consumerApp",
- LongDeserializer.class,
- LongDeserializer.class,
- new Properties()
- ),
- STREAM_OUTPUT_1,
- numberOfExpectedRecords
- );
- }
-
private void closeApplication() throws Exception {
kafkaStreams.close();
kafkaStreams.cleanUp();
@@ -423,8 +402,6 @@ public class MetricsIntegrationTest {
verifyStateMetric(State.RUNNING);
- waitUntilAllRecordsAreConsumed(1);
-
checkWindowStoreAndSuppressionBufferMetrics(builtInMetricsVersion);
closeApplication();
@@ -465,8 +442,6 @@ public class MetricsIntegrationTest {
verifyStateMetric(State.RUNNING);
- waitUntilAllRecordsAreConsumed(2);
-
checkSessionStoreMetrics(builtInMetricsVersion);
closeApplication();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index ed4758e..e9ac2f4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -18,14 +18,12 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -73,8 +71,10 @@ public class RocksDBMetricsIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
- private static final String STREAM_INPUT = "STREAM_INPUT";
- private static final String STREAM_OUTPUT = "STREAM_OUTPUT";
+ private static final String STREAM_INPUT_ONE = "STREAM_INPUT_ONE";
+ private static final String STREAM_OUTPUT_ONE = "STREAM_OUTPUT_ONE";
+ private static final String STREAM_INPUT_TWO = "STREAM_INPUT_TWO";
+ private static final String STREAM_OUTPUT_TWO = "STREAM_OUTPUT_TWO";
private static final String MY_STORE_PERSISTENT_KEY_VALUE = "myStorePersistentKeyValue";
private static final Duration WINDOW_SIZE = Duration.ofMillis(50);
private static final long TIMEOUT = 60000;
@@ -112,12 +112,13 @@ public class RocksDBMetricsIntegrationTest {
@Before
public void before() throws Exception {
- CLUSTER.createTopic(STREAM_INPUT, 1, 3);
+ CLUSTER.createTopic(STREAM_INPUT_ONE, 1, 3);
+ CLUSTER.createTopic(STREAM_INPUT_TWO, 1, 3);
}
@After
public void after() throws Exception {
- CLUSTER.deleteTopicsAndWait(STREAM_INPUT, STREAM_OUTPUT);
+ CLUSTER.deleteTopicsAndWait(STREAM_INPUT_ONE, STREAM_INPUT_TWO, STREAM_OUTPUT_ONE, STREAM_OUTPUT_TWO);
}
@FunctionalInterface
@@ -126,19 +127,15 @@ public class RocksDBMetricsIntegrationTest {
}
@Test
- public void shouldExposeRocksDBMetricsForNonSegmentedStateStoreBeforeAndAfterFailureWithEmptyStateDir() throws Exception {
+ public void shouldExposeRocksDBMetricsBeforeAndAfterFailureWithEmptyStateDir() throws Exception {
final Properties streamsConfiguration = streamsConfig();
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
- final StreamsBuilder builder = builderForNonSegmentedStateStore();
- final String metricsScope = "rocksdb-state-id";
+ final StreamsBuilder builder = builderForStateStores();
cleanUpStateRunVerifyAndClose(
builder,
streamsConfiguration,
- IntegerDeserializer.class,
- StringDeserializer.class,
- this::verifyThatRocksDBMetricsAreExposed,
- metricsScope
+ this::verifyThatRocksDBMetricsAreExposed
);
// simulated failure
@@ -146,38 +143,7 @@ public class RocksDBMetricsIntegrationTest {
cleanUpStateRunVerifyAndClose(
builder,
streamsConfiguration,
- IntegerDeserializer.class,
- StringDeserializer.class,
- this::verifyThatRocksDBMetricsAreExposed,
- metricsScope
- );
- }
-
- @Test
- public void shouldExposeRocksDBMetricsForSegmentedStateStoreBeforeAndAfterFailureWithEmptyStateDir() throws Exception {
- final Properties streamsConfiguration = streamsConfig();
- IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
- final StreamsBuilder builder = builderForSegmentedStateStore();
- final String metricsScope = "rocksdb-window-state-id";
-
- cleanUpStateRunVerifyAndClose(
- builder,
- streamsConfiguration,
- LongDeserializer.class,
- LongDeserializer.class,
- this::verifyThatRocksDBMetricsAreExposed,
- metricsScope
- );
-
- // simulated failure
-
- cleanUpStateRunVerifyAndClose(
- builder,
- streamsConfiguration,
- LongDeserializer.class,
- LongDeserializer.class,
- this::verifyThatRocksDBMetricsAreExposed,
- metricsScope
+ this::verifyThatRocksDBMetricsAreExposed
);
}
@@ -193,18 +159,14 @@ public class RocksDBMetricsIntegrationTest {
return streamsConfiguration;
}
- private StreamsBuilder builderForNonSegmentedStateStore() {
+ private StreamsBuilder builderForStateStores() {
final StreamsBuilder builder = new StreamsBuilder();
+ // create two state stores, one non-segmented and one segmented
builder.table(
- STREAM_INPUT,
+ STREAM_INPUT_ONE,
Materialized.as(Stores.persistentKeyValueStore(MY_STORE_PERSISTENT_KEY_VALUE)).withCachingEnabled()
- ).toStream().to(STREAM_OUTPUT);
- return builder;
- }
-
- private StreamsBuilder builderForSegmentedStateStore() {
- final StreamsBuilder builder = new StreamsBuilder();
- builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
+ ).toStream().to(STREAM_OUTPUT_ONE);
+ builder.stream(STREAM_INPUT_TWO, Consumed.with(Serdes.Integer(), Serdes.String()))
.groupByKey()
.windowedBy(TimeWindows.of(WINDOW_SIZE).grace(Duration.ZERO))
.aggregate(() -> 0L,
@@ -214,70 +176,55 @@ public class RocksDBMetricsIntegrationTest {
.withRetention(WINDOW_SIZE))
.toStream()
.map((key, value) -> KeyValue.pair(value, value))
- .to(STREAM_OUTPUT, Produced.with(Serdes.Long(), Serdes.Long()));
+ .to(STREAM_OUTPUT_TWO, Produced.with(Serdes.Long(), Serdes.Long()));
return builder;
}
private void cleanUpStateRunVerifyAndClose(final StreamsBuilder builder,
final Properties streamsConfiguration,
- final Class outputKeyDeserializer,
- final Class outputValueDeserializer,
- final MetricsVerifier metricsVerifier,
- final String metricsScope) throws Exception {
+ final MetricsVerifier metricsVerifier) throws Exception {
final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.cleanUp();
produceRecords();
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams, TIMEOUT);
- IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- TestUtils.consumerConfig(
- CLUSTER.bootstrapServers(),
- "consumerApp",
- outputKeyDeserializer,
- outputValueDeserializer,
- new Properties()
- ),
- STREAM_OUTPUT,
- 1
- );
- metricsVerifier.verify(kafkaStreams, metricsScope);
+ metricsVerifier.verify(kafkaStreams, "rocksdb-state-id");
+ metricsVerifier.verify(kafkaStreams, "rocksdb-window-state-id");
kafkaStreams.close();
}
private void produceRecords() throws Exception {
final MockTime mockTime = new MockTime(WINDOW_SIZE.toMillis());
+ final Properties prop = TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class,
+ new Properties()
+ );
+ // non-segmented store do not need records with different timestamps
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- STREAM_INPUT,
- Collections.singletonList(new KeyValue<>(1, "A")),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties()
- ),
+ STREAM_INPUT_ONE,
+ Utils.mkSet(new KeyValue<>(1, "A"), new KeyValue<>(1, "B"), new KeyValue<>(1, "C")),
+ prop,
mockTime.milliseconds()
);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- STREAM_INPUT,
- Collections.singletonList(new KeyValue<>(1, "B")),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties()
- ),
+ STREAM_INPUT_TWO,
+ Collections.singleton(new KeyValue<>(1, "A")),
+ prop,
mockTime.milliseconds()
);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- STREAM_INPUT,
- Collections.singletonList(new KeyValue<>(1, "C")),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties()
- ),
+ STREAM_INPUT_TWO,
+ Collections.singleton(new KeyValue<>(1, "B")),
+ prop,
+ mockTime.milliseconds()
+ );
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ STREAM_INPUT_TWO,
+ Collections.singleton(new KeyValue<>(1, "C")),
+ prop,
mockTime.milliseconds()
);
}
@@ -319,18 +266,6 @@ public class RocksDBMetricsIntegrationTest {
}
}
- private void verifyThatBytesWrittenTotalIncreases(final KafkaStreams kafkaStreams,
- final String metricsScope) throws InterruptedException {
- final List<Metric> metric = getRocksDBMetrics(kafkaStreams, metricsScope).stream()
- .filter(m -> BYTES_WRITTEN_TOTAL.equals(m.metricName().name()))
- .collect(Collectors.toList());
- TestUtils.waitForCondition(
- () -> (double) metric.get(0).metricValue() > 0,
- TIMEOUT,
- () -> "RocksDB metric bytes.written.total did not increase in " + TIMEOUT + " ms"
- );
- }
-
private List<Metric> getRocksDBMetrics(final KafkaStreams kafkaStreams,
final String metricsScope) {
return new ArrayList<Metric>(kafkaStreams.metrics().values()).stream()