You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/12/04 21:09:36 UTC

[kafka] branch 2.4 updated: MINOR: Convert Stream-StreamJoin Integration Test to TTD (#7752)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 3d1d651  MINOR: Convert Stream-StreamJoin Integration Test to TTD (#7752)
3d1d651 is described below

commit 3d1d651718ddee9614e4b59f5a9c9d365835957c
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Nov 27 16:32:38 2019 -0500

    MINOR: Convert Stream-StreamJoin Integration Test to TTD (#7752)
    
    Convert StreamStreamJoinIntegrationTest to use the TopologyTestDriver (TTD) for more stable testing.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>
---
 .../StreamStreamJoinIntegrationTest.java           | 384 ++++++++++-----------
 1 file changed, 192 insertions(+), 192 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index 95c231e..4094fde 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.integration;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
-import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -28,6 +27,7 @@ import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.StreamJoined;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
 import org.junit.Before;
@@ -101,77 +101,77 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
     public void testInner() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
 
-        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+        final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
             null,
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)),
             null,
             null,
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-a", null, 9L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-b", null, 9L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)),
             null,
             null,
             null,
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-d", null, 14L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
         );
 
         leftStream.join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
 
-        runTest(expectedResult);
+        runTestWithDriver(expectedResult);
     }
 
     @Test
     public void testInnerRepartitioned() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-repartitioned");
 
-        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+        final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
             null,
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)),
             null,
             null,
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-a", null, 9L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-b", null, 9L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)),
             null,
             null,
             null,
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-d", null, 14L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
         );
 
         leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -179,84 +179,84 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                                  .selectKey(MockMapper.selectKeyKeyValueMapper()),
                        valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
 
-        runTest(expectedResult);
+        runTestWithDriver(expectedResult);
     }
 
     @Test
     public void testLeft() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
 
-        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+        final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)),
             null,
             null,
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-a", null, 9L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-b", null, 9L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)),
             null,
             null,
             null,
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-d", null, 14L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
         );
 
         leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
 
-        runTest(expectedResult);
+        runTestWithDriver(expectedResult);
     }
 
     @Test
     public void testLeftRepartitioned() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-repartitioned");
 
-        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+        final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)),
             null,
             null,
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-a", null, 9L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-b", null, 9L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)),
             null,
             null,
             null,
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-d", null, 14L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
         );
 
         leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -264,84 +264,84 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                                      .selectKey(MockMapper.selectKeyKeyValueMapper()),
                         valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
 
-        runTest(expectedResult);
+        runTestWithDriver(expectedResult);
     }
 
     @Test
     public void testOuter() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
 
-        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+        final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)),
             null,
             null,
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-a", null, 9L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-b", null, 9L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)),
             null,
             null,
             null,
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-d", null, 14L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
         );
 
         leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
 
-        runTest(expectedResult);
+        runTestWithDriver(expectedResult);
     }
 
     @Test
     public void testOuterRepartitioned() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
 
-        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+        final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
             null,
             null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)),
             null,
             null,
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-a", null, 9L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-b", null, 9L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)),
             null,
             null,
             null,
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-d", null, 14L)),
             Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
         );
 
         leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -349,96 +349,96 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
                                 .selectKey(MockMapper.selectKeyKeyValueMapper()),
                         valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
 
-        runTest(expectedResult);
+        runTestWithDriver(expectedResult);
     }
 
     @Test
     public void testMultiInner() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-multi-inner");
 
-        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
-            null,
-            null,
-            null,
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
-            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
-            Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-a", 6L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-a", 6L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-b", 6L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-b", 6L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-b", 6L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
-            null,
-            null,
-            Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-a", 9L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-b", 9L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-a", 9L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-b", 9L)),
-            Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-a", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-b", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-a", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-b", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-a", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-b", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-c", 10L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
-            null,
-            null,
-            null,
-            Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-a", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-b", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-c", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-a", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-b", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-c", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-a", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-b", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-c", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-d", 14L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-d", 14L)),
-            Arrays.asList(
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-a", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-b", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-c", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-d", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-a", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-b", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-c", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-d", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-a", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-b", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-c", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-d", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-a", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-b", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-c", 15L),
-                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
+        final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)),
+            Arrays.asList(
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-b-a", null, 6L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-b-a", null, 6L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-a-b", null, 6L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-a-b", null, 6L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-b-b", null, 6L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)),
+            null,
+            null,
+            Arrays.asList(
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-a-a", null, 9L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-a-b", null, 9L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-b-a", null, 9L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-b-b", null, 9L)),
+            Arrays.asList(
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-c-a", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-c-b", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-c-a", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-c-b", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-c-a", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-c-b", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-a-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-a-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-b-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-b-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-a-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-b-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-c-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-c-c", null, 10L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)),
+            null,
+            null,
+            null,
+            Arrays.asList(
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-d-a", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-d-b", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-d-c", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-d-a", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-d-b", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-d-c", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-d-a", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-d-b", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-d-c", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-a-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-a-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-b-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-b-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-a-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-b-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-c-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-c-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-c-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "A-d-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "B-d-d", null, 14L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "C-d-d", null, 14L)),
+            Arrays.asList(
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-a-a", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-a-b", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-a-c", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-a-d", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-b-a", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-b-b", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-b-c", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-b-d", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-c-a", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-c-b", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-c-c", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-c-d", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d-a", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d-b", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d-c", null, 15L),
+                new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L))
         );
 
         leftStream.join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10)))
                 .join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
 
-        runTest(expectedResult);
+        runTestWithDriver(expectedResult);
     }
 }