You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/15 21:39:28 UTC

[kafka] branch trunk updated: MINOR: Remove redundant SuppressIntegrationTests (#5896)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new abc0959  MINOR: Remove redundant SuppressIntegrationTests (#5896)
abc0959 is described below

commit abc09597db86091c5273348e74be07b63c31a189
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Thu Nov 15 15:39:18 2018 -0600

    MINOR: Remove redundant SuppressIntegrationTests (#5896)
    
    The removed tests have counterparts covered by SuppressScenarioTest using the TopologyTestDriver.
    
    This will speed up the build and improve stability in the CPU-constrained Jenkins environment.
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../integration/SuppressionIntegrationTest.java    | 340 +--------------------
 .../integration/utils/IntegrationTestUtils.java    |   3 +-
 2 files changed, 3 insertions(+), 340 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 14776b0..1991e15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -16,14 +16,10 @@
  */
 package org.apache.kafka.streams.integration;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
@@ -38,11 +34,7 @@ import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.ClassRule;
@@ -57,18 +49,16 @@ import java.util.Properties;
 import static java.lang.Long.MAX_VALUE;
 import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
-import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
-import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
 import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
-import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -81,71 +71,9 @@ public class SuppressionIntegrationTest {
         mkProperties(mkMap()),
         0L
     );
-    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
     private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
     private static final Serde<String> STRING_SERDE = Serdes.String();
-    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
     private static final int COMMIT_INTERVAL = 100;
-    private static final long TIMEOUT_MS = 30_000L;
-
-    @Test
-    public void shouldSuppressIntermediateEventsWithEmitAfter() {
-        final String testId = "-shouldSuppressIntermediateEventsWithEmitAfter";
-        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
-        final String input = "input" + testId;
-        final String outputSuppressed = "output-suppressed" + testId;
-        final String outputRaw = "output-raw" + testId;
-
-        cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw);
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
-
-        valueCounts
-            .suppress(untilTimeLimit(ofMillis(scaledTime(2L)), unbounded()))
-            .toStream()
-            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
-
-        valueCounts
-            .toStream()
-            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
-
-        final Properties streamsConfig = getStreamsConfig(appId);
-        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
-
-        try {
-            produceSynchronously(
-                input,
-                asList(
-                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
-                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
-                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
-                    // this record is just here to advance stream time and flush the other records through the buffer
-                    new KeyValueTimestamp<>("tick", "tick", scaledTime(5L))
-                )
-            );
-            verifyOutput(
-                outputRaw,
-                asList(
-                    new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
-                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
-                    new KeyValueTimestamp<>("tick", 1L, scaledTime(5L))
-                )
-            );
-            verifyOutput(
-                outputSuppressed,
-                asList(
-                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
-                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L))
-                )
-            );
-        } finally {
-            driver.close();
-            cleanStateAfterTest(CLUSTER, driver);
-        }
-    }
 
     private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
         return builder
@@ -161,126 +89,6 @@ public class SuppressionIntegrationTest {
     }
 
     @Test
-    public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() {
-        final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter";
-        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
-        final String input = "input" + testId;
-        final String outputSuppressed = "output-suppressed" + testId;
-        final String outputRaw = "output-raw" + testId;
-
-        cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw);
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
-
-        valueCounts
-            .suppress(untilTimeLimit(Duration.ZERO, unbounded()))
-            .toStream()
-            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
-
-        valueCounts
-            .toStream()
-            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
-
-        final Properties streamsConfig = getStreamsConfig(appId);
-        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
-
-        try {
-            produceSynchronously(
-                input,
-                asList(
-                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
-                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
-                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
-                    new KeyValueTimestamp<>("x", "x", scaledTime(4L))
-                )
-            );
-            verifyOutput(
-                outputRaw,
-                asList(
-                    new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
-                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
-                    new KeyValueTimestamp<>("x", 1L, scaledTime(4L))
-                )
-            );
-            verifyOutput(
-                outputSuppressed,
-                asList(
-                    new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
-                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
-                    new KeyValueTimestamp<>("x", 1L, scaledTime(4L))
-                )
-            );
-        } finally {
-            driver.close();
-            cleanStateAfterTest(CLUSTER, driver);
-        }
-    }
-
-    @Test
-    public void shouldSuppressIntermediateEventsWithRecordLimit() {
-        final String testId = "-shouldSuppressIntermediateEventsWithRecordLimit";
-        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
-        final String input = "input" + testId;
-        final String outputSuppressed = "output-suppressed" + testId;
-        final String outputRaw = "output-raw" + testId;
-
-        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
-
-        valueCounts
-            .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull()))
-            .toStream()
-            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
-
-        valueCounts
-            .toStream()
-            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
-
-        final Properties streamsConfig = getStreamsConfig(appId);
-        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
-        try {
-            produceSynchronously(
-                input,
-                asList(
-                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
-                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
-                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
-                    new KeyValueTimestamp<>("x", "x", scaledTime(3L))
-                )
-            );
-            verifyOutput(
-                outputRaw,
-                asList(
-                    new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
-                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
-                    new KeyValueTimestamp<>("x", 1L, scaledTime(3L))
-                )
-            );
-            verifyOutput(
-                outputSuppressed,
-                asList(
-                    // consecutive updates to v1 get suppressed into only the latter.
-                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L))
-                )
-            );
-        } finally {
-            driver.close();
-            cleanStateAfterTest(CLUSTER, driver);
-        }
-    }
-
-    @Test
     public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedException {
         final String testId = "-shouldShutdownWhenRecordConstraintIsViolated";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
@@ -322,66 +130,6 @@ public class SuppressionIntegrationTest {
     }
 
     @Test
-    public void shouldSuppressIntermediateEventsWithBytesLimit() {
-        final String testId = "-shouldSuppressIntermediateEventsWithBytesLimit";
-        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
-        final String input = "input" + testId;
-        final String outputSuppressed = "output-suppressed" + testId;
-        final String outputRaw = "output-raw" + testId;
-
-        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
-
-        valueCounts
-            // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size.
-            .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).emitEarlyWhenFull()))
-            .toStream()
-            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
-
-        valueCounts
-            .toStream()
-            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
-
-        final Properties streamsConfig = getStreamsConfig(appId);
-        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
-        try {
-            produceSynchronously(
-                input,
-                asList(
-                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
-                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
-                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
-                    new KeyValueTimestamp<>("x", "x", scaledTime(3L))
-                )
-            );
-            verifyOutput(
-                outputRaw,
-                asList(
-                    new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
-                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
-                    new KeyValueTimestamp<>("x", 1L, scaledTime(3L))
-                )
-            );
-            verifyOutput(
-                outputSuppressed,
-                asList(
-                    // consecutive updates to v1 get suppressed into only the latter.
-                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
-                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L))
-                )
-            );
-        } finally {
-            driver.close();
-            cleanStateAfterTest(CLUSTER, driver);
-        }
-    }
-
-    @Test
     public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedException {
         final String testId = "-shouldShutdownWhenBytesConstraintIsViolated";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
@@ -423,73 +171,6 @@ public class SuppressionIntegrationTest {
         }
     }
 
-    @Test
-    public void shouldSupportFinalResultsForTimeWindows() {
-        final String testId = "-shouldSupportFinalResultsForTimeWindows";
-        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
-        final String input = "input" + testId;
-        final String outputSuppressed = "output-suppressed" + testId;
-        final String outputRaw = "output-raw" + testId;
-
-        cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<Windowed<String>, Long> valueCounts = builder
-            .stream(input,
-                    Consumed.with(STRING_SERDE, STRING_SERDE)
-            )
-            .groupBy((String k1, String v1) -> k1, Grouped.with(STRING_SERDE, STRING_SERDE))
-            .windowedBy(TimeWindows.of(ofMillis(scaledTime(2L))).grace(ofMillis(scaledTime(1L))))
-            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled().withLoggingDisabled());
-
-        valueCounts
-            .suppress(untilWindowCloses(unbounded()))
-            .toStream()
-            .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
-            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
-
-        valueCounts
-            .toStream()
-            .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
-            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
-
-        final Properties streamsConfig = getStreamsConfig(appId);
-        final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
-        try {
-            produceSynchronously(input, asList(
-                new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
-                new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)),
-                new KeyValueTimestamp<>("k1", "v1", scaledTime(2L)),
-                new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)),
-                new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
-                new KeyValueTimestamp<>("k1", "v1", scaledTime(4L)),
-                // note this event is dropped since it is out of the grace period
-                new KeyValueTimestamp<>("k1", "v1", scaledTime(0L))
-            ));
-            verifyOutput(
-                outputRaw,
-                asList(
-                    new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 1L, scaledTime(0L)),
-                    new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 2L, scaledTime(1L)),
-                    new KeyValueTimestamp<>(scaledWindowKey("k1", 2L, 4L), 1L, scaledTime(2L)),
-                    new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 3L, scaledTime(1L)),
-                    new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 4L, scaledTime(0L)),
-                    new KeyValueTimestamp<>(scaledWindowKey("k1", 4L, 6L), 1L, scaledTime(4L))
-                )
-            );
-
-            verifyOutput(
-                outputSuppressed,
-                singletonList(
-                    new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 4L, scaledTime(0L))
-                )
-            );
-        } finally {
-            driver.close();
-            cleanStateAfterTest(CLUSTER, driver);
-        }
-    }
-
     private Properties getStreamsConfig(final String appId) {
         return mkProperties(mkMap(
             mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
@@ -501,10 +182,6 @@ public class SuppressionIntegrationTest {
         ));
     }
 
-    private String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) {
-        return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), scaledTime(unscaledEnd))).toString();
-    }
-
     /**
      * scaling to ensure that there are commits in between the various test events,
      * just to exercise that everything works properly in the presence of commits.
@@ -524,20 +201,7 @@ public class SuppressionIntegrationTest {
     }
 
     private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
-        waitForCondition(() -> !driver.state().isRunning(), TIMEOUT_MS, "Streams didn't shut down.");
+        waitForCondition(() -> !driver.state().isRunning(), DEFAULT_TIMEOUT, "Streams didn't shut down.");
         assertThat(driver.state(), is(KafkaStreams.State.ERROR));
     }
-
-    private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
-        final Properties properties = mkProperties(
-            mkMap(
-                mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
-                mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
-                mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
-                mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
-            )
-        );
-        IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps);
-
-    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 8bca79f..8a2122d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -68,8 +68,7 @@ import java.util.stream.Collectors;
  */
 public class IntegrationTestUtils {
 
-    public static final long DEFAULT_TIMEOUT = 30 * 1000L;
-    private static final long DEFAULT_COMMIT_INTERVAL = 100L;
+    public static final long DEFAULT_TIMEOUT = 60 * 1000L;
     public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close";
 
     /*