You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/12/04 21:09:36 UTC
[kafka] branch 2.4 updated: MINOR: Convert Stream-StreamJoin
Integration Test to TTD (#7752)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 3d1d651 MINOR: Convert Stream-StreamJoin Integration Test to TTD (#7752)
3d1d651 is described below
commit 3d1d651718ddee9614e4b59f5a9c9d365835957c
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Wed Nov 27 16:32:38 2019 -0500
MINOR: Convert Stream-StreamJoin Integration Test to TTD (#7752)
Convert StreamStreamJoinIntegrationTest to TTD for more stable testing.
Reviewers: Matthias J. Sax <mj...@apache.org>
---
.../StreamStreamJoinIntegrationTest.java | 384 ++++++++++-----------
1 file changed, 192 insertions(+), 192 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index 95c231e..4094fde 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
-import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -28,6 +27,7 @@ import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.junit.Before;
@@ -101,77 +101,77 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testInner() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
- final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)),
null,
null,
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-a", null, 9L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-b", null, 9L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)),
null,
null,
null,
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-d", null, 14L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
);
leftStream.join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
- runTest(expectedResult);
+ runTestWithDriver(expectedResult);
}
@Test
public void testInnerRepartitioned() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-repartitioned");
- final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)),
null,
null,
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-a", null, 9L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-b", null, 9L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)),
null,
null,
null,
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-d", null, 14L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
);
leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -179,84 +179,84 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
.selectKey(MockMapper.selectKeyKeyValueMapper()),
valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
- runTest(expectedResult);
+ runTestWithDriver(expectedResult);
}
@Test
public void testLeft() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
- final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
null,
null,
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)),
null,
null,
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-a", null, 9L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-b", null, 9L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)),
null,
null,
null,
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-d", null, 14L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
);
leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
- runTest(expectedResult);
+ runTestWithDriver(expectedResult);
}
@Test
public void testLeftRepartitioned() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-repartitioned");
- final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
null,
null,
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)),
null,
null,
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-a", null, 9L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-b", null, 9L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)),
null,
null,
null,
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-d", null, 14L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
);
leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -264,84 +264,84 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
.selectKey(MockMapper.selectKeyKeyValueMapper()),
valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
- runTest(expectedResult);
+ runTestWithDriver(expectedResult);
}
@Test
public void testOuter() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
- final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
null,
null,
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)),
null,
null,
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-a", null, 9L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-b", null, 9L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)),
null,
null,
null,
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-d", null, 14L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
);
leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
- runTest(expectedResult);
+ runTestWithDriver(expectedResult);
}
@Test
public void testOuterRepartitioned() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
- final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
null,
null,
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-b", null, 6L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)),
null,
null,
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-a", null, 9L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-b", null, 9L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)),
null,
null,
null,
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-d", null, 14L)),
Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-a", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-b", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-c", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 15L))
);
leftStream.map(MockMapper.noOpKeyValueMapper())
@@ -349,96 +349,96 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
.selectKey(MockMapper.selectKeyKeyValueMapper()),
valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
- runTest(expectedResult);
+ runTestWithDriver(expectedResult);
}
@Test
public void testMultiInner() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-multi-inner");
- final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
- Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
- Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-a", 6L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-a", 6L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-b", 6L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-b", 6L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-b", 6L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
- null,
- null,
- Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-a", 9L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-b", 9L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-a", 9L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-b", 9L)),
- Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-a", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-b", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-a", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-b", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-a", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-b", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-c", 10L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
- null,
- null,
- null,
- Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-a", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-b", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-c", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-a", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-b", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-c", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-a", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-b", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-c", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-d", 14L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-d", 14L)),
- Arrays.asList(
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-a", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-b", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-c", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-d", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-a", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-b", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-c", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-d", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-a", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-b", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-c", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-d", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-a", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-b", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-c", 15L),
- new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
+ final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)),
+ Arrays.asList(
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-b-a", null, 6L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-b-a", null, 6L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-a-b", null, 6L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-a-b", null, 6L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-b-b", null, 6L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)),
+ null,
+ null,
+ Arrays.asList(
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-a-a", null, 9L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-a-b", null, 9L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-b-a", null, 9L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-b-b", null, 9L)),
+ Arrays.asList(
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-c-a", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-c-b", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-c-a", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-c-b", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-c-a", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-c-b", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-a-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-a-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-b-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-b-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-a-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-b-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-c-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-c-c", null, 10L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)),
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-d-a", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-d-b", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-d-c", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-d-a", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-d-b", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-d-c", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-d-a", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-d-b", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-d-c", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-a-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-a-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-b-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-b-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-a-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-b-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-c-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-c-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-c-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "A-d-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "B-d-d", null, 14L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "C-d-d", null, 14L)),
+ Arrays.asList(
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-a-a", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-a-b", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-a-c", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-a-d", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-b-a", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-b-b", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-b-c", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-b-d", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-c-a", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-c-b", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-c-c", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-c-d", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d-a", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d-b", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d-c", null, 15L),
+ new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L))
);
leftStream.join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10)))
.join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
- runTest(expectedResult);
+ runTestWithDriver(expectedResult);
}
}