You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/30 16:46:25 UTC
[kafka] branch trunk updated: KAFKA-6455: Update integration tests to verify result timestamps (#6751)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 77e6e8e KAFKA-6455: Update integration tests to verify result timestamps (#6751)
77e6e8e is described below
commit 77e6e8ec054608a30626271b4952b63294a93c3b
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu May 30 09:46:12 2019 -0700
KAFKA-6455: Update integration tests to verify result timestamps (#6751)
Reviewers: Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>
---
.../streams/kstream/internals/KTableReduce.java | 3 +-
.../integration/AbstractJoinIntegrationTest.java | 68 +--
.../integration/AbstractResetIntegrationTest.java | 36 +-
.../FineGrainedAutoResetIntegrationTest.java | 7 +-
.../integration/GlobalKTableIntegrationTest.java | 113 +++--
.../KStreamAggregationDedupIntegrationTest.java | 63 ++-
.../KStreamAggregationIntegrationTest.java | 227 +++++-----
.../StreamStreamJoinIntegrationTest.java | 349 ++++++++++-----
.../StreamTableJoinIntegrationTest.java | 72 ++--
.../integration/TableTableJoinIntegrationTest.java | 479 ++++++++++++---------
.../integration/utils/IntegrationTestUtils.java | 91 +++-
.../apache/kafka/test/MockProcessorSupplier.java | 6 +-
12 files changed, 922 insertions(+), 592 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 1b1a2bf..171912f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -80,7 +80,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key);
final V oldAgg = getValueOrNull(oldAggAndTimestamp);
final V intermediateAgg;
- long newTimestamp = context().timestamp();
+ long newTimestamp;
// first try to remove the old value
if (value.oldValue != null && oldAgg != null) {
@@ -88,6 +88,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
} else {
intermediateAgg = oldAgg;
+ newTimestamp = context().timestamp();
}
// then try to add the new value
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 7fa108d..daea1d7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -35,6 +36,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
@@ -51,6 +53,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -86,7 +89,7 @@ public abstract class AbstractJoinIntegrationTest {
static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
static final String OUTPUT_TOPIC = "outputTopic";
- private final long anyUniqueKey = 0L;
+ static final long ANY_UNIQUE_KEY = 0L;
private final static Properties PRODUCER_CONFIG = new Properties();
private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
@@ -163,13 +166,13 @@ public abstract class AbstractJoinIntegrationTest {
CLUSTER.deleteAllTopicsAndWait(120000);
}
- private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
- final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L);
- assertThat(result, is(expectedResult));
+ private void checkResult(final String outputTopic, final List<KeyValueTimestamp<Long, String>> expectedResult) throws InterruptedException {
+ IntegrationTestUtils.verifyKeyValueTimestamps(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult);
}
- private void checkResult(final String outputTopic, final String expectedFinalResult, final int expectedTotalNumRecords) throws InterruptedException {
- final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedTotalNumRecords, 30 * 1000L);
+ private void checkResult(final String outputTopic, final KeyValueTimestamp<Long, String> expectedFinalResult, final int expectedTotalNumRecords) throws InterruptedException {
+ final List<KeyValueTimestamp<Long, String>> result =
+ IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedTotalNumRecords, 30 * 1000L);
assertThat(result.get(result.size() - 1), is(expectedFinalResult));
}
@@ -177,7 +180,7 @@ public abstract class AbstractJoinIntegrationTest {
* Runs the actual test. Checks the result after each input record to ensure fixed processing order.
* If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
*/
- void runTest(final List<List<String>> expectedResult) throws Exception {
+ void runTest(final List<List<KeyValueTimestamp<Long, String>>> expectedResult) throws Exception {
runTest(expectedResult, null);
}
@@ -186,28 +189,34 @@ public abstract class AbstractJoinIntegrationTest {
* Runs the actual test. Checks the result after each input record to ensure fixed processing order.
* If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
*/
- void runTest(final List<List<String>> expectedResult, final String storeName) throws Exception {
+ void runTest(final List<List<KeyValueTimestamp<Long, String>>> expectedResult, final String storeName) throws Exception {
assert expectedResult.size() == input.size();
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
- String expectedFinalResult = null;
+ KeyValueTimestamp<Long, String> expectedFinalResult = null;
try {
streams.start();
- long ts = System.currentTimeMillis();
+ final long firstTimestamp = System.currentTimeMillis();
+ long ts = firstTimestamp;
- final Iterator<List<String>> resultIterator = expectedResult.iterator();
+ final Iterator<List<KeyValueTimestamp<Long, String>>> resultIterator = expectedResult.iterator();
for (final Input<String> singleInput : input) {
producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
- final List<String> expected = resultIterator.next();
+ final List<KeyValueTimestamp<Long, String>> expected = resultIterator.next();
if (expected != null) {
- checkResult(OUTPUT_TOPIC, expected);
- expectedFinalResult = expected.get(expected.size() - 1);
+ final List<KeyValueTimestamp<Long, String>> updatedExpected = new LinkedList<>();
+ for (final KeyValueTimestamp<Long, String> record : expected) {
+ updatedExpected.add(new KeyValueTimestamp<>(record.key(), record.value(), firstTimestamp + record.timestamp()));
+ }
+
+ checkResult(OUTPUT_TOPIC, updatedExpected);
+ expectedFinalResult = updatedExpected.get(expected.size() - 1);
}
}
@@ -222,21 +231,22 @@ public abstract class AbstractJoinIntegrationTest {
/*
* Runs the actual test. Checks the final result only after expected number of records have been consumed.
*/
- void runTest(final String expectedFinalResult) throws Exception {
+ void runTest(final KeyValueTimestamp<Long, String> expectedFinalResult) throws Exception {
runTest(expectedFinalResult, null);
}
/*
* Runs the actual test. Checks the final result only after expected number of records have been consumed.
*/
- void runTest(final String expectedFinalResult, final String storeName) throws Exception {
+ void runTest(final KeyValueTimestamp<Long, String> expectedFinalResult, final String storeName) throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
try {
streams.start();
- long ts = System.currentTimeMillis();
+ final long firstTimestamp = System.currentTimeMillis();
+ long ts = firstTimestamp;
for (final Input<String> singleInput : input) {
producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
@@ -244,10 +254,15 @@ public abstract class AbstractJoinIntegrationTest {
TestUtils.waitForCondition(() -> finalResultReached.get(), "Never received expected final result.");
- checkResult(OUTPUT_TOPIC, expectedFinalResult, numRecordsExpected);
+ final KeyValueTimestamp<Long, String> updatedExpectedFinalResult =
+ new KeyValueTimestamp<>(
+ expectedFinalResult.key(),
+ expectedFinalResult.value(),
+ firstTimestamp + expectedFinalResult.timestamp());
+ checkResult(OUTPUT_TOPIC, updatedExpectedFinalResult, numRecordsExpected);
if (storeName != null) {
- checkQueryableStore(storeName, expectedFinalResult);
+ checkQueryableStore(storeName, updatedExpectedFinalResult);
}
} finally {
streams.close();
@@ -257,15 +272,16 @@ public abstract class AbstractJoinIntegrationTest {
/*
* Checks the embedded queryable state store snapshot
*/
- private void checkQueryableStore(final String queryableName, final String expectedFinalResult) {
- final ReadOnlyKeyValueStore<Long, String> store = streams.store(queryableName, QueryableStoreTypes.keyValueStore());
+ private void checkQueryableStore(final String queryableName, final KeyValueTimestamp<Long, String> expectedFinalResult) {
+ final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> store = streams.store(queryableName, QueryableStoreTypes.timestampedKeyValueStore());
- final KeyValueIterator<Long, String> all = store.all();
- final KeyValue<Long, String> onlyEntry = all.next();
+ final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all();
+ final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next();
try {
- assertThat(onlyEntry.key, is(anyUniqueKey));
- assertThat(onlyEntry.value, is(expectedFinalResult));
+ assertThat(onlyEntry.key, is(expectedFinalResult.key()));
+ assertThat(onlyEntry.value.value(), is(expectedFinalResult.value()));
+ assertThat(onlyEntry.value.timestamp(), is(expectedFinalResult.timestamp()));
assertThat(all.hasNext(), is(false));
} finally {
all.close();
@@ -278,7 +294,7 @@ public abstract class AbstractJoinIntegrationTest {
Input(final String topic, final V value) {
this.topic = topic;
- record = KeyValue.pair(anyUniqueKey, value);
+ record = KeyValue.pair(ANY_UNIQUE_KEY, value);
}
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index c9ae1bb..000f299 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.integration;
-import java.time.Duration;
+import kafka.tools.StreamsResetter;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
@@ -39,10 +39,8 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
@@ -56,6 +54,7 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.text.SimpleDateFormat;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
@@ -65,8 +64,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
-import kafka.tools.StreamsResetter;
-
import static java.time.Duration.ofMillis;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -223,7 +220,7 @@ public abstract class AbstractResetIntegrationTest {
}
}
- void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception {
+ void shouldNotAllowToResetWhileStreamsIsRunning() {
appID = testId + "-not-reset-during-runtime";
final String[] parameters = new String[] {
"--application-id", appID,
@@ -390,7 +387,6 @@ public abstract class AbstractResetIntegrationTest {
final File resetFile = File.createTempFile("reset", ".csv");
try (final BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
writer.write(INPUT_TOPIC + ",0,1");
- writer.close();
}
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
@@ -434,7 +430,6 @@ public abstract class AbstractResetIntegrationTest {
final File resetFile = File.createTempFile("reset", ".csv");
try (final BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
writer.write(INPUT_TOPIC + ",0,1");
- writer.close();
}
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
@@ -482,7 +477,6 @@ public abstract class AbstractResetIntegrationTest {
final File resetFile = File.createTempFile("reset", ".csv");
try (final BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
writer.write(INPUT_TOPIC + ",0,1");
- writer.close();
}
streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
@@ -514,12 +508,7 @@ public abstract class AbstractResetIntegrationTest {
final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
// use map to trigger internal re-partitioning before groupByKey
- input.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
- @Override
- public KeyValue<Long, String> apply(final Long key, final String value) {
- return new KeyValue<>(key, value);
- }
- })
+ input.map(KeyValue::new)
.groupByKey()
.count()
.toStream()
@@ -530,12 +519,7 @@ public abstract class AbstractResetIntegrationTest {
.windowedBy(TimeWindows.of(ofMillis(35)).advanceBy(ofMillis(10)))
.count()
.toStream()
- .map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() {
- @Override
- public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) {
- return new KeyValue<>(key.window().start() + key.window().end(), value);
- }
- })
+ .map((key, value) -> new KeyValue<>(key.window().start() + key.window().end(), value))
.to(outputTopic2, Produced.with(Serdes.Long(), Serdes.Long()));
return builder.build();
@@ -547,12 +531,8 @@ public abstract class AbstractResetIntegrationTest {
final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
// use map to trigger internal re-partitioning before groupByKey
- input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() {
- @Override
- public KeyValue<Long, Long> apply(final Long key, final String value) {
- return new KeyValue<>(key, key);
- }
- }).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
+ input.map((key, value) -> new KeyValue<>(key, key))
+ .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
return builder.build();
}
@@ -590,7 +570,7 @@ public abstract class AbstractResetIntegrationTest {
parameterList.add(resetScenarioArg);
}
- final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
+ final String[] parameters = parameterList.toArray(new String[0]);
final Properties cleanUpConfig = new Properties();
cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 87d6a16..45ea71c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.integration;
+import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -55,8 +56,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
-import kafka.utils.MockTime;
-
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -191,8 +190,8 @@ public class FineGrainedAutoResetIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
- final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d" + topicSuffix), Consumed.<String, String>with(Topology.AutoOffsetReset.EARLIEST));
- final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix), Consumed.<String, String>with(Topology.AutoOffsetReset.LATEST));
+ final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d" + topicSuffix), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
+ final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix), Consumed.with(Topology.AutoOffsetReset.LATEST));
final KStream<String, String> namedTopicsStream = builder.stream(Arrays.asList(topicY, topicZ));
pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index d3e0d24..0a9148d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -39,7 +38,9 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -68,7 +69,6 @@ public class GlobalKTableIntegrationTest {
private final KeyValueMapper<String, Long, Long> keyMapper = (key, value) -> value;
private final ValueJoiner<Long, String, String> joiner = (value1, value2) -> value1 + "+" + value2;
private final String globalStore = "globalStore";
- private final Map<String, String> results = new HashMap<>();
private StreamsBuilder builder;
private Properties streamsConfiguration;
private KafkaStreams kafkaStreams;
@@ -76,7 +76,7 @@ public class GlobalKTableIntegrationTest {
private String streamTopic;
private GlobalKTable<Long, String> globalTable;
private KStream<String, Long> stream;
- private ForeachAction<String, String> foreachAction;
+ private MockProcessorSupplier<String, String> supplier;
@Before
public void before() throws Exception {
@@ -96,7 +96,7 @@ public class GlobalKTableIntegrationTest {
.withValueSerde(Serdes.String()));
final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
stream = builder.stream(streamTopic, stringLongConsumed);
- foreachAction = results::put;
+ supplier = new MockProcessorSupplier<>();
}
@After
@@ -110,24 +110,34 @@ public class GlobalKTableIntegrationTest {
@Test
public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
final KStream<String, String> streamTableJoin = stream.leftJoin(globalTable, keyMapper, joiner);
- streamTableJoin.foreach(foreachAction);
+ streamTableJoin.process(supplier);
produceInitialGlobalTableValues();
startStreams();
+ long firstTimestamp = mockTime.milliseconds();
produceTopicValues(streamTopic);
- final Map<String, String> expected = new HashMap<>();
- expected.put("a", "1+A");
- expected.put("b", "2+B");
- expected.put("c", "3+C");
- expected.put("d", "4+D");
- expected.put("e", "5+null");
+ final Map<String, ValueAndTimestamp<String>> expected = new HashMap<>();
+ expected.put("a", ValueAndTimestamp.make("1+A", firstTimestamp));
+ expected.put("b", ValueAndTimestamp.make("2+B", firstTimestamp + 1L));
+ expected.put("c", ValueAndTimestamp.make("3+C", firstTimestamp + 2L));
+ expected.put("d", ValueAndTimestamp.make("4+D", firstTimestamp + 3L));
+ expected.put("e", ValueAndTimestamp.make("5+null", firstTimestamp + 4L));
TestUtils.waitForCondition(
- () -> results.equals(expected),
+ () -> {
+ if (supplier.capturedProcessorsCount() < 2) {
+ return false;
+ }
+ final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
+ result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
+ result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
+ return result.equals(expected);
+ },
30000L,
"waiting for initial values");
+ firstTimestamp = mockTime.milliseconds();
produceGlobalTableValues();
final ReadOnlyKeyValueStore<Long, String> replicatedStore =
@@ -138,16 +148,29 @@ public class GlobalKTableIntegrationTest {
30000,
"waiting for data in replicated store");
+ final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> replicatedStoreWithTimestamp =
+ kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore());
+ assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L)));
+
+ firstTimestamp = mockTime.milliseconds();
produceTopicValues(streamTopic);
- expected.put("a", "1+F");
- expected.put("b", "2+G");
- expected.put("c", "3+H");
- expected.put("d", "4+I");
- expected.put("e", "5+J");
+ expected.put("a", ValueAndTimestamp.make("1+F", firstTimestamp));
+ expected.put("b", ValueAndTimestamp.make("2+G", firstTimestamp + 1L));
+ expected.put("c", ValueAndTimestamp.make("3+H", firstTimestamp + 2L));
+ expected.put("d", ValueAndTimestamp.make("4+I", firstTimestamp + 3L));
+ expected.put("e", ValueAndTimestamp.make("5+J", firstTimestamp + 4L));
TestUtils.waitForCondition(
- () -> results.equals(expected),
+ () -> {
+ if (supplier.capturedProcessorsCount() < 2) {
+ return false;
+ }
+ final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
+ result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
+ result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
+ return result.equals(expected);
+ },
30000L,
"waiting for final values");
}
@@ -155,23 +178,33 @@ public class GlobalKTableIntegrationTest {
@Test
public void shouldKStreamGlobalKTableJoin() throws Exception {
final KStream<String, String> streamTableJoin = stream.join(globalTable, keyMapper, joiner);
- streamTableJoin.foreach(foreachAction);
+ streamTableJoin.process(supplier);
produceInitialGlobalTableValues();
startStreams();
+ long firstTimestamp = mockTime.milliseconds();
produceTopicValues(streamTopic);
- final Map<String, String> expected = new HashMap<>();
- expected.put("a", "1+A");
- expected.put("b", "2+B");
- expected.put("c", "3+C");
- expected.put("d", "4+D");
+ final Map<String, ValueAndTimestamp<String>> expected = new HashMap<>();
+ expected.put("a", ValueAndTimestamp.make("1+A", firstTimestamp));
+ expected.put("b", ValueAndTimestamp.make("2+B", firstTimestamp + 1L));
+ expected.put("c", ValueAndTimestamp.make("3+C", firstTimestamp + 2L));
+ expected.put("d", ValueAndTimestamp.make("4+D", firstTimestamp + 3L));
TestUtils.waitForCondition(
- () -> results.equals(expected),
+ () -> {
+ if (supplier.capturedProcessorsCount() < 2) {
+ return false;
+ }
+ final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
+ result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
+ result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
+ return result.equals(expected);
+ },
30000L,
"waiting for initial values");
+ firstTimestamp = mockTime.milliseconds();
produceGlobalTableValues();
final ReadOnlyKeyValueStore<Long, String> replicatedStore =
@@ -182,16 +215,29 @@ public class GlobalKTableIntegrationTest {
30000,
"waiting for data in replicated store");
+ final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> replicatedStoreWithTimestamp =
+ kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore());
+ assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L)));
+
+ firstTimestamp = mockTime.milliseconds();
produceTopicValues(streamTopic);
- expected.put("a", "1+F");
- expected.put("b", "2+G");
- expected.put("c", "3+H");
- expected.put("d", "4+I");
- expected.put("e", "5+J");
+ expected.put("a", ValueAndTimestamp.make("1+F", firstTimestamp));
+ expected.put("b", ValueAndTimestamp.make("2+G", firstTimestamp + 1L));
+ expected.put("c", ValueAndTimestamp.make("3+H", firstTimestamp + 2L));
+ expected.put("d", ValueAndTimestamp.make("4+I", firstTimestamp + 3L));
+ expected.put("e", ValueAndTimestamp.make("5+J", firstTimestamp + 4L));
TestUtils.waitForCondition(
- () -> results.equals(expected),
+ () -> {
+ if (supplier.capturedProcessorsCount() < 2) {
+ return false;
+ }
+ final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
+ result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
+ result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
+ return result.equals(expected);
+ },
30000L,
"waiting for final values");
}
@@ -209,11 +255,16 @@ public class GlobalKTableIntegrationTest {
startStreams();
ReadOnlyKeyValueStore<Long, String> store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
assertThat(store.approximateNumEntries(), equalTo(4L));
+ ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> timestampedStore =
+ kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore());
+ assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
kafkaStreams.close();
startStreams();
store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
assertThat(store.approximateNumEntries(), equalTo(4L));
+ timestampedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore());
+ assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
}
private void createTopics() throws Exception {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 14346cf..61f6356 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -66,8 +67,7 @@ public class KStreamAggregationDedupIntegrationTest {
private static final long COMMIT_INTERVAL_MS = 300L;
@ClassRule
- public static final EmbeddedKafkaCluster CLUSTER =
- new EmbeddedKafkaCluster(NUM_BROKERS);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private final MockTime mockTime = CLUSTER.time;
private static volatile AtomicInteger testNo = new AtomicInteger(0);
@@ -119,17 +119,18 @@ public class KStreamAggregationDedupIntegrationTest {
startStreams();
- produceMessages(System.currentTimeMillis());
+ final long timestamp = System.currentTimeMillis();
+ produceMessages(timestamp);
validateReceivedMessages(
new StringDeserializer(),
new StringDeserializer(),
Arrays.asList(
- KeyValue.pair("A", "A:A"),
- KeyValue.pair("B", "B:B"),
- KeyValue.pair("C", "C:C"),
- KeyValue.pair("D", "D:D"),
- KeyValue.pair("E", "E:E")));
+ new KeyValueTimestamp<>("A", "A:A", timestamp),
+ new KeyValueTimestamp<>("B", "B:B", timestamp),
+ new KeyValueTimestamp<>("C", "C:C", timestamp),
+ new KeyValueTimestamp<>("D", "D:D", timestamp),
+ new KeyValueTimestamp<>("E", "E:E", timestamp)));
}
@Test
@@ -155,16 +156,16 @@ public class KStreamAggregationDedupIntegrationTest {
new StringDeserializer(),
new StringDeserializer(),
Arrays.asList(
- new KeyValue<>("A@" + firstBatchWindow, "A"),
- new KeyValue<>("A@" + secondBatchWindow, "A:A"),
- new KeyValue<>("B@" + firstBatchWindow, "B"),
- new KeyValue<>("B@" + secondBatchWindow, "B:B"),
- new KeyValue<>("C@" + firstBatchWindow, "C"),
- new KeyValue<>("C@" + secondBatchWindow, "C:C"),
- new KeyValue<>("D@" + firstBatchWindow, "D"),
- new KeyValue<>("D@" + secondBatchWindow, "D:D"),
- new KeyValue<>("E@" + firstBatchWindow, "E"),
- new KeyValue<>("E@" + secondBatchWindow, "E:E")
+ new KeyValueTimestamp<>("A@" + firstBatchWindow, "A", firstBatchTimestamp),
+ new KeyValueTimestamp<>("A@" + secondBatchWindow, "A:A", secondBatchTimestamp),
+ new KeyValueTimestamp<>("B@" + firstBatchWindow, "B", firstBatchTimestamp),
+ new KeyValueTimestamp<>("B@" + secondBatchWindow, "B:B", secondBatchTimestamp),
+ new KeyValueTimestamp<>("C@" + firstBatchWindow, "C", firstBatchTimestamp),
+ new KeyValueTimestamp<>("C@" + secondBatchWindow, "C:C", secondBatchTimestamp),
+ new KeyValueTimestamp<>("D@" + firstBatchWindow, "D", firstBatchTimestamp),
+ new KeyValueTimestamp<>("D@" + secondBatchWindow, "D:D", secondBatchTimestamp),
+ new KeyValueTimestamp<>("E@" + firstBatchWindow, "E", firstBatchTimestamp),
+ new KeyValueTimestamp<>("E@" + secondBatchWindow, "E:E", secondBatchTimestamp)
)
);
}
@@ -189,11 +190,11 @@ public class KStreamAggregationDedupIntegrationTest {
new StringDeserializer(),
new LongDeserializer(),
Arrays.asList(
- KeyValue.pair("1@" + window, 2L),
- KeyValue.pair("2@" + window, 2L),
- KeyValue.pair("3@" + window, 2L),
- KeyValue.pair("4@" + window, 2L),
- KeyValue.pair("5@" + window, 2L)
+ new KeyValueTimestamp<>("1@" + window, 2L, timestamp),
+ new KeyValueTimestamp<>("2@" + window, 2L, timestamp),
+ new KeyValueTimestamp<>("3@" + window, 2L, timestamp),
+ new KeyValueTimestamp<>("4@" + window, 2L, timestamp),
+ new KeyValueTimestamp<>("5@" + window, 2L, timestamp)
)
);
}
@@ -232,20 +233,16 @@ public class KStreamAggregationDedupIntegrationTest {
private <K, V> void validateReceivedMessages(final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer,
- final List<KeyValue<K, V>> expectedRecords)
+ final List<KeyValueTimestamp<K, V>> expectedRecords)
throws InterruptedException {
final Properties consumerProperties = new Properties();
- consumerProperties
- .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
- consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" +
- testNo);
+ consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- keyDeserializer.getClass().getName());
- consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- valueDeserializer.getClass().getName());
+ consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+ consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
- IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ IntegrationTestUtils.waitUntilFinalKeyValueTimestampRecordsReceived(
consumerProperties,
outputTopic,
expectedRecords);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 2f06af7..dcf72a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -98,8 +99,7 @@ public class KStreamAggregationIntegrationTest {
private static final int NUM_BROKERS = 1;
@ClassRule
- public static final EmbeddedKafkaCluster CLUSTER =
- new EmbeddedKafkaCluster(NUM_BROKERS);
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private static volatile AtomicInteger testNo = new AtomicInteger(0);
private final MockTime mockTime = CLUSTER.time;
@@ -122,8 +122,7 @@ public class KStreamAggregationIntegrationTest {
streamsConfiguration = new Properties();
final String applicationId = "kgrouped-stream-test-" + testNo.incrementAndGet();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
- streamsConfiguration
- .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
@@ -160,30 +159,35 @@ public class KStreamAggregationIntegrationTest {
produceMessages(mockTime.milliseconds());
- final List<KeyValue<String, String>> results = receiveMessages(
+ final List<KeyValueTimestamp<String, String>> results = receiveMessages(
new StringDeserializer(),
new StringDeserializer(),
10);
results.sort(KStreamAggregationIntegrationTest::compare);
- assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
- KeyValue.pair("A", "A:A"),
- KeyValue.pair("B", "B"),
- KeyValue.pair("B", "B:B"),
- KeyValue.pair("C", "C"),
- KeyValue.pair("C", "C:C"),
- KeyValue.pair("D", "D"),
- KeyValue.pair("D", "D:D"),
- KeyValue.pair("E", "E"),
- KeyValue.pair("E", "E:E"))));
+ assertThat(results, is(Arrays.asList(
+ new KeyValueTimestamp("A", "A", mockTime.milliseconds()),
+ new KeyValueTimestamp("A", "A:A", mockTime.milliseconds()),
+ new KeyValueTimestamp("B", "B", mockTime.milliseconds()),
+ new KeyValueTimestamp("B", "B:B", mockTime.milliseconds()),
+ new KeyValueTimestamp("C", "C", mockTime.milliseconds()),
+ new KeyValueTimestamp("C", "C:C", mockTime.milliseconds()),
+ new KeyValueTimestamp("D", "D", mockTime.milliseconds()),
+ new KeyValueTimestamp("D", "D:D", mockTime.milliseconds()),
+ new KeyValueTimestamp("E", "E", mockTime.milliseconds()),
+ new KeyValueTimestamp("E", "E:E", mockTime.milliseconds()))));
}
- private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1,
- final KeyValue<K, V> o2) {
- final int keyComparison = o1.key.compareTo(o2.key);
+ private static <K extends Comparable, V extends Comparable> int compare(final KeyValueTimestamp<K, V> o1,
+ final KeyValueTimestamp<K, V> o2) {
+ final int keyComparison = o1.key().compareTo(o2.key());
if (keyComparison == 0) {
- return o1.value.compareTo(o2.value);
+ final int valueComparison = o1.value().compareTo(o2.value());
+ if (valueComparison == 0) {
+ return Long.compare(o1.timestamp(), o2.timestamp());
+ }
+ return valueComparison;
}
return keyComparison;
}
@@ -206,7 +210,7 @@ public class KStreamAggregationIntegrationTest {
startStreams();
- final List<KeyValue<Windowed<String>, String>> windowedOutput = receiveMessages(
+ final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput = receiveMessages(
new TimeWindowedDeserializer<>(),
new StringDeserializer(),
String.class,
@@ -218,44 +222,45 @@ public class KStreamAggregationIntegrationTest {
new StringDeserializer(),
String.class,
15,
- false);
+ true);
- final Comparator<KeyValue<Windowed<String>, String>>
- comparator =
- Comparator.comparing((KeyValue<Windowed<String>, String> o) -> o.key.key()).thenComparing(o -> o.value);
+ final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+ Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+ .thenComparing(KeyValueTimestamp::value);
windowedOutput.sort(comparator);
final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
- final List<KeyValue<Windowed<String>, String>> expectResult = Arrays.asList(
- new KeyValue<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A"),
- new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A"),
- new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A"),
- new KeyValue<>(new Windowed<>("B", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B"),
- new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B"),
- new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B"),
- new KeyValue<>(new Windowed<>("C", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C"),
- new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C"),
- new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C"),
- new KeyValue<>(new Windowed<>("D", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D"),
- new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D"),
- new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D"),
- new KeyValue<>(new Windowed<>("E", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E"),
- new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E"),
- new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E")
+ final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList(
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B", firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C", firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D", firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E", firstBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E", secondBatchTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E", secondBatchTimestamp)
);
assertThat(windowedOutput, is(expectResult));
final Set<String> expectResultString = new HashSet<>(expectResult.size());
- for (final KeyValue<Windowed<String>, String> eachRecord: expectResult) {
- expectResultString.add(eachRecord.toString());
+ for (final KeyValueTimestamp<Windowed<String>, String> eachRecord: expectResult) {
+ expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", "
+ + eachRecord.key() + ", " + eachRecord.value());
}
// check every message is contained in the expect result
final String[] allRecords = resultFromConsoleConsumer.split("\n");
for (final String record: allRecords) {
- assertTrue(expectResultString.contains("KeyValue(" + record + ")"));
+ assertTrue(expectResultString.contains(record));
}
}
@@ -273,7 +278,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(mockTime.milliseconds());
- final List<KeyValue<String, Integer>> results = receiveMessages(
+ final List<KeyValueTimestamp<String, Integer>> results = receiveMessages(
new StringDeserializer(),
new IntegerDeserializer(),
10);
@@ -281,16 +286,16 @@ public class KStreamAggregationIntegrationTest {
results.sort(KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(
- KeyValue.pair("A", 1),
- KeyValue.pair("A", 2),
- KeyValue.pair("B", 1),
- KeyValue.pair("B", 2),
- KeyValue.pair("C", 1),
- KeyValue.pair("C", 2),
- KeyValue.pair("D", 1),
- KeyValue.pair("D", 2),
- KeyValue.pair("E", 1),
- KeyValue.pair("E", 2)
+ new KeyValueTimestamp("A", 1, mockTime.milliseconds()),
+ new KeyValueTimestamp("A", 2, mockTime.milliseconds()),
+ new KeyValueTimestamp("B", 1, mockTime.milliseconds()),
+ new KeyValueTimestamp("B", 2, mockTime.milliseconds()),
+ new KeyValueTimestamp("C", 1, mockTime.milliseconds()),
+ new KeyValueTimestamp("C", 2, mockTime.milliseconds()),
+ new KeyValueTimestamp("D", 1, mockTime.milliseconds()),
+ new KeyValueTimestamp("D", 2, mockTime.milliseconds()),
+ new KeyValueTimestamp("E", 1, mockTime.milliseconds()),
+ new KeyValueTimestamp("E", 2, mockTime.milliseconds())
)));
}
@@ -315,7 +320,7 @@ public class KStreamAggregationIntegrationTest {
startStreams();
- final List<KeyValue<Windowed<String>, KeyValue<Integer, Long>>> windowedMessages = receiveMessagesWithTimestamp(
+ final List<KeyValueTimestamp<Windowed<String>, Integer>> windowedMessages = receiveMessagesWithTimestamp(
new TimeWindowedDeserializer<>(),
new IntegerDeserializer(),
String.class,
@@ -329,37 +334,36 @@ public class KStreamAggregationIntegrationTest {
15,
true);
- final Comparator<KeyValue<Windowed<String>, KeyValue<Integer, Long>>>
- comparator =
- Comparator.comparing((KeyValue<Windowed<String>, KeyValue<Integer, Long>> o) -> o.key.key()).thenComparingInt(o -> o.value.key);
-
+ final Comparator<KeyValueTimestamp<Windowed<String>, Integer>> comparator =
+ Comparator.comparing((KeyValueTimestamp<Windowed<String>, Integer> o) -> o.key().key())
+ .thenComparingInt(KeyValueTimestamp::value);
windowedMessages.sort(comparator);
final long firstWindow = firstTimestamp / 500 * 500;
final long secondWindow = secondTimestamp / 500 * 500;
- final List<KeyValue<Windowed<String>, KeyValue<Integer, Long>>> expectResult = Arrays.asList(
- new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
- new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
- new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
- new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
- new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
- new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
- new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
- new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
- new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
- new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
- new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
- new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
- new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
- new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
- new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)));
+ final List<KeyValueTimestamp<Windowed<String>, Integer>> expectResult = Arrays.asList(
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
+ new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp));
assertThat(windowedMessages, is(expectResult));
final Set<String> expectResultString = new HashSet<>(expectResult.size());
- for (final KeyValue<Windowed<String>, KeyValue<Integer, Long>> eachRecord: expectResult) {
- expectResultString.add("CreateTime:" + eachRecord.value.value + ", " + eachRecord.key.toString() + ", " + eachRecord.value.key);
+ for (final KeyValueTimestamp<Windowed<String>, Integer> eachRecord: expectResult) {
+ expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", " + eachRecord.key() + ", " + eachRecord.value());
}
// check every message is contained in the expect result
@@ -375,23 +379,23 @@ public class KStreamAggregationIntegrationTest {
produceMessages(mockTime.milliseconds());
- final List<KeyValue<String, Long>> results = receiveMessages(
+ final List<KeyValueTimestamp<String, Long>> results = receiveMessages(
new StringDeserializer(),
new LongDeserializer(),
10);
results.sort(KStreamAggregationIntegrationTest::compare);
assertThat(results, is(Arrays.asList(
- KeyValue.pair("A", 1L),
- KeyValue.pair("A", 2L),
- KeyValue.pair("B", 1L),
- KeyValue.pair("B", 2L),
- KeyValue.pair("C", 1L),
- KeyValue.pair("C", 2L),
- KeyValue.pair("D", 1L),
- KeyValue.pair("D", 2L),
- KeyValue.pair("E", 1L),
- KeyValue.pair("E", 2L)
+ new KeyValueTimestamp("A", 1L, mockTime.milliseconds()),
+ new KeyValueTimestamp("A", 2L, mockTime.milliseconds()),
+ new KeyValueTimestamp("B", 1L, mockTime.milliseconds()),
+ new KeyValueTimestamp("B", 2L, mockTime.milliseconds()),
+ new KeyValueTimestamp("C", 1L, mockTime.milliseconds()),
+ new KeyValueTimestamp("C", 2L, mockTime.milliseconds()),
+ new KeyValueTimestamp("D", 1L, mockTime.milliseconds()),
+ new KeyValueTimestamp("D", 2L, mockTime.milliseconds()),
+ new KeyValueTimestamp("E", 1L, mockTime.milliseconds()),
+ new KeyValueTimestamp("E", 2L, mockTime.milliseconds())
)));
}
@@ -430,7 +434,7 @@ public class KStreamAggregationIntegrationTest {
startStreams();
- final List<KeyValue<String, Long>> results = receiveMessages(
+ final List<KeyValueTimestamp<String, Long>> results = receiveMessages(
new StringDeserializer(),
new LongDeserializer(),
10);
@@ -438,18 +442,17 @@ public class KStreamAggregationIntegrationTest {
final long window = timestamp / 500 * 500;
assertThat(results, is(Arrays.asList(
- KeyValue.pair("1@" + window, 1L),
- KeyValue.pair("1@" + window, 2L),
- KeyValue.pair("2@" + window, 1L),
- KeyValue.pair("2@" + window, 2L),
- KeyValue.pair("3@" + window, 1L),
- KeyValue.pair("3@" + window, 2L),
- KeyValue.pair("4@" + window, 1L),
- KeyValue.pair("4@" + window, 2L),
- KeyValue.pair("5@" + window, 1L),
- KeyValue.pair("5@" + window, 2L)
+ new KeyValueTimestamp("1@" + window, 1L, timestamp),
+ new KeyValueTimestamp("1@" + window, 2L, timestamp),
+ new KeyValueTimestamp("2@" + window, 1L, timestamp),
+ new KeyValueTimestamp("2@" + window, 2L, timestamp),
+ new KeyValueTimestamp("3@" + window, 1L, timestamp),
+ new KeyValueTimestamp("3@" + window, 2L, timestamp),
+ new KeyValueTimestamp("4@" + window, 1L, timestamp),
+ new KeyValueTimestamp("4@" + window, 2L, timestamp),
+ new KeyValueTimestamp("5@" + window, 1L, timestamp),
+ new KeyValueTimestamp("5@" + window, 2L, timestamp)
)));
-
}
@Test
@@ -796,17 +799,17 @@ public class KStreamAggregationIntegrationTest {
kafkaStreams.start();
}
- private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
- final Deserializer<V> valueDeserializer,
- final int numMessages)
+ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer,
+ final int numMessages)
throws InterruptedException {
return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages);
}
- private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
- final Deserializer<V> valueDeserializer,
- final Class innerClass,
- final int numMessages) throws InterruptedException {
+ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer,
+ final Class innerClass,
+ final int numMessages) throws InterruptedException {
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
@@ -817,17 +820,17 @@ public class KStreamAggregationIntegrationTest {
consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
Serdes.serdeFrom(innerClass).getClass().getName());
}
- return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
consumerProperties,
outputTopic,
numMessages,
60 * 1000);
}
- private <K, V> List<KeyValue<K, KeyValue<V, Long>>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
- final Deserializer<V> valueDeserializer,
- final Class innerClass,
- final int numMessages) throws InterruptedException {
+ private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
+ final Deserializer<V> valueDeserializer,
+ final Class innerClass,
+ final int numMessages) throws InterruptedException {
final Properties consumerProperties = new Properties();
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index 646185e..4be14c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.integration;
+import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
@@ -62,22 +63,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testInner() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
- final List<List<String>> expectedResult = Arrays.asList(
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
null,
- Collections.singletonList("A-a"),
- Collections.singletonList("B-a"),
- Arrays.asList("A-b", "B-b"),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
null,
null,
- Arrays.asList("C-a", "C-b"),
- Arrays.asList("A-c", "B-c", "C-c"),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
null,
null,
null,
- Arrays.asList("A-d", "B-d", "C-d"),
- Arrays.asList("D-a", "D-b", "D-c", "D-d")
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftStream.join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
@@ -89,27 +104,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testInnerRepartitioned() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-repartitioned");
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- Collections.singletonList("A-a"),
- Collections.singletonList("B-a"),
- Arrays.asList("A-b", "B-b"),
- null,
- null,
- Arrays.asList("C-a", "C-b"),
- Arrays.asList("A-c", "B-c", "C-c"),
- null,
- null,
- null,
- Arrays.asList("A-d", "B-d", "C-d"),
- Arrays.asList("D-a", "D-b", "D-c", "D-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
- leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
- .join(rightStream.flatMap(MockMapper.<Long, String>noOpFlatKeyValueMapper())
- .selectKey(MockMapper.<Long, String>selectKeyKeyValueMapper()),
+ leftStream.map(MockMapper.noOpKeyValueMapper())
+ .join(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
+ .selectKey(MockMapper.selectKeyKeyValueMapper()),
valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
runTest(expectedResult);
@@ -119,22 +148,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testLeft() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
- final List<List<String>> expectedResult = Arrays.asList(
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
- Collections.singletonList("A-null"),
- Collections.singletonList("A-a"),
- Collections.singletonList("B-a"),
- Arrays.asList("A-b", "B-b"),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
null,
null,
- Arrays.asList("C-a", "C-b"),
- Arrays.asList("A-c", "B-c", "C-c"),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
null,
null,
null,
- Arrays.asList("A-d", "B-d", "C-d"),
- Arrays.asList("D-a", "D-b", "D-c", "D-d")
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
@@ -146,27 +189,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testLeftRepartitioned() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-repartitioned");
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- Collections.singletonList("A-null"),
- Collections.singletonList("A-a"),
- Collections.singletonList("B-a"),
- Arrays.asList("A-b", "B-b"),
- null,
- null,
- Arrays.asList("C-a", "C-b"),
- Arrays.asList("A-c", "B-c", "C-c"),
- null,
- null,
- null,
- Arrays.asList("A-d", "B-d", "C-d"),
- Arrays.asList("D-a", "D-b", "D-c", "D-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
- leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
- .leftJoin(rightStream.flatMap(MockMapper.<Long, String>noOpFlatKeyValueMapper())
- .selectKey(MockMapper.<Long, String>selectKeyKeyValueMapper()),
+ leftStream.map(MockMapper.noOpKeyValueMapper())
+ .leftJoin(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
+ .selectKey(MockMapper.selectKeyKeyValueMapper()),
valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
runTest(expectedResult);
@@ -176,22 +233,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testOuter() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
- final List<List<String>> expectedResult = Arrays.asList(
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
null,
null,
- Collections.singletonList("A-null"),
- Collections.singletonList("A-a"),
- Collections.singletonList("B-a"),
- Arrays.asList("A-b", "B-b"),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
null,
null,
- Arrays.asList("C-a", "C-b"),
- Arrays.asList("A-c", "B-c", "C-c"),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
null,
null,
null,
- Arrays.asList("A-d", "B-d", "C-d"),
- Arrays.asList("D-a", "D-b", "D-c", "D-d")
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
@@ -203,27 +274,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testOuterRepartitioned() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- Collections.singletonList("A-null"),
- Collections.singletonList("A-a"),
- Collections.singletonList("B-a"),
- Arrays.asList("A-b", "B-b"),
- null,
- null,
- Arrays.asList("C-a", "C-b"),
- Arrays.asList("A-c", "B-c", "C-c"),
- null,
- null,
- null,
- Arrays.asList("A-d", "B-d", "C-d"),
- Arrays.asList("D-a", "D-b", "D-c", "D-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
- leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
- .outerJoin(rightStream.flatMap(MockMapper.<Long, String>noOpFlatKeyValueMapper())
- .selectKey(MockMapper.<Long, String>selectKeyKeyValueMapper()),
+ leftStream.map(MockMapper.noOpKeyValueMapper())
+ .outerJoin(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
+ .selectKey(MockMapper.selectKeyKeyValueMapper()),
valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
runTest(expectedResult);
@@ -233,26 +318,84 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testMultiInner() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-multi-inner");
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- Collections.singletonList("A-a-a"),
- Collections.singletonList("B-a-a"),
- Arrays.asList("A-b-a", "B-b-a", "A-a-b", "B-a-b", "A-b-b", "B-b-b"),
- null,
- null,
- Arrays.asList("C-a-a", "C-a-b", "C-b-a", "C-b-b"),
- Arrays.asList("A-c-a", "A-c-b", "B-c-a", "B-c-b", "C-c-a", "C-c-b", "A-a-c", "B-a-c",
- "A-b-c", "B-b-c", "C-a-c", "C-b-c", "A-c-c", "B-c-c", "C-c-c"),
- null,
- null,
- null,
- Arrays.asList("A-d-a", "A-d-b", "A-d-c", "B-d-a", "B-d-b", "B-d-c", "C-d-a", "C-d-b", "C-d-c",
- "A-a-d", "B-a-d", "A-b-d", "B-b-d", "C-a-d", "C-b-d", "A-c-d", "B-c-d", "C-c-d",
- "A-d-d", "B-d-d", "C-d-d"),
- Arrays.asList("D-a-a", "D-a-b", "D-a-c", "D-a-d", "D-b-a", "D-b-b", "D-b-c", "D-b-d", "D-c-a",
- "D-c-b", "D-c-c", "D-c-d", "D-d-a", "D-d-b", "D-d-c", "D-d-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-a", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-a", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-a", 9L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-b", 9L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-a", 9L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-b", 9L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-a", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-b", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-a", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-b", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-a", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-b", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-a", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-b", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-c", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-a", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-b", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-c", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-a", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-b", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-c", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-d", 14L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-a", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-b", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-c", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-d", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-a", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-b", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-c", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-d", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-a", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-b", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-c", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-d", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-a", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-b", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-c", 15L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftStream.join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10)))
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index ca78d02..772c91d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.integration;
import org.apache.kafka.streams.KafkaStreamsWrapper;
+import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -34,8 +35,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import static org.junit.Assert.assertEquals;
-
+import static org.junit.Assert.assertTrue;
/**
* Tests all available joins of Kafka Streams DSL.
@@ -82,30 +82,30 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, "Did not seen thread state transited to PENDING_SHUTDOWN");
streams.close();
- assertEquals(listener.createdToRevokedSeen(), true);
- assertEquals(listener.revokedToPendingShutdownSeen(), true);
+ assertTrue(listener.createdToRevokedSeen());
+ assertTrue(listener.revokedToPendingShutdownSeen());
}
@Test
public void testInner() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- null,
- Collections.singletonList("B-a"),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- Collections.singletonList("D-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
@@ -117,22 +117,22 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
public void testLeft() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- Collections.singletonList("A-null"),
- null,
- Collections.singletonList("B-a"),
- null,
- null,
- null,
- Collections.singletonList("C-null"),
- null,
- null,
- null,
- null,
- null,
- Collections.singletonList("D-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ null,
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 9L)),
+ null,
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index 73d1e3d..2b685f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
@@ -59,8 +60,8 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("right").withLoggingDisabled());
}
- final private String expectedFinalJoinResult = "D-d";
- final private String expectedFinalMultiJoinResult = "D-d-d";
+ final private KeyValueTimestamp<Long, String> expectedFinalJoinResult = new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L);
+ final private KeyValueTimestamp<Long, String> expectedFinalMultiJoinResult = new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L);
final private String storeName = appID + "-store";
private Materialized<Long, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(storeName)
@@ -70,7 +71,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.withLoggingDisabled();
final private class CountingPeek implements ForeachAction<Long, String> {
- final private String expected;
+ final private KeyValueTimestamp<Long, String> expected;
CountingPeek(final boolean multiJoin) {
this.expected = multiJoin ? expectedFinalMultiJoinResult : expectedFinalJoinResult;
@@ -79,7 +80,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
@Override
public void apply(final Long key, final String value) {
numRecordsExpected++;
- if (expected.equals(value)) {
+ if (expected.value().equals(value)) {
final boolean ret = finalResultReached.compareAndSet(false, true);
if (!ret) {
@@ -98,22 +99,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
leftTable.join(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC);
runTest(expectedFinalJoinResult, storeName);
} else {
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- Collections.singletonList("A-a"),
- Collections.singletonList("B-a"),
- Collections.singletonList("B-b"),
- Collections.singletonList((String) null),
- null,
- null,
- Collections.singletonList("C-c"),
- Collections.singletonList((String) null),
- null,
- null,
- null,
- Collections.singletonList("D-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
+ null,
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftTable.join(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
@@ -129,22 +130,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC);
runTest(expectedFinalJoinResult, storeName);
} else {
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- Collections.singletonList("A-null"),
- Collections.singletonList("A-a"),
- Collections.singletonList("B-a"),
- Collections.singletonList("B-b"),
- Collections.singletonList((String) null),
- null,
- Collections.singletonList("C-null"),
- Collections.singletonList("C-c"),
- Collections.singletonList("C-null"),
- Collections.singletonList((String) null),
- null,
- null,
- Collections.singletonList("D-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 9L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 11L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
@@ -160,22 +161,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC);
runTest(expectedFinalJoinResult, storeName);
} else {
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- Collections.singletonList("A-null"),
- Collections.singletonList("A-a"),
- Collections.singletonList("B-a"),
- Collections.singletonList("B-b"),
- Collections.singletonList("null-b"),
- Collections.singletonList((String) null),
- Collections.singletonList("C-null"),
- Collections.singletonList("C-c"),
- Collections.singletonList("C-null"),
- Collections.singletonList((String) null),
- null,
- Collections.singletonList("null-d"),
- Collections.singletonList("D-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b", 7L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 9L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 11L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d", 14L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
);
leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
@@ -197,22 +198,29 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
} else {
// FIXME: the duplicate below for all the multi-joins
// are due to KAFKA-6443, should be updated once it is fixed.
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- Arrays.asList("A-a-a", "A-a-a"),
- Collections.singletonList("B-a-a"),
- Arrays.asList("B-b-b", "B-b-b"),
- Collections.singletonList((String) null),
- null,
- null,
- Arrays.asList("C-c-c", "C-c-c"),
- null,
- null,
- null,
- null,
- Collections.singletonList("D-d-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+ null, // correct would be -> new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)
+ // we don't get correct value, because of self-join of `rightTable`
+ null,
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.join(rightTable, valueJoiner)
@@ -235,22 +243,28 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- Arrays.asList("A-a-a", "A-a-a"),
- Collections.singletonList("B-a-a"),
- Arrays.asList("B-b-b", "B-b-b"),
- Collections.singletonList((String) null),
- null,
- null,
- Arrays.asList("C-c-c", "C-c-c"),
- Collections.singletonList((String) null),
- null,
- null,
- null,
- Collections.singletonList("D-d-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
+ null,
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.join(rightTable, valueJoiner)
@@ -274,22 +288,33 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- Arrays.asList("A-a-a", "A-a-a"),
- Collections.singletonList("B-a-a"),
- Arrays.asList("B-b-b", "B-b-b"),
- Collections.singletonList("null-b"),
- Collections.singletonList((String) null),
- null,
- Arrays.asList("C-c-c", "C-c-c"),
- Arrays.asList((String) null, null),
- null,
- null,
- null,
- Arrays.asList("null-d", "D-d-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b", 7L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
+ null,
+ null,
+ null,
+ Arrays.asList(
+ // incorrect result `null-d` is caused by self-join of `rightTable`
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.join(rightTable, valueJoiner)
@@ -312,22 +337,28 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- Arrays.asList("A-a-a", "A-a-a"),
- Collections.singletonList("B-a-a"),
- Arrays.asList("B-b-b", "B-b-b"),
- Collections.singletonList((String) null),
- null,
- null,
- Arrays.asList("C-c-c", "C-c-c"),
- Collections.singletonList((String) null),
- null,
- null,
- null,
- Collections.singletonList("D-d-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
+ null,
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.leftJoin(rightTable, valueJoiner)
@@ -351,22 +382,32 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
- Collections.singletonList("B-a-a"),
- Arrays.asList("B-b-b", "B-b-b"),
- Collections.singletonList((String) null),
- null,
- null,
- Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
- Arrays.asList("C-null-null", "C-null-null"),
- Collections.singletonList((String) null),
- null,
- null,
- Collections.singletonList("D-d-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
+ null,
+ null,
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.leftJoin(rightTable, valueJoiner)
@@ -390,22 +431,34 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
- Collections.singletonList("B-a-a"),
- Arrays.asList("B-b-b", "B-b-b"),
- Collections.singletonList("null-b"),
- Collections.singletonList((String) null),
- null,
- Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
- Arrays.asList("C-null-null", "C-null-null"),
- Collections.singletonList((String) null),
- null,
- null,
- Arrays.asList("null-d", "D-d-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b", 7L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.leftJoin(rightTable, valueJoiner)
@@ -428,22 +481,30 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- Arrays.asList("A-a-a", "A-a-a"),
- Collections.singletonList("B-a-a"),
- Arrays.asList("B-b-b", "B-b-b"),
- Collections.singletonList("null-b-b"),
- null,
- null,
- Arrays.asList("C-c-c", "C-c-c"),
- Collections.singletonList((String) null),
- null,
- null,
- Arrays.asList("null-d-d", "null-d-d"),
- Collections.singletonList("D-d-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b-b", 7L)),
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.outerJoin(rightTable, valueJoiner)
@@ -467,22 +528,34 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
- Collections.singletonList("B-a-a"),
- Arrays.asList("B-b-b", "B-b-b"),
- Collections.singletonList("null-b-b"),
- Collections.singletonList((String) null),
- null,
- Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
- Arrays.asList("C-null-null", "C-null-null"),
- Collections.singletonList((String) null),
- null,
- Arrays.asList("null-d-d", "null-d-d"),
- Collections.singletonList("D-d-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b-b", 7L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.outerJoin(rightTable, valueJoiner)
@@ -506,22 +579,36 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
.to(OUTPUT_TOPIC);
runTest(expectedFinalMultiJoinResult, storeName);
} else {
- final List<List<String>> expectedResult = Arrays.asList(
- null,
- null,
- null,
- Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
- Collections.singletonList("B-a-a"),
- Arrays.asList("B-b-b", "B-b-b"),
- Collections.singletonList("null-b-b"),
- Arrays.asList((String) null, null),
- null,
- Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
- Arrays.asList("C-null-null", "C-null-null"),
- Collections.singletonList((String) null),
- null,
- null,
- Arrays.asList("null-d-d", "null-d-d", "D-d-d")
+ final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+ null,
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b-b", 7L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)),
+ Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
+ null,
+ null,
+ Arrays.asList(
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L),
+ new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
);
leftTable.outerJoin(rightTable, valueJoiner)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index e6cf85d..46515aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -241,12 +241,14 @@ public class IntegrationTestUtils {
* @param <K> Key type of the data records
* @param <V> Value type of the data records
*/
+ @SuppressWarnings("WeakerAccess")
public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
final Collection<KeyValue<K, V>> records,
final Properties producerConfig,
final Long timestamp,
final boolean enableTransactions)
- throws ExecutionException, InterruptedException {
+ throws ExecutionException, InterruptedException {
+
produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions);
}
@@ -260,13 +262,15 @@ public class IntegrationTestUtils {
* @param <K> Key type of the data records
* @param <V> Value type of the data records
*/
+ @SuppressWarnings("WeakerAccess")
public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
final Collection<KeyValue<K, V>> records,
final Properties producerConfig,
final Headers headers,
final Long timestamp,
final boolean enableTransactions)
- throws ExecutionException, InterruptedException {
+ throws ExecutionException, InterruptedException {
+
try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
if (enableTransactions) {
producer.initTransactions();
@@ -369,6 +373,7 @@ public class IntegrationTestUtils {
* @param enableTransactions Send messages in a transaction
* @param <V> Value type of the data records
*/
+ @SuppressWarnings("WeakerAccess")
public static <V> void produceValuesSynchronously(final String topic,
final Collection<V> records,
final Properties producerConfig,
@@ -427,6 +432,7 @@ public class IntegrationTestUtils {
* @param <V> Value type of the data records
* @return All the records consumed, or null if no records are consumed
*/
+ @SuppressWarnings("WeakerAccess")
public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
final String topic,
final int expectedNumRecords) throws InterruptedException {
@@ -444,6 +450,7 @@ public class IntegrationTestUtils {
* @param <V> Value type of the data records
* @return All the records consumed, or null if no records are consumed
*/
+ @SuppressWarnings("WeakerAccess")
public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
final String topic,
final int expectedNumRecords,
@@ -519,14 +526,14 @@ public class IntegrationTestUtils {
* @param <K> Key type of the data records
* @param <V> Value type of the data records
*/
- public static <K, V> List<KeyValue<K, KeyValue<V, Long>>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
+ public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
final String topic,
final int expectedNumRecords,
final long waitTime) throws InterruptedException {
- final List<KeyValue<K, KeyValue<V, Long>>> accumData = new ArrayList<>();
+ final List<KeyValueTimestamp<K, V>> accumData = new ArrayList<>();
try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
final TestCondition valuesRead = () -> {
- final List<KeyValue<K, KeyValue<V, Long>>> readData =
+ final List<KeyValueTimestamp<K, V>> readData =
readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
accumData.addAll(readData);
return accumData.size() >= expectedNumRecords;
@@ -559,33 +566,74 @@ public class IntegrationTestUtils {
* @param consumerConfig Kafka Consumer configuration
* @param topic Kafka topic to consume from
* @param expectedRecords Expected key-value mappings
+ * @param <K> Key type of the data records
+ * @param <V> Value type of the data records
+ * @return All the mappings consumed, or null if no records are consumed
+ */
+ public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,
+ final String topic,
+ final List<KeyValueTimestamp<K, V>> expectedRecords) throws InterruptedException {
+ return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, DEFAULT_TIMEOUT, true);
+ }
+
+ /**
+ * Wait until final key-value mappings have been consumed.
+ *
+ * @param consumerConfig Kafka Consumer configuration
+ * @param topic Kafka topic to consume from
+ * @param expectedRecords Expected key-value mappings
* @param waitTime Upper bound of waiting time in milliseconds
* @param <K> Key type of the data records
* @param <V> Value type of the data records
* @return All the mappings consumed, or null if no records are consumed
*/
+ @SuppressWarnings("WeakerAccess")
public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
final String topic,
final List<KeyValue<K, V>> expectedRecords,
final long waitTime) throws InterruptedException {
- final List<KeyValue<K, V>> accumData = new ArrayList<>();
+ return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false);
+ }
+
+ public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,
+ final String topic,
+ final List<KeyValueTimestamp<K, V>> expectedRecords,
+ final long waitTime) throws InterruptedException {
+ return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, true);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <K, V, T> List<T> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
+ final String topic,
+ final List<T> expectedRecords,
+ final long waitTime,
+ final boolean withTimestamp) throws InterruptedException {
+ final List<T> accumData = new ArrayList<>();
try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
final TestCondition valuesRead = () -> {
- final List<KeyValue<K, V>> readData =
- readKeyValues(topic, consumer, waitTime, expectedRecords.size());
+ final List<T> readData;
+ if (withTimestamp) {
+ readData = (List<T>) readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedRecords.size());
+ } else {
+ readData = (List<T>) readKeyValues(topic, consumer, waitTime, expectedRecords.size());
+ }
accumData.addAll(readData);
// filter out all intermediate records we don't want
- final List<KeyValue<K, V>> accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList());
+ final List<T> accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList());
// still need to check that for each key, the ordering is expected
- final Map<K, List<KeyValue<K, V>>> finalAccumData = new HashMap<>();
- for (final KeyValue<K, V> kv : accumulatedActual) {
- finalAccumData.computeIfAbsent(kv.key, key -> new ArrayList<>()).add(kv);
+ final Map<K, List<T>> finalAccumData = new HashMap<>();
+ for (final T kv : accumulatedActual) {
+ finalAccumData.computeIfAbsent(
+ (K) (withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key),
+ key -> new ArrayList<>()).add(kv);
}
- final Map<K, List<KeyValue<K, V>>> finalExpected = new HashMap<>();
- for (final KeyValue<K, V> kv : expectedRecords) {
- finalExpected.computeIfAbsent(kv.key, key -> new ArrayList<>()).add(kv);
+ final Map<K, List<T>> finalExpected = new HashMap<>();
+ for (final T kv : expectedRecords) {
+ finalExpected.computeIfAbsent(
+ (K) (withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key),
+ key -> new ArrayList<>()).add(kv);
}
// returns true only if the remaining records in both lists are the same and in the same order
@@ -643,6 +691,7 @@ public class IntegrationTestUtils {
return accumData;
}
+ @SuppressWarnings("WeakerAccess")
public static void waitForTopicPartitions(final List<KafkaServer> servers,
final List<TopicPartition> partitions,
final long timeout) throws InterruptedException {
@@ -863,14 +912,14 @@ public class IntegrationTestUtils {
* @param maxMessages Maximum number of messages to read via the consumer
* @return The KeyValue elements retrieved via the consumer
*/
- private static <K, V> List<KeyValue<K, KeyValue<V, Long>>> readKeyValuesWithTimestamp(final String topic,
- final Consumer<K, V> consumer,
- final long waitTime,
- final int maxMessages) {
- final List<KeyValue<K, KeyValue<V, Long>>> consumedValues = new ArrayList<>();
+ private static <K, V> List<KeyValueTimestamp<K, V>> readKeyValuesWithTimestamp(final String topic,
+ final Consumer<K, V> consumer,
+ final long waitTime,
+ final int maxMessages) {
+ final List<KeyValueTimestamp<K, V>> consumedValues = new ArrayList<>();
final List<ConsumerRecord<K, V>> records = readRecords(topic, consumer, waitTime, maxMessages);
for (final ConsumerRecord<K, V> record : records) {
- consumedValues.add(new KeyValue<>(record.key(), KeyValue.pair(record.value(), record.timestamp())));
+ consumedValues.add(new KeyValueTimestamp<>(record.key(), record.value(), record.timestamp()));
}
return consumedValues;
}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index 0bd22d4..a6959e1 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -56,7 +56,11 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
return capturedProcessors(1).get(0);
}
- // get the captured processors with the expected number
+ public int capturedProcessorsCount() {
+ return processors.size();
+ }
+
+ // get the captured processors with the expected number
public List<MockProcessor<K, V>> capturedProcessors(final int expectedNumberOfProcessors) {
assertEquals(expectedNumberOfProcessors, processors.size());