You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/15 21:39:28 UTC
[kafka] branch trunk updated: MINOR: Remove redundant SuppressIntegrationTests (#5896)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new abc0959 MINOR: Remove redundant SuppressIntegrationTests (#5896)
abc0959 is described below
commit abc09597db86091c5273348e74be07b63c31a189
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Nov 15 15:39:18 2018 -0600
MINOR: Remove redundant SuppressIntegrationTests (#5896)
The removed tests have counterparts covered by SuppressScenarioTest using the TopologyTestDriver.
This will speed up the build and improve stability in the CPU-constrained Jenkins environment.
Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../integration/SuppressionIntegrationTest.java | 340 +--------------------
.../integration/utils/IntegrationTestUtils.java | 3 +-
2 files changed, 3 insertions(+), 340 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 14776b0..1991e15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -16,14 +16,10 @@
*/
package org.apache.kafka.streams.integration;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
@@ -38,11 +34,7 @@ import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.ClassRule;
@@ -57,18 +49,16 @@ import java.util.Properties;
import static java.lang.Long.MAX_VALUE;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
-import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
-import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
-import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -81,71 +71,9 @@ public class SuppressionIntegrationTest {
mkProperties(mkMap()),
0L
);
- private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
private static final Serde<String> STRING_SERDE = Serdes.String();
- private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
private static final int COMMIT_INTERVAL = 100;
- private static final long TIMEOUT_MS = 30_000L;
-
- @Test
- public void shouldSuppressIntermediateEventsWithEmitAfter() {
- final String testId = "-shouldSuppressIntermediateEventsWithEmitAfter";
- final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
- final String input = "input" + testId;
- final String outputSuppressed = "output-suppressed" + testId;
- final String outputRaw = "output-raw" + testId;
-
- cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw);
-
- final StreamsBuilder builder = new StreamsBuilder();
- final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
-
- valueCounts
- .suppress(untilTimeLimit(ofMillis(scaledTime(2L)), unbounded()))
- .toStream()
- .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
-
- valueCounts
- .toStream()
- .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
-
- final Properties streamsConfig = getStreamsConfig(appId);
- final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
-
- try {
- produceSynchronously(
- input,
- asList(
- new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
- new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
- new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
- // this record is just here to advance stream time and flush the other records through the buffer
- new KeyValueTimestamp<>("tick", "tick", scaledTime(5L))
- )
- );
- verifyOutput(
- outputRaw,
- asList(
- new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
- new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
- new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
- new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
- new KeyValueTimestamp<>("tick", 1L, scaledTime(5L))
- )
- );
- verifyOutput(
- outputSuppressed,
- asList(
- new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
- new KeyValueTimestamp<>("v2", 1L, scaledTime(1L))
- )
- );
- } finally {
- driver.close();
- cleanStateAfterTest(CLUSTER, driver);
- }
- }
private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
return builder
@@ -161,126 +89,6 @@ public class SuppressionIntegrationTest {
}
@Test
- public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() {
- final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter";
- final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
- final String input = "input" + testId;
- final String outputSuppressed = "output-suppressed" + testId;
- final String outputRaw = "output-raw" + testId;
-
- cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw);
-
- final StreamsBuilder builder = new StreamsBuilder();
- final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
-
- valueCounts
- .suppress(untilTimeLimit(Duration.ZERO, unbounded()))
- .toStream()
- .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
-
- valueCounts
- .toStream()
- .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
-
- final Properties streamsConfig = getStreamsConfig(appId);
- final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
-
- try {
- produceSynchronously(
- input,
- asList(
- new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
- new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
- new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
- new KeyValueTimestamp<>("x", "x", scaledTime(4L))
- )
- );
- verifyOutput(
- outputRaw,
- asList(
- new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
- new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
- new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
- new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
- new KeyValueTimestamp<>("x", 1L, scaledTime(4L))
- )
- );
- verifyOutput(
- outputSuppressed,
- asList(
- new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
- new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
- new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
- new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
- new KeyValueTimestamp<>("x", 1L, scaledTime(4L))
- )
- );
- } finally {
- driver.close();
- cleanStateAfterTest(CLUSTER, driver);
- }
- }
-
- @Test
- public void shouldSuppressIntermediateEventsWithRecordLimit() {
- final String testId = "-shouldSuppressIntermediateEventsWithRecordLimit";
- final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
- final String input = "input" + testId;
- final String outputSuppressed = "output-suppressed" + testId;
- final String outputRaw = "output-raw" + testId;
-
- cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
-
- final StreamsBuilder builder = new StreamsBuilder();
- final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
-
- valueCounts
- .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull()))
- .toStream()
- .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
-
- valueCounts
- .toStream()
- .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
-
- final Properties streamsConfig = getStreamsConfig(appId);
- final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
- try {
- produceSynchronously(
- input,
- asList(
- new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
- new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
- new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
- new KeyValueTimestamp<>("x", "x", scaledTime(3L))
- )
- );
- verifyOutput(
- outputRaw,
- asList(
- new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
- new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
- new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
- new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
- new KeyValueTimestamp<>("x", 1L, scaledTime(3L))
- )
- );
- verifyOutput(
- outputSuppressed,
- asList(
- // consecutive updates to v1 get suppressed into only the latter.
- new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
- new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
- new KeyValueTimestamp<>("v1", 1L, scaledTime(2L))
- )
- );
- } finally {
- driver.close();
- cleanStateAfterTest(CLUSTER, driver);
- }
- }
-
- @Test
public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedException {
final String testId = "-shouldShutdownWhenRecordConstraintIsViolated";
final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
@@ -322,66 +130,6 @@ public class SuppressionIntegrationTest {
}
@Test
- public void shouldSuppressIntermediateEventsWithBytesLimit() {
- final String testId = "-shouldSuppressIntermediateEventsWithBytesLimit";
- final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
- final String input = "input" + testId;
- final String outputSuppressed = "output-suppressed" + testId;
- final String outputRaw = "output-raw" + testId;
-
- cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
-
- final StreamsBuilder builder = new StreamsBuilder();
- final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
-
- valueCounts
- // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size.
- .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
- .toStream()
- .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
-
- valueCounts
- .toStream()
- .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
-
- final Properties streamsConfig = getStreamsConfig(appId);
- final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
- try {
- produceSynchronously(
- input,
- asList(
- new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
- new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
- new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
- new KeyValueTimestamp<>("x", "x", scaledTime(3L))
- )
- );
- verifyOutput(
- outputRaw,
- asList(
- new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
- new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
- new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
- new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
- new KeyValueTimestamp<>("x", 1L, scaledTime(3L))
- )
- );
- verifyOutput(
- outputSuppressed,
- asList(
- // consecutive updates to v1 get suppressed into only the latter.
- new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
- new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
- new KeyValueTimestamp<>("v1", 1L, scaledTime(2L))
- )
- );
- } finally {
- driver.close();
- cleanStateAfterTest(CLUSTER, driver);
- }
- }
-
- @Test
public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedException {
final String testId = "-shouldShutdownWhenBytesConstraintIsViolated";
final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
@@ -423,73 +171,6 @@ public class SuppressionIntegrationTest {
}
}
- @Test
- public void shouldSupportFinalResultsForTimeWindows() {
- final String testId = "-shouldSupportFinalResultsForTimeWindows";
- final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
- final String input = "input" + testId;
- final String outputSuppressed = "output-suppressed" + testId;
- final String outputRaw = "output-raw" + testId;
-
- cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
-
- final StreamsBuilder builder = new StreamsBuilder();
- final KTable<Windowed<String>, Long> valueCounts = builder
- .stream(input,
- Consumed.with(STRING_SERDE, STRING_SERDE)
- )
- .groupBy((String k1, String v1) -> k1, Grouped.with(STRING_SERDE, STRING_SERDE))
- .windowedBy(TimeWindows.of(ofMillis(scaledTime(2L))).grace(ofMillis(scaledTime(1L))))
- .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled().withLoggingDisabled());
-
- valueCounts
- .suppress(untilWindowCloses(unbounded()))
- .toStream()
- .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
- .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
-
- valueCounts
- .toStream()
- .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
- .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
-
- final Properties streamsConfig = getStreamsConfig(appId);
- final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
- try {
- produceSynchronously(input, asList(
- new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
- new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)),
- new KeyValueTimestamp<>("k1", "v1", scaledTime(2L)),
- new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)),
- new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
- new KeyValueTimestamp<>("k1", "v1", scaledTime(4L)),
- // note this event is dropped since it is out of the grace period
- new KeyValueTimestamp<>("k1", "v1", scaledTime(0L))
- ));
- verifyOutput(
- outputRaw,
- asList(
- new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 1L, scaledTime(0L)),
- new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 2L, scaledTime(1L)),
- new KeyValueTimestamp<>(scaledWindowKey("k1", 2L, 4L), 1L, scaledTime(2L)),
- new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 3L, scaledTime(1L)),
- new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 4L, scaledTime(0L)),
- new KeyValueTimestamp<>(scaledWindowKey("k1", 4L, 6L), 1L, scaledTime(4L))
- )
- );
-
- verifyOutput(
- outputSuppressed,
- singletonList(
- new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 4L, scaledTime(0L))
- )
- );
- } finally {
- driver.close();
- cleanStateAfterTest(CLUSTER, driver);
- }
- }
-
private Properties getStreamsConfig(final String appId) {
return mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
@@ -501,10 +182,6 @@ public class SuppressionIntegrationTest {
));
}
- private String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) {
- return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), scaledTime(unscaledEnd))).toString();
- }
-
/**
* scaling to ensure that there are commits in between the various test events,
* just to exercise that everything works properly in the presence of commits.
@@ -524,20 +201,7 @@ public class SuppressionIntegrationTest {
}
private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
- waitForCondition(() -> !driver.state().isRunning(), TIMEOUT_MS, "Streams didn't shut down.");
+ waitForCondition(() -> !driver.state().isRunning(), DEFAULT_TIMEOUT, "Streams didn't shut down.");
assertThat(driver.state(), is(KafkaStreams.State.ERROR));
}
-
- private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
- final Properties properties = mkProperties(
- mkMap(
- mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
- mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
- mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
- mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
- )
- );
- IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
-
- }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 8bca79f..8a2122d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -68,8 +68,7 @@ import java.util.stream.Collectors;
*/
public class IntegrationTestUtils {
- public static final long DEFAULT_TIMEOUT = 30 * 1000L;
- private static final long DEFAULT_COMMIT_INTERVAL = 100L;
+ public static final long DEFAULT_TIMEOUT = 60 * 1000L;
public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";
/*