You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/11/25 16:17:42 UTC

[kafka] branch 2.4 updated: MINOR: Updated StreamTableJoinIntegrationTest to use TTD (#7722)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new ff12de5  MINOR: Updated StreamTableJoinIntegrationTest to use TTD (#7722)
ff12de5 is described below

commit ff12de576c64596b29320fe4998e1444e4b8d48b
Author: Bill Bejeck <bi...@confluent.io>
AuthorDate: Fri Nov 22 16:25:57 2019 -0500

    MINOR: Updated StreamTableJoinIntegrationTest to use TTD (#7722)
    
    Convert StreamTableJoinIntegrationTest to use the ToplogyTestDriver to eliminate flakiness and speed up the build.
    
    Reviewers: John Roesler <jo...@confluent.io>
---
 .../integration/AbstractJoinIntegrationTest.java   | 45 ++++++++++++++++++++++
 .../StreamTableJoinIntegrationTest.java            | 30 ++++++++-------
 2 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 6660a20..e7b87a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -30,6 +30,9 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -37,6 +40,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -52,15 +56,18 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests all available joins of Kafka Streams DSL.
@@ -186,6 +193,44 @@ public abstract class AbstractJoinIntegrationTest {
     }
 
 
+    void runTestWithDriver(final List<List<TestRecord<Long, String>>> expectedResult) {
+        runTestWithDriver(expectedResult, null);
+    }
+
+    void runTestWithDriver(final List<List<TestRecord<Long, String>>> expectedResult, final String storeName) {
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(STREAMS_CONFIG), STREAMS_CONFIG)) {
+            final TestInputTopic<Long, String> right = driver.createInputTopic(INPUT_TOPIC_RIGHT, new LongSerializer(), new StringSerializer());
+            final TestInputTopic<Long, String> left = driver.createInputTopic(INPUT_TOPIC_LEFT, new LongSerializer(), new StringSerializer());
+            final TestOutputTopic<Long, String> outputTopic = driver.createOutputTopic(OUTPUT_TOPIC, new LongDeserializer(), new StringDeserializer());
+            final Map<String, TestInputTopic<Long, String>> testInputTopicMap = new HashMap<>();
+
+            testInputTopicMap.put(INPUT_TOPIC_RIGHT, right);
+            testInputTopicMap.put(INPUT_TOPIC_LEFT, left);
+
+            TestRecord<Long, String> expectedFinalResult = null;
+
+            final long firstTimestamp = System.currentTimeMillis();
+            long ts = firstTimestamp;
+            final Iterator<List<TestRecord<Long, String>>> resultIterator = expectedResult.iterator();
+            for (final Input<String> singleInputRecord : input) {
+                testInputTopicMap.get(singleInputRecord.topic).pipeInput(singleInputRecord.record.key, singleInputRecord.record.value, ++ts);
+
+                final List<TestRecord<Long, String>> expected = resultIterator.next();
+                if (expected != null) {
+                    final List<TestRecord<Long, String>> updatedExpected = new LinkedList<>();
+                    for (final TestRecord<Long, String> record : expected) {
+                        updatedExpected.add(new TestRecord<>(record.key(), record.value(), null, firstTimestamp + record.timestamp()));
+                    }
+
+                    final List<TestRecord<Long, String>> output = outputTopic.readRecordsToList();
+                    assertEquals(output, updatedExpected);
+                    expectedFinalResult = updatedExpected.get(expected.size() - 1);
+                }
+            }
+        }
+    }
+
+
     /*
      * Runs the actual test. Checks the result after each input record to ensure fixed processing order.
      * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 4ddab57..66f0a04 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -17,12 +17,12 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.streams.KafkaStreamsWrapper;
-import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -64,6 +64,7 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
     @Test
     public void testShouldAutoShutdownOnIncompleteMetadata() throws InterruptedException {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-incomplete");
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
         final KStream<Long, String> notExistStream = builder.stream(INPUT_TOPIC_LEFT + "-not-existed");
 
@@ -86,15 +87,16 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
     }
 
     @Test
-    public void testInner() throws Exception {
+    public void testInner() {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000");
 
-        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+        final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
             null,
             null,
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
             null,
             null,
             null,
@@ -104,38 +106,38 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
             null,
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null,  15L))
         );
 
         leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
+        runTestWithDriver(expectedResult);
     }
 
     @Test
-    public void testLeft() throws Exception {
+    public void testLeft() {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "topology_driver:0000");
 
-        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+        final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)),
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
             null,
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 9L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)),
             null,
             null,
             null,
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
         );
 
         leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
 
-        runTest(expectedResult);
+        runTestWithDriver(expectedResult);
     }
 }