You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/30 16:56:48 UTC

[kafka] branch 2.3 updated: KAFKA-6455: Update integration tests to verify result timestamps (#6751)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 64aa82f  KAFKA-6455: Update integration tests to verify result timestamps (#6751)
64aa82f is described below

commit 64aa82f774b91260f5d86fd4377a8416d1ab034d
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu May 30 09:46:12 2019 -0700

    KAFKA-6455: Update integration tests to verify result timestamps (#6751)
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, John Roesler <jo...@confluent.io>
---
 .../streams/kstream/internals/KTableReduce.java    |   3 +-
 .../integration/AbstractJoinIntegrationTest.java   |  68 +--
 .../integration/AbstractResetIntegrationTest.java  |  36 +-
 .../FineGrainedAutoResetIntegrationTest.java       |   7 +-
 .../integration/GlobalKTableIntegrationTest.java   | 113 +++--
 .../KStreamAggregationDedupIntegrationTest.java    |  63 ++-
 .../KStreamAggregationIntegrationTest.java         | 227 +++++-----
 .../StreamStreamJoinIntegrationTest.java           | 349 ++++++++++-----
 .../StreamTableJoinIntegrationTest.java            |  72 ++--
 .../integration/TableTableJoinIntegrationTest.java | 479 ++++++++++++---------
 .../integration/utils/IntegrationTestUtils.java    |  91 +++-
 .../apache/kafka/test/MockProcessorSupplier.java   |   6 +-
 12 files changed, 922 insertions(+), 592 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
index 1b1a2bf..171912f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -80,7 +80,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
             final ValueAndTimestamp<V> oldAggAndTimestamp = store.get(key);
             final V oldAgg = getValueOrNull(oldAggAndTimestamp);
             final V intermediateAgg;
-            long newTimestamp = context().timestamp();
+            long newTimestamp;
 
             // first try to remove the old value
             if (value.oldValue != null && oldAgg != null) {
@@ -88,6 +88,7 @@ public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
                 newTimestamp = Math.max(context().timestamp(), oldAggAndTimestamp.timestamp());
             } else {
                 intermediateAgg = oldAgg;
+                newTimestamp = context().timestamp();
             }
 
             // then try to add the new value
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 7fa108d..daea1d7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -35,6 +36,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -51,6 +53,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -86,7 +89,7 @@ public abstract class AbstractJoinIntegrationTest {
     static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
     static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
     static final String OUTPUT_TOPIC = "outputTopic";
-    private final long anyUniqueKey = 0L;
+    static final long ANY_UNIQUE_KEY = 0L;
 
     private final static Properties PRODUCER_CONFIG = new Properties();
     private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
@@ -163,13 +166,13 @@ public abstract class AbstractJoinIntegrationTest {
         CLUSTER.deleteAllTopicsAndWait(120000);
     }
 
-    private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
-        final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L);
-        assertThat(result, is(expectedResult));
+    private void checkResult(final String outputTopic, final List<KeyValueTimestamp<Long, String>> expectedResult) throws InterruptedException {
+        IntegrationTestUtils.verifyKeyValueTimestamps(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult);
     }
 
-    private void checkResult(final String outputTopic, final String expectedFinalResult, final int expectedTotalNumRecords) throws InterruptedException {
-        final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedTotalNumRecords, 30 * 1000L);
+    private void checkResult(final String outputTopic, final KeyValueTimestamp<Long, String> expectedFinalResult, final int expectedTotalNumRecords) throws InterruptedException {
+        final List<KeyValueTimestamp<Long, String>> result =
+            IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedTotalNumRecords, 30 * 1000L);
         assertThat(result.get(result.size() - 1), is(expectedFinalResult));
     }
 
@@ -177,7 +180,7 @@ public abstract class AbstractJoinIntegrationTest {
      * Runs the actual test. Checks the result after each input record to ensure fixed processing order.
      * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
      */
-    void runTest(final List<List<String>> expectedResult) throws Exception {
+    void runTest(final List<List<KeyValueTimestamp<Long, String>>> expectedResult) throws Exception {
         runTest(expectedResult, null);
     }
 
@@ -186,28 +189,34 @@ public abstract class AbstractJoinIntegrationTest {
      * Runs the actual test. Checks the result after each input record to ensure fixed processing order.
      * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
      */
-    void runTest(final List<List<String>> expectedResult, final String storeName) throws Exception {
+    void runTest(final List<List<KeyValueTimestamp<Long, String>>> expectedResult, final String storeName) throws Exception {
         assert expectedResult.size() == input.size();
 
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
         streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
 
-        String expectedFinalResult = null;
+        KeyValueTimestamp<Long, String> expectedFinalResult = null;
 
         try {
             streams.start();
 
-            long ts = System.currentTimeMillis();
+            final long firstTimestamp = System.currentTimeMillis();
+            long ts = firstTimestamp;
 
-            final Iterator<List<String>> resultIterator = expectedResult.iterator();
+            final Iterator<List<KeyValueTimestamp<Long, String>>> resultIterator = expectedResult.iterator();
             for (final Input<String> singleInput : input) {
                 producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
 
-                final List<String> expected = resultIterator.next();
+                final List<KeyValueTimestamp<Long, String>> expected = resultIterator.next();
 
                 if (expected != null) {
-                    checkResult(OUTPUT_TOPIC, expected);
-                    expectedFinalResult = expected.get(expected.size() - 1);
+                    final List<KeyValueTimestamp<Long, String>> updatedExpected = new LinkedList<>();
+                    for (final KeyValueTimestamp<Long, String> record : expected) {
+                        updatedExpected.add(new KeyValueTimestamp<>(record.key(), record.value(), firstTimestamp + record.timestamp()));
+                    }
+
+                    checkResult(OUTPUT_TOPIC, updatedExpected);
+                    expectedFinalResult = updatedExpected.get(expected.size() - 1);
                 }
             }
 
@@ -222,21 +231,22 @@ public abstract class AbstractJoinIntegrationTest {
     /*
      * Runs the actual test. Checks the final result only after expected number of records have been consumed.
      */
-    void runTest(final String expectedFinalResult) throws Exception {
+    void runTest(final KeyValueTimestamp<Long, String> expectedFinalResult) throws Exception {
         runTest(expectedFinalResult, null);
     }
 
     /*
      * Runs the actual test. Checks the final result only after expected number of records have been consumed.
      */
-    void runTest(final String expectedFinalResult, final String storeName) throws Exception {
+    void runTest(final KeyValueTimestamp<Long, String> expectedFinalResult, final String storeName) throws Exception {
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
         streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
 
         try {
             streams.start();
 
-            long ts = System.currentTimeMillis();
+            final long firstTimestamp = System.currentTimeMillis();
+            long ts = firstTimestamp;
 
             for (final Input<String> singleInput : input) {
                 producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
@@ -244,10 +254,15 @@ public abstract class AbstractJoinIntegrationTest {
 
             TestUtils.waitForCondition(() -> finalResultReached.get(), "Never received expected final result.");
 
-            checkResult(OUTPUT_TOPIC, expectedFinalResult, numRecordsExpected);
+            final KeyValueTimestamp<Long, String> updatedExpectedFinalResult =
+                new KeyValueTimestamp<>(
+                    expectedFinalResult.key(),
+                    expectedFinalResult.value(),
+                    firstTimestamp + expectedFinalResult.timestamp());
+            checkResult(OUTPUT_TOPIC, updatedExpectedFinalResult, numRecordsExpected);
 
             if (storeName != null) {
-                checkQueryableStore(storeName, expectedFinalResult);
+                checkQueryableStore(storeName, updatedExpectedFinalResult);
             }
         } finally {
             streams.close();
@@ -257,15 +272,16 @@ public abstract class AbstractJoinIntegrationTest {
     /*
      * Checks the embedded queryable state store snapshot
      */
-    private void checkQueryableStore(final String queryableName, final String expectedFinalResult) {
-        final ReadOnlyKeyValueStore<Long, String> store = streams.store(queryableName, QueryableStoreTypes.keyValueStore());
+    private void checkQueryableStore(final String queryableName, final KeyValueTimestamp<Long, String> expectedFinalResult) {
+        final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> store = streams.store(queryableName, QueryableStoreTypes.timestampedKeyValueStore());
 
-        final KeyValueIterator<Long, String> all = store.all();
-        final KeyValue<Long, String> onlyEntry = all.next();
+        final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all();
+        final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next();
 
         try {
-            assertThat(onlyEntry.key, is(anyUniqueKey));
-            assertThat(onlyEntry.value, is(expectedFinalResult));
+            assertThat(onlyEntry.key, is(expectedFinalResult.key()));
+            assertThat(onlyEntry.value.value(), is(expectedFinalResult.value()));
+            assertThat(onlyEntry.value.timestamp(), is(expectedFinalResult.timestamp()));
             assertThat(all.hasNext(), is(false));
         } finally {
             all.close();
@@ -278,7 +294,7 @@ public abstract class AbstractJoinIntegrationTest {
 
         Input(final String topic, final V value) {
             this.topic = topic;
-            record = KeyValue.pair(anyUniqueKey, value);
+            record = KeyValue.pair(ANY_UNIQUE_KEY, value);
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index c9ae1bb..000f299 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
-import java.time.Duration;
+import kafka.tools.StreamsResetter;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
@@ -39,10 +39,8 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -56,6 +54,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.text.SimpleDateFormat;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -65,8 +64,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
-import kafka.tools.StreamsResetter;
-
 import static java.time.Duration.ofMillis;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -223,7 +220,7 @@ public abstract class AbstractResetIntegrationTest {
         }
     }
 
-    void shouldNotAllowToResetWhileStreamsIsRunning() throws Exception {
+    void shouldNotAllowToResetWhileStreamsIsRunning() {
         appID = testId + "-not-reset-during-runtime";
         final String[] parameters = new String[] {
             "--application-id", appID,
@@ -390,7 +387,6 @@ public abstract class AbstractResetIntegrationTest {
         final File resetFile = File.createTempFile("reset", ".csv");
         try (final BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
             writer.write(INPUT_TOPIC + ",0,1");
-            writer.close();
         }
 
         streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
@@ -434,7 +430,6 @@ public abstract class AbstractResetIntegrationTest {
         final File resetFile = File.createTempFile("reset", ".csv");
         try (final BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
             writer.write(INPUT_TOPIC + ",0,1");
-            writer.close();
         }
 
         streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
@@ -482,7 +477,6 @@ public abstract class AbstractResetIntegrationTest {
         final File resetFile = File.createTempFile("reset", ".csv");
         try (final BufferedWriter writer = new BufferedWriter(new FileWriter(resetFile))) {
             writer.write(INPUT_TOPIC + ",0,1");
-            writer.close();
         }
 
         streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
@@ -514,12 +508,7 @@ public abstract class AbstractResetIntegrationTest {
         final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
 
         // use map to trigger internal re-partitioning before groupByKey
-        input.map(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
-            @Override
-            public KeyValue<Long, String> apply(final Long key, final String value) {
-                return new KeyValue<>(key, value);
-            }
-        })
+        input.map(KeyValue::new)
             .groupByKey()
             .count()
             .toStream()
@@ -530,12 +519,7 @@ public abstract class AbstractResetIntegrationTest {
             .windowedBy(TimeWindows.of(ofMillis(35)).advanceBy(ofMillis(10)))
             .count()
             .toStream()
-            .map(new KeyValueMapper<Windowed<Long>, Long, KeyValue<Long, Long>>() {
-                @Override
-                public KeyValue<Long, Long> apply(final Windowed<Long> key, final Long value) {
-                    return new KeyValue<>(key.window().start() + key.window().end(), value);
-                }
-            })
+            .map((key, value) -> new KeyValue<>(key.window().start() + key.window().end(), value))
             .to(outputTopic2, Produced.with(Serdes.Long(), Serdes.Long()));
 
         return builder.build();
@@ -547,12 +531,8 @@ public abstract class AbstractResetIntegrationTest {
         final KStream<Long, String> input = builder.stream(INPUT_TOPIC);
 
         // use map to trigger internal re-partitioning before groupByKey
-        input.map(new KeyValueMapper<Long, String, KeyValue<Long, Long>>() {
-            @Override
-            public KeyValue<Long, Long> apply(final Long key, final String value) {
-                return new KeyValue<>(key, key);
-            }
-        }).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
+        input.map((key, value) -> new KeyValue<>(key, key))
+            .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
 
         return builder.build();
     }
@@ -590,7 +570,7 @@ public abstract class AbstractResetIntegrationTest {
             parameterList.add(resetScenarioArg);
         }
 
-        final String[] parameters = parameterList.toArray(new String[parameterList.size()]);
+        final String[] parameters = parameterList.toArray(new String[0]);
 
         final Properties cleanUpConfig = new Properties();
         cleanUpConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
index 87d6a16..45ea71c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FineGrainedAutoResetIntegrationTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.integration;
 
 
+import kafka.utils.MockTime;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -55,8 +56,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
-import kafka.utils.MockTime;
-
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -191,8 +190,8 @@ public class FineGrainedAutoResetIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
 
-        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d" + topicSuffix), Consumed.<String, String>with(Topology.AutoOffsetReset.EARLIEST));
-        final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix), Consumed.<String, String>with(Topology.AutoOffsetReset.LATEST));
+        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d" + topicSuffix), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
+        final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix), Consumed.with(Topology.AutoOffsetReset.LATEST));
         final KStream<String, String> namedTopicsStream = builder.stream(Arrays.asList(topicY, topicZ));
 
         pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde));
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index d3e0d24..0a9148d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -39,7 +38,9 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -68,7 +69,6 @@ public class GlobalKTableIntegrationTest {
     private final KeyValueMapper<String, Long, Long> keyMapper = (key, value) -> value;
     private final ValueJoiner<Long, String, String> joiner = (value1, value2) -> value1 + "+" + value2;
     private final String globalStore = "globalStore";
-    private final Map<String, String> results = new HashMap<>();
     private StreamsBuilder builder;
     private Properties streamsConfiguration;
     private KafkaStreams kafkaStreams;
@@ -76,7 +76,7 @@ public class GlobalKTableIntegrationTest {
     private String streamTopic;
     private GlobalKTable<Long, String> globalTable;
     private KStream<String, Long> stream;
-    private ForeachAction<String, String> foreachAction;
+    private MockProcessorSupplier<String, String> supplier;
 
     @Before
     public void before() throws Exception {
@@ -96,7 +96,7 @@ public class GlobalKTableIntegrationTest {
                                                   .withValueSerde(Serdes.String()));
         final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
         stream = builder.stream(streamTopic, stringLongConsumed);
-        foreachAction = results::put;
+        supplier = new MockProcessorSupplier<>();
     }
 
     @After
@@ -110,24 +110,34 @@ public class GlobalKTableIntegrationTest {
     @Test
     public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
         final KStream<String, String> streamTableJoin = stream.leftJoin(globalTable, keyMapper, joiner);
-        streamTableJoin.foreach(foreachAction);
+        streamTableJoin.process(supplier);
         produceInitialGlobalTableValues();
         startStreams();
+        long firstTimestamp = mockTime.milliseconds();
         produceTopicValues(streamTopic);
 
-        final Map<String, String> expected = new HashMap<>();
-        expected.put("a", "1+A");
-        expected.put("b", "2+B");
-        expected.put("c", "3+C");
-        expected.put("d", "4+D");
-        expected.put("e", "5+null");
+        final Map<String, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put("a", ValueAndTimestamp.make("1+A", firstTimestamp));
+        expected.put("b", ValueAndTimestamp.make("2+B", firstTimestamp + 1L));
+        expected.put("c", ValueAndTimestamp.make("3+C", firstTimestamp + 2L));
+        expected.put("d", ValueAndTimestamp.make("4+D", firstTimestamp + 3L));
+        expected.put("e", ValueAndTimestamp.make("5+null", firstTimestamp + 4L));
 
         TestUtils.waitForCondition(
-            () -> results.equals(expected),
+            () -> {
+                if (supplier.capturedProcessorsCount() < 2) {
+                    return false;
+                }
+                final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
+                result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
+                result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
+                return result.equals(expected);
+            },
             30000L,
             "waiting for initial values");
 
 
+        firstTimestamp = mockTime.milliseconds();
         produceGlobalTableValues();
 
         final ReadOnlyKeyValueStore<Long, String> replicatedStore =
@@ -138,16 +148,29 @@ public class GlobalKTableIntegrationTest {
             30000,
             "waiting for data in replicated store");
 
+        final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> replicatedStoreWithTimestamp =
+            kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore());
+        assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L)));
+
+        firstTimestamp = mockTime.milliseconds();
         produceTopicValues(streamTopic);
 
-        expected.put("a", "1+F");
-        expected.put("b", "2+G");
-        expected.put("c", "3+H");
-        expected.put("d", "4+I");
-        expected.put("e", "5+J");
+        expected.put("a", ValueAndTimestamp.make("1+F", firstTimestamp));
+        expected.put("b", ValueAndTimestamp.make("2+G", firstTimestamp + 1L));
+        expected.put("c", ValueAndTimestamp.make("3+H", firstTimestamp + 2L));
+        expected.put("d", ValueAndTimestamp.make("4+I", firstTimestamp + 3L));
+        expected.put("e", ValueAndTimestamp.make("5+J", firstTimestamp + 4L));
 
         TestUtils.waitForCondition(
-            () -> results.equals(expected),
+            () -> {
+                if (supplier.capturedProcessorsCount() < 2) {
+                    return false;
+                }
+                final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
+                result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
+                result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
+                return result.equals(expected);
+            },
             30000L,
             "waiting for final values");
     }
@@ -155,23 +178,33 @@ public class GlobalKTableIntegrationTest {
     @Test
     public void shouldKStreamGlobalKTableJoin() throws Exception {
         final KStream<String, String> streamTableJoin = stream.join(globalTable, keyMapper, joiner);
-        streamTableJoin.foreach(foreachAction);
+        streamTableJoin.process(supplier);
         produceInitialGlobalTableValues();
         startStreams();
+        long firstTimestamp = mockTime.milliseconds();
         produceTopicValues(streamTopic);
 
-        final Map<String, String> expected = new HashMap<>();
-        expected.put("a", "1+A");
-        expected.put("b", "2+B");
-        expected.put("c", "3+C");
-        expected.put("d", "4+D");
+        final Map<String, ValueAndTimestamp<String>> expected = new HashMap<>();
+        expected.put("a", ValueAndTimestamp.make("1+A", firstTimestamp));
+        expected.put("b", ValueAndTimestamp.make("2+B", firstTimestamp + 1L));
+        expected.put("c", ValueAndTimestamp.make("3+C", firstTimestamp + 2L));
+        expected.put("d", ValueAndTimestamp.make("4+D", firstTimestamp + 3L));
 
         TestUtils.waitForCondition(
-            () -> results.equals(expected),
+            () -> {
+                if (supplier.capturedProcessorsCount() < 2) {
+                    return false;
+                }
+                final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
+                result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
+                result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
+                return result.equals(expected);
+            },
             30000L,
             "waiting for initial values");
 
 
+        firstTimestamp = mockTime.milliseconds();
         produceGlobalTableValues();
 
         final ReadOnlyKeyValueStore<Long, String> replicatedStore =
@@ -182,16 +215,29 @@ public class GlobalKTableIntegrationTest {
             30000,
             "waiting for data in replicated store");
 
+        final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> replicatedStoreWithTimestamp =
+            kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore());
+        assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L)));
+
+        firstTimestamp = mockTime.milliseconds();
         produceTopicValues(streamTopic);
 
-        expected.put("a", "1+F");
-        expected.put("b", "2+G");
-        expected.put("c", "3+H");
-        expected.put("d", "4+I");
-        expected.put("e", "5+J");
+        expected.put("a", ValueAndTimestamp.make("1+F", firstTimestamp));
+        expected.put("b", ValueAndTimestamp.make("2+G", firstTimestamp + 1L));
+        expected.put("c", ValueAndTimestamp.make("3+H", firstTimestamp + 2L));
+        expected.put("d", ValueAndTimestamp.make("4+I", firstTimestamp + 3L));
+        expected.put("e", ValueAndTimestamp.make("5+J", firstTimestamp + 4L));
 
         TestUtils.waitForCondition(
-            () -> results.equals(expected),
+            () -> {
+                if (supplier.capturedProcessorsCount() < 2) {
+                    return false;
+                }
+                final Map<String, ValueAndTimestamp<String>> result = new HashMap<>();
+                result.putAll(supplier.capturedProcessors(2).get(0).lastValueAndTimestampPerKey);
+                result.putAll(supplier.capturedProcessors(2).get(1).lastValueAndTimestampPerKey);
+                return result.equals(expected);
+            },
             30000L,
             "waiting for final values");
     }
@@ -209,11 +255,16 @@ public class GlobalKTableIntegrationTest {
         startStreams();
         ReadOnlyKeyValueStore<Long, String> store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
         assertThat(store.approximateNumEntries(), equalTo(4L));
+        ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> timestampedStore =
+            kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore());
+        assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
         kafkaStreams.close();
 
         startStreams();
         store = kafkaStreams.store(globalStore, QueryableStoreTypes.keyValueStore());
         assertThat(store.approximateNumEntries(), equalTo(4L));
+        timestampedStore = kafkaStreams.store(globalStore, QueryableStoreTypes.timestampedKeyValueStore());
+        assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
     }
 
     private void createTopics() throws Exception {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 14346cf..61f6356 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -66,8 +67,7 @@ public class KStreamAggregationDedupIntegrationTest {
     private static final long COMMIT_INTERVAL_MS = 300L;
 
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-        new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
     private final MockTime mockTime = CLUSTER.time;
     private static volatile AtomicInteger testNo = new AtomicInteger(0);
@@ -119,17 +119,18 @@ public class KStreamAggregationDedupIntegrationTest {
 
         startStreams();
 
-        produceMessages(System.currentTimeMillis());
+        final long timestamp = System.currentTimeMillis();
+        produceMessages(timestamp);
 
         validateReceivedMessages(
                 new StringDeserializer(),
                 new StringDeserializer(),
                 Arrays.asList(
-                        KeyValue.pair("A", "A:A"),
-                        KeyValue.pair("B", "B:B"),
-                        KeyValue.pair("C", "C:C"),
-                        KeyValue.pair("D", "D:D"),
-                        KeyValue.pair("E", "E:E")));
+                    new KeyValueTimestamp<>("A", "A:A", timestamp),
+                    new KeyValueTimestamp<>("B", "B:B", timestamp),
+                    new KeyValueTimestamp<>("C", "C:C", timestamp),
+                    new KeyValueTimestamp<>("D", "D:D", timestamp),
+                    new KeyValueTimestamp<>("E", "E:E", timestamp)));
     }
 
     @Test
@@ -155,16 +156,16 @@ public class KStreamAggregationDedupIntegrationTest {
                 new StringDeserializer(),
                 new StringDeserializer(),
                 Arrays.asList(
-                        new KeyValue<>("A@" + firstBatchWindow, "A"),
-                        new KeyValue<>("A@" + secondBatchWindow, "A:A"),
-                        new KeyValue<>("B@" + firstBatchWindow, "B"),
-                        new KeyValue<>("B@" + secondBatchWindow, "B:B"),
-                        new KeyValue<>("C@" + firstBatchWindow, "C"),
-                        new KeyValue<>("C@" + secondBatchWindow, "C:C"),
-                        new KeyValue<>("D@" + firstBatchWindow, "D"),
-                        new KeyValue<>("D@" + secondBatchWindow, "D:D"),
-                        new KeyValue<>("E@" + firstBatchWindow, "E"),
-                        new KeyValue<>("E@" + secondBatchWindow, "E:E")
+                    new KeyValueTimestamp<>("A@" + firstBatchWindow, "A", firstBatchTimestamp),
+                    new KeyValueTimestamp<>("A@" + secondBatchWindow, "A:A", secondBatchTimestamp),
+                    new KeyValueTimestamp<>("B@" + firstBatchWindow, "B", firstBatchTimestamp),
+                    new KeyValueTimestamp<>("B@" + secondBatchWindow, "B:B", secondBatchTimestamp),
+                    new KeyValueTimestamp<>("C@" + firstBatchWindow, "C", firstBatchTimestamp),
+                    new KeyValueTimestamp<>("C@" + secondBatchWindow, "C:C", secondBatchTimestamp),
+                    new KeyValueTimestamp<>("D@" + firstBatchWindow, "D", firstBatchTimestamp),
+                    new KeyValueTimestamp<>("D@" + secondBatchWindow, "D:D", secondBatchTimestamp),
+                    new KeyValueTimestamp<>("E@" + firstBatchWindow, "E", firstBatchTimestamp),
+                    new KeyValueTimestamp<>("E@" + secondBatchWindow, "E:E", secondBatchTimestamp)
                 )
         );
     }
@@ -189,11 +190,11 @@ public class KStreamAggregationDedupIntegrationTest {
                 new StringDeserializer(),
                 new LongDeserializer(),
                 Arrays.asList(
-                        KeyValue.pair("1@" + window, 2L),
-                        KeyValue.pair("2@" + window, 2L),
-                        KeyValue.pair("3@" + window, 2L),
-                        KeyValue.pair("4@" + window, 2L),
-                        KeyValue.pair("5@" + window, 2L)
+                    new KeyValueTimestamp<>("1@" + window, 2L, timestamp),
+                    new KeyValueTimestamp<>("2@" + window, 2L, timestamp),
+                    new KeyValueTimestamp<>("3@" + window, 2L, timestamp),
+                    new KeyValueTimestamp<>("4@" + window, 2L, timestamp),
+                    new KeyValueTimestamp<>("5@" + window, 2L, timestamp)
                 )
         );
     }
@@ -232,20 +233,16 @@ public class KStreamAggregationDedupIntegrationTest {
 
     private <K, V> void validateReceivedMessages(final Deserializer<K> keyDeserializer,
                                                  final Deserializer<V> valueDeserializer,
-                                                 final List<KeyValue<K, V>> expectedRecords)
+                                                 final List<KeyValueTimestamp<K, V>> expectedRecords)
         throws InterruptedException {
         final Properties consumerProperties = new Properties();
-        consumerProperties
-            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" +
-            testNo);
+        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
         consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-            keyDeserializer.getClass().getName());
-        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-            valueDeserializer.getClass().getName());
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
 
-        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+        IntegrationTestUtils.waitUntilFinalKeyValueTimestampRecordsReceived(
             consumerProperties,
             outputTopic,
             expectedRecords);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 2f06af7..dcf72a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -98,8 +99,7 @@ public class KStreamAggregationIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
     @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER =
-        new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
     private static volatile AtomicInteger testNo = new AtomicInteger(0);
     private final MockTime mockTime = CLUSTER.time;
@@ -122,8 +122,7 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration = new Properties();
         final String applicationId = "kgrouped-stream-test-" + testNo.incrementAndGet();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        streamsConfiguration
-            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
@@ -160,30 +159,35 @@ public class KStreamAggregationIntegrationTest {
 
         produceMessages(mockTime.milliseconds());
 
-        final List<KeyValue<String, String>> results = receiveMessages(
+        final List<KeyValueTimestamp<String, String>> results = receiveMessages(
             new StringDeserializer(),
             new StringDeserializer(),
             10);
 
         results.sort(KStreamAggregationIntegrationTest::compare);
 
-        assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
-            KeyValue.pair("A", "A:A"),
-            KeyValue.pair("B", "B"),
-            KeyValue.pair("B", "B:B"),
-            KeyValue.pair("C", "C"),
-            KeyValue.pair("C", "C:C"),
-            KeyValue.pair("D", "D"),
-            KeyValue.pair("D", "D:D"),
-            KeyValue.pair("E", "E"),
-            KeyValue.pair("E", "E:E"))));
+        assertThat(results, is(Arrays.asList(
+            new KeyValueTimestamp("A", "A", mockTime.milliseconds()),
+            new KeyValueTimestamp("A", "A:A", mockTime.milliseconds()),
+            new KeyValueTimestamp("B", "B", mockTime.milliseconds()),
+            new KeyValueTimestamp("B", "B:B", mockTime.milliseconds()),
+            new KeyValueTimestamp("C", "C", mockTime.milliseconds()),
+            new KeyValueTimestamp("C", "C:C", mockTime.milliseconds()),
+            new KeyValueTimestamp("D", "D", mockTime.milliseconds()),
+            new KeyValueTimestamp("D", "D:D", mockTime.milliseconds()),
+            new KeyValueTimestamp("E", "E", mockTime.milliseconds()),
+            new KeyValueTimestamp("E", "E:E", mockTime.milliseconds()))));
     }
 
-    private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1,
-                                                                            final KeyValue<K, V> o2) {
-        final int keyComparison = o1.key.compareTo(o2.key);
+    private static <K extends Comparable, V extends Comparable> int compare(final KeyValueTimestamp<K, V> o1,
+                                                                            final KeyValueTimestamp<K, V> o2) {
+        final int keyComparison = o1.key().compareTo(o2.key());
         if (keyComparison == 0) {
-            return o1.value.compareTo(o2.value);
+            final int valueComparison = o1.value().compareTo(o2.value());
+            if (valueComparison == 0) {
+                return Long.compare(o1.timestamp(), o2.timestamp());
+            }
+            return valueComparison;
         }
         return keyComparison;
     }
@@ -206,7 +210,7 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        final List<KeyValue<Windowed<String>, String>> windowedOutput = receiveMessages(
+        final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput = receiveMessages(
             new TimeWindowedDeserializer<>(),
             new StringDeserializer(),
             String.class,
@@ -218,44 +222,45 @@ public class KStreamAggregationIntegrationTest {
             new StringDeserializer(),
             String.class,
             15,
-            false);
+            true);
 
-        final Comparator<KeyValue<Windowed<String>, String>>
-            comparator =
-            Comparator.comparing((KeyValue<Windowed<String>, String> o) -> o.key.key()).thenComparing(o -> o.value);
+        final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+            Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+                .thenComparing(KeyValueTimestamp::value);
 
         windowedOutput.sort(comparator);
         final long firstBatchWindow = firstBatchTimestamp / 500 * 500;
         final long secondBatchWindow = secondBatchTimestamp / 500 * 500;
 
-        final List<KeyValue<Windowed<String>, String>> expectResult = Arrays.asList(
-                new KeyValue<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A"),
-                new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A"),
-                new KeyValue<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A"),
-                new KeyValue<>(new Windowed<>("B", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B"),
-                new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B"),
-                new KeyValue<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B"),
-                new KeyValue<>(new Windowed<>("C", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C"),
-                new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C"),
-                new KeyValue<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C"),
-                new KeyValue<>(new Windowed<>("D", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D"),
-                new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D"),
-                new KeyValue<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D"),
-                new KeyValue<>(new Windowed<>("E", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E"),
-                new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E"),
-                new KeyValue<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E")
+        final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList(
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "A:A", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "B", firstBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "B:B", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "C", firstBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "C:C", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "D", firstBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "D:D", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstBatchWindow, Long.MAX_VALUE)), "E", firstBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E", secondBatchTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondBatchWindow, Long.MAX_VALUE)), "E:E", secondBatchTimestamp)
         );
         assertThat(windowedOutput, is(expectResult));
 
         final Set<String> expectResultString = new HashSet<>(expectResult.size());
-        for (final KeyValue<Windowed<String>, String> eachRecord: expectResult) {
-            expectResultString.add(eachRecord.toString());
+        for (final KeyValueTimestamp<Windowed<String>, String> eachRecord: expectResult) {
+            expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", "
+                + eachRecord.key() + ", " + eachRecord.value());
         }
 
         // check every message is contained in the expect result
         final String[] allRecords = resultFromConsoleConsumer.split("\n");
         for (final String record: allRecords) {
-            assertTrue(expectResultString.contains("KeyValue(" + record + ")"));
+            assertTrue(expectResultString.contains(record));
         }
     }
 
@@ -273,7 +278,7 @@ public class KStreamAggregationIntegrationTest {
 
         produceMessages(mockTime.milliseconds());
 
-        final List<KeyValue<String, Integer>> results = receiveMessages(
+        final List<KeyValueTimestamp<String, Integer>> results = receiveMessages(
             new StringDeserializer(),
             new IntegerDeserializer(),
             10);
@@ -281,16 +286,16 @@ public class KStreamAggregationIntegrationTest {
         results.sort(KStreamAggregationIntegrationTest::compare);
 
         assertThat(results, is(Arrays.asList(
-            KeyValue.pair("A", 1),
-            KeyValue.pair("A", 2),
-            KeyValue.pair("B", 1),
-            KeyValue.pair("B", 2),
-            KeyValue.pair("C", 1),
-            KeyValue.pair("C", 2),
-            KeyValue.pair("D", 1),
-            KeyValue.pair("D", 2),
-            KeyValue.pair("E", 1),
-            KeyValue.pair("E", 2)
+            new KeyValueTimestamp("A", 1, mockTime.milliseconds()),
+            new KeyValueTimestamp("A", 2, mockTime.milliseconds()),
+            new KeyValueTimestamp("B", 1, mockTime.milliseconds()),
+            new KeyValueTimestamp("B", 2, mockTime.milliseconds()),
+            new KeyValueTimestamp("C", 1, mockTime.milliseconds()),
+            new KeyValueTimestamp("C", 2, mockTime.milliseconds()),
+            new KeyValueTimestamp("D", 1, mockTime.milliseconds()),
+            new KeyValueTimestamp("D", 2, mockTime.milliseconds()),
+            new KeyValueTimestamp("E", 1, mockTime.milliseconds()),
+            new KeyValueTimestamp("E", 2, mockTime.milliseconds())
         )));
     }
 
@@ -315,7 +320,7 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        final List<KeyValue<Windowed<String>, KeyValue<Integer, Long>>> windowedMessages = receiveMessagesWithTimestamp(
+        final List<KeyValueTimestamp<Windowed<String>, Integer>> windowedMessages = receiveMessagesWithTimestamp(
             new TimeWindowedDeserializer<>(),
             new IntegerDeserializer(),
             String.class,
@@ -329,37 +334,36 @@ public class KStreamAggregationIntegrationTest {
             15,
             true);
 
-        final Comparator<KeyValue<Windowed<String>, KeyValue<Integer, Long>>>
-            comparator =
-            Comparator.comparing((KeyValue<Windowed<String>, KeyValue<Integer, Long>> o) -> o.key.key()).thenComparingInt(o -> o.value.key);
-
+        final Comparator<KeyValueTimestamp<Windowed<String>, Integer>> comparator =
+            Comparator.comparing((KeyValueTimestamp<Windowed<String>, Integer> o) -> o.key().key())
+                .thenComparingInt(KeyValueTimestamp::value);
         windowedMessages.sort(comparator);
 
         final long firstWindow = firstTimestamp / 500 * 500;
         final long secondWindow = secondTimestamp / 500 * 500;
 
-        final List<KeyValue<Windowed<String>, KeyValue<Integer, Long>>> expectResult = Arrays.asList(
-                new KeyValue<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
-                new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
-                new KeyValue<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
-                new KeyValue<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
-                new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
-                new KeyValue<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
-                new KeyValue<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
-                new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
-                new KeyValue<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
-                new KeyValue<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
-                new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
-                new KeyValue<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)),
-                new KeyValue<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), KeyValue.pair(1, firstTimestamp)),
-                new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(1, secondTimestamp)),
-                new KeyValue<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), KeyValue.pair(2, secondTimestamp)));
+        final List<KeyValueTimestamp<Windowed<String>, Integer>> expectResult = Arrays.asList(
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("D", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(firstWindow, Long.MAX_VALUE)), 1, firstTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 1, secondTimestamp),
+                new KeyValueTimestamp<>(new Windowed<>("E", new TimeWindow(secondWindow, Long.MAX_VALUE)), 2, secondTimestamp));
 
         assertThat(windowedMessages, is(expectResult));
 
         final Set<String> expectResultString = new HashSet<>(expectResult.size());
-        for (final KeyValue<Windowed<String>, KeyValue<Integer, Long>> eachRecord: expectResult) {
-            expectResultString.add("CreateTime:" + eachRecord.value.value + ", " + eachRecord.key.toString() + ", " + eachRecord.value.key);
+        for (final KeyValueTimestamp<Windowed<String>, Integer> eachRecord: expectResult) {
+            expectResultString.add("CreateTime:" + eachRecord.timestamp() + ", " + eachRecord.key() + ", " + eachRecord.value());
         }
 
         // check every message is contained in the expect result
@@ -375,23 +379,23 @@ public class KStreamAggregationIntegrationTest {
 
         produceMessages(mockTime.milliseconds());
 
-        final List<KeyValue<String, Long>> results = receiveMessages(
+        final List<KeyValueTimestamp<String, Long>> results = receiveMessages(
             new StringDeserializer(),
             new LongDeserializer(),
             10);
         results.sort(KStreamAggregationIntegrationTest::compare);
 
         assertThat(results, is(Arrays.asList(
-            KeyValue.pair("A", 1L),
-            KeyValue.pair("A", 2L),
-            KeyValue.pair("B", 1L),
-            KeyValue.pair("B", 2L),
-            KeyValue.pair("C", 1L),
-            KeyValue.pair("C", 2L),
-            KeyValue.pair("D", 1L),
-            KeyValue.pair("D", 2L),
-            KeyValue.pair("E", 1L),
-            KeyValue.pair("E", 2L)
+            new KeyValueTimestamp("A", 1L, mockTime.milliseconds()),
+            new KeyValueTimestamp("A", 2L, mockTime.milliseconds()),
+            new KeyValueTimestamp("B", 1L, mockTime.milliseconds()),
+            new KeyValueTimestamp("B", 2L, mockTime.milliseconds()),
+            new KeyValueTimestamp("C", 1L, mockTime.milliseconds()),
+            new KeyValueTimestamp("C", 2L, mockTime.milliseconds()),
+            new KeyValueTimestamp("D", 1L, mockTime.milliseconds()),
+            new KeyValueTimestamp("D", 2L, mockTime.milliseconds()),
+            new KeyValueTimestamp("E", 1L, mockTime.milliseconds()),
+            new KeyValueTimestamp("E", 2L, mockTime.milliseconds())
         )));
     }
 
@@ -430,7 +434,7 @@ public class KStreamAggregationIntegrationTest {
 
         startStreams();
 
-        final List<KeyValue<String, Long>> results = receiveMessages(
+        final List<KeyValueTimestamp<String, Long>> results = receiveMessages(
             new StringDeserializer(),
             new LongDeserializer(),
             10);
@@ -438,18 +442,17 @@ public class KStreamAggregationIntegrationTest {
 
         final long window = timestamp / 500 * 500;
         assertThat(results, is(Arrays.asList(
-            KeyValue.pair("1@" + window, 1L),
-            KeyValue.pair("1@" + window, 2L),
-            KeyValue.pair("2@" + window, 1L),
-            KeyValue.pair("2@" + window, 2L),
-            KeyValue.pair("3@" + window, 1L),
-            KeyValue.pair("3@" + window, 2L),
-            KeyValue.pair("4@" + window, 1L),
-            KeyValue.pair("4@" + window, 2L),
-            KeyValue.pair("5@" + window, 1L),
-            KeyValue.pair("5@" + window, 2L)
+            new KeyValueTimestamp("1@" + window, 1L, timestamp),
+            new KeyValueTimestamp("1@" + window, 2L, timestamp),
+            new KeyValueTimestamp("2@" + window, 1L, timestamp),
+            new KeyValueTimestamp("2@" + window, 2L, timestamp),
+            new KeyValueTimestamp("3@" + window, 1L, timestamp),
+            new KeyValueTimestamp("3@" + window, 2L, timestamp),
+            new KeyValueTimestamp("4@" + window, 1L, timestamp),
+            new KeyValueTimestamp("4@" + window, 2L, timestamp),
+            new KeyValueTimestamp("5@" + window, 1L, timestamp),
+            new KeyValueTimestamp("5@" + window, 2L, timestamp)
         )));
-
     }
 
     @Test
@@ -796,17 +799,17 @@ public class KStreamAggregationIntegrationTest {
         kafkaStreams.start();
     }
 
-    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
-                                                                        final Deserializer<V> valueDeserializer,
-                                                                        final int numMessages)
+    private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
+                                                                 final Deserializer<V> valueDeserializer,
+                                                                 final int numMessages)
         throws InterruptedException {
         return receiveMessages(keyDeserializer, valueDeserializer, null, numMessages);
     }
 
-    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
-                                                        final Deserializer<V> valueDeserializer,
-                                                        final Class innerClass,
-                                                        final int numMessages) throws InterruptedException {
+    private <K, V> List<KeyValueTimestamp<K, V>> receiveMessages(final Deserializer<K> keyDeserializer,
+                                                                 final Deserializer<V> valueDeserializer,
+                                                                 final Class innerClass,
+                                                                 final int numMessages) throws InterruptedException {
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
@@ -817,17 +820,17 @@ public class KStreamAggregationIntegrationTest {
             consumerProperties.setProperty(StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
                     Serdes.serdeFrom(innerClass).getClass().getName());
         }
-        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
                 consumerProperties,
                 outputTopic,
                 numMessages,
                 60 * 1000);
     }
 
-    private <K, V> List<KeyValue<K, KeyValue<V, Long>>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
-                                                                                     final Deserializer<V> valueDeserializer,
-                                                                                     final Class innerClass,
-                                                                                     final int numMessages) throws InterruptedException {
+    private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(final Deserializer<K> keyDeserializer,
+                                                                                              final Deserializer<V> valueDeserializer,
+                                                                                              final Class innerClass,
+                                                                                              final int numMessages) throws InterruptedException {
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" + testNo);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
index 646185e..4be14c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.JoinWindows;
@@ -62,22 +63,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
     public void testInner() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
 
-        final List<List<String>> expectedResult = Arrays.asList(
+        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
             null,
             null,
             null,
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Arrays.asList("A-b", "B-b"),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
             null,
             null,
-            Arrays.asList("C-a", "C-b"),
-            Arrays.asList("A-c", "B-c", "C-c"),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
             null,
             null,
             null,
-            Arrays.asList("A-d", "B-d", "C-d"),
-            Arrays.asList("D-a", "D-b", "D-c", "D-d")
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
         );
 
         leftStream.join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
@@ -89,27 +104,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
     public void testInnerRepartitioned() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-repartitioned");
 
-        final List<List<String>> expectedResult = Arrays.asList(
-                null,
-                null,
-                null,
-                Collections.singletonList("A-a"),
-                Collections.singletonList("B-a"),
-                Arrays.asList("A-b", "B-b"),
-                null,
-                null,
-                Arrays.asList("C-a", "C-b"),
-                Arrays.asList("A-c", "B-c", "C-c"),
-                null,
-                null,
-                null,
-                Arrays.asList("A-d", "B-d", "C-d"),
-                Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+            null,
+            null,
+            null,
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+            null,
+            null,
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+            null,
+            null,
+            null,
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
         );
 
-        leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
-                .join(rightStream.flatMap(MockMapper.<Long, String>noOpFlatKeyValueMapper())
-                                 .selectKey(MockMapper.<Long, String>selectKeyKeyValueMapper()),
+        leftStream.map(MockMapper.noOpKeyValueMapper())
+                .join(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
+                                 .selectKey(MockMapper.selectKeyKeyValueMapper()),
                        valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
 
         runTest(expectedResult);
@@ -119,22 +148,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
     public void testLeft() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
 
-        final List<List<String>> expectedResult = Arrays.asList(
+        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
             null,
             null,
-            Collections.singletonList("A-null"),
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Arrays.asList("A-b", "B-b"),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
             null,
             null,
-            Arrays.asList("C-a", "C-b"),
-            Arrays.asList("A-c", "B-c", "C-c"),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
             null,
             null,
             null,
-            Arrays.asList("A-d", "B-d", "C-d"),
-            Arrays.asList("D-a", "D-b", "D-c", "D-d")
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
         );
 
         leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
@@ -146,27 +189,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
     public void testLeftRepartitioned() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-repartitioned");
 
-        final List<List<String>> expectedResult = Arrays.asList(
-                null,
-                null,
-                Collections.singletonList("A-null"),
-                Collections.singletonList("A-a"),
-                Collections.singletonList("B-a"),
-                Arrays.asList("A-b", "B-b"),
-                null,
-                null,
-                Arrays.asList("C-a", "C-b"),
-                Arrays.asList("A-c", "B-c", "C-c"),
-                null,
-                null,
-                null,
-                Arrays.asList("A-d", "B-d", "C-d"),
-                Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+            null,
+            null,
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+            null,
+            null,
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+            null,
+            null,
+            null,
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
         );
 
-        leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
-                .leftJoin(rightStream.flatMap(MockMapper.<Long, String>noOpFlatKeyValueMapper())
-                                     .selectKey(MockMapper.<Long, String>selectKeyKeyValueMapper()),
+        leftStream.map(MockMapper.noOpKeyValueMapper())
+                .leftJoin(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
+                                     .selectKey(MockMapper.selectKeyKeyValueMapper()),
                         valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
 
         runTest(expectedResult);
@@ -176,22 +233,36 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
     public void testOuter() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
 
-        final List<List<String>> expectedResult = Arrays.asList(
+        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
             null,
             null,
-            Collections.singletonList("A-null"),
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Arrays.asList("A-b", "B-b"),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
             null,
             null,
-            Arrays.asList("C-a", "C-b"),
-            Arrays.asList("A-c", "B-c", "C-c"),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
             null,
             null,
             null,
-            Arrays.asList("A-d", "B-d", "C-d"),
-            Arrays.asList("D-a", "D-b", "D-c", "D-d")
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
         );
 
         leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
@@ -203,27 +274,41 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
     public void testOuterRepartitioned() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
 
-        final List<List<String>> expectedResult = Arrays.asList(
-                null,
-                null,
-                Collections.singletonList("A-null"),
-                Collections.singletonList("A-a"),
-                Collections.singletonList("B-a"),
-                Arrays.asList("A-b", "B-b"),
-                null,
-                null,
-                Arrays.asList("C-a", "C-b"),
-                Arrays.asList("A-c", "B-c", "C-c"),
-                null,
-                null,
-                null,
-                Arrays.asList("A-d", "B-d", "C-d"),
-                Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+            null,
+            null,
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b", 6L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+            null,
+            null,
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a", 9L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b", 9L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+            null,
+            null,
+            null,
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d", 14L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
         );
 
-        leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
-                .outerJoin(rightStream.flatMap(MockMapper.<Long, String>noOpFlatKeyValueMapper())
-                                .selectKey(MockMapper.<Long, String>selectKeyKeyValueMapper()),
+        leftStream.map(MockMapper.noOpKeyValueMapper())
+                .outerJoin(rightStream.flatMap(MockMapper.noOpFlatKeyValueMapper())
+                                .selectKey(MockMapper.selectKeyKeyValueMapper()),
                         valueJoiner, JoinWindows.of(ofSeconds(10))).to(OUTPUT_TOPIC);
 
         runTest(expectedResult);
@@ -233,26 +318,84 @@ public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest
     public void testMultiInner() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-multi-inner");
 
-        final List<List<String>> expectedResult = Arrays.asList(
-                null,
-                null,
-                null,
-                Collections.singletonList("A-a-a"),
-                Collections.singletonList("B-a-a"),
-                Arrays.asList("A-b-a", "B-b-a", "A-a-b", "B-a-b", "A-b-b", "B-b-b"),
-                null,
-                null,
-                Arrays.asList("C-a-a", "C-a-b", "C-b-a", "C-b-b"),
-                Arrays.asList("A-c-a", "A-c-b", "B-c-a", "B-c-b", "C-c-a", "C-c-b", "A-a-c", "B-a-c",
-                        "A-b-c", "B-b-c", "C-a-c", "C-b-c", "A-c-c", "B-c-c", "C-c-c"),
-                null,
-                null,
-                null,
-                Arrays.asList("A-d-a", "A-d-b", "A-d-c", "B-d-a", "B-d-b", "B-d-c", "C-d-a", "C-d-b", "C-d-c",
-                        "A-a-d", "B-a-d", "A-b-d", "B-b-d", "C-a-d", "C-b-d", "A-c-d", "B-c-d", "C-c-d",
-                        "A-d-d", "B-d-d", "C-d-d"),
-                Arrays.asList("D-a-a", "D-a-b", "D-a-c", "D-a-d", "D-b-a", "D-b-b", "D-b-c", "D-b-d", "D-c-a",
-                        "D-c-b", "D-c-c", "D-c-d", "D-d-a", "D-d-b", "D-d-c", "D-d-d")
+        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+            null,
+            null,
+            null,
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-a", 6L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-a", 6L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-b", 6L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-b", 6L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-b", 6L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+            null,
+            null,
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-a", 9L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-b", 9L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-a", 9L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-b", 9L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-a", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-b", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-a", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-b", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-a", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-b", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-c", 10L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+            null,
+            null,
+            null,
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-a", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-b", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-c", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-a", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-b", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-c", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-a", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-b", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-c", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-b-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-a-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-b-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-c-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-c-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-d-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-d-d", 14L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-d-d", 14L)),
+            Arrays.asList(
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-a", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-b", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-c", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-a-d", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-a", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-b", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-c", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-b-d", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-a", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-b", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-c", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-c-d", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-a", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-b", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-c", 15L),
+                new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
         );
 
         leftStream.join(rightStream, valueJoiner, JoinWindows.of(ofSeconds(10)))
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index ca78d02..772c91d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.streams.KafkaStreamsWrapper;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -34,8 +35,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
-
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests all available joins of Kafka Streams DSL.
@@ -82,30 +82,30 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
         TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, "Did not seen thread state transited to PENDING_SHUTDOWN");
 
         streams.close();
-        assertEquals(listener.createdToRevokedSeen(), true);
-        assertEquals(listener.revokedToPendingShutdownSeen(), true);
+        assertTrue(listener.createdToRevokedSeen());
+        assertTrue(listener.revokedToPendingShutdownSeen());
     }
 
     @Test
     public void testInner() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
 
-        final List<List<String>> expectedResult = Arrays.asList(
-                null,
-                null,
-                null,
-                null,
-                Collections.singletonList("B-a"),
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                null,
-                Collections.singletonList("D-d")
+        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
         );
 
         leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
@@ -117,22 +117,22 @@ public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest
     public void testLeft() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
 
-        final List<List<String>> expectedResult = Arrays.asList(
-                null,
-                null,
-                Collections.singletonList("A-null"),
-                null,
-                Collections.singletonList("B-a"),
-                null,
-                null,
-                null,
-                Collections.singletonList("C-null"),
-                null,
-                null,
-                null,
-                null,
-                null,
-                Collections.singletonList("D-d")
+        final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+            null,
+            null,
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+            null,
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+            null,
+            null,
+            null,
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 9L)),
+            null,
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
         );
 
         leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index 73d1e3d..2b685f9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -59,8 +60,8 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("right").withLoggingDisabled());
     }
 
-    final private String expectedFinalJoinResult = "D-d";
-    final private String expectedFinalMultiJoinResult = "D-d-d";
+    final private KeyValueTimestamp<Long, String> expectedFinalJoinResult = new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L);
+    final private KeyValueTimestamp<Long, String> expectedFinalMultiJoinResult = new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L);
     final private String storeName = appID + "-store";
 
     private Materialized<Long, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(storeName)
@@ -70,7 +71,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
             .withLoggingDisabled();
 
     final private class CountingPeek implements ForeachAction<Long, String> {
-        final private String expected;
+        final private KeyValueTimestamp<Long, String> expected;
 
         CountingPeek(final boolean multiJoin) {
             this.expected = multiJoin ? expectedFinalMultiJoinResult : expectedFinalJoinResult;
@@ -79,7 +80,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         @Override
         public void apply(final Long key, final String value) {
             numRecordsExpected++;
-            if (expected.equals(value)) {
+            if (expected.value().equals(value)) {
                 final boolean ret = finalResultReached.compareAndSet(false, true);
 
                 if (!ret) {
@@ -98,22 +99,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
             leftTable.join(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC);
             runTest(expectedFinalJoinResult, storeName);
         } else {
-            final List<List<String>> expectedResult = Arrays.asList(
-                    null,
-                    null,
-                    null,
-                    Collections.singletonList("A-a"),
-                    Collections.singletonList("B-a"),
-                    Collections.singletonList("B-b"),
-                    Collections.singletonList((String) null),
-                    null,
-                    null,
-                    Collections.singletonList("C-c"),
-                    Collections.singletonList((String) null),
-                    null,
-                    null,
-                    null,
-                    Collections.singletonList("D-d")
+            final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
+                null,
+                null,
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
+                null,
+                null,
+                null,
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
             );
 
             leftTable.join(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
@@ -129,22 +130,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
             leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC);
             runTest(expectedFinalJoinResult, storeName);
         } else {
-            final List<List<String>> expectedResult = Arrays.asList(
-                    null,
-                    null,
-                    Collections.singletonList("A-null"),
-                    Collections.singletonList("A-a"),
-                    Collections.singletonList("B-a"),
-                    Collections.singletonList("B-b"),
-                    Collections.singletonList((String) null),
-                    null,
-                    Collections.singletonList("C-null"),
-                    Collections.singletonList("C-c"),
-                    Collections.singletonList("C-null"),
-                    Collections.singletonList((String) null),
-                    null,
-                    null,
-                    Collections.singletonList("D-d")
+            final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+                null,
+                null,
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
+                null,
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 9L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 11L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
+                null,
+                null,
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
             );
 
             leftTable.leftJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
@@ -160,22 +161,22 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
             leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().peek(new CountingPeek(false)).to(OUTPUT_TOPIC);
             runTest(expectedFinalJoinResult, storeName);
         } else {
-            final List<List<String>> expectedResult = Arrays.asList(
-                    null,
-                    null,
-                    Collections.singletonList("A-null"),
-                    Collections.singletonList("A-a"),
-                    Collections.singletonList("B-a"),
-                    Collections.singletonList("B-b"),
-                    Collections.singletonList("null-b"),
-                    Collections.singletonList((String) null),
-                    Collections.singletonList("C-null"),
-                    Collections.singletonList("C-c"),
-                    Collections.singletonList("C-null"),
-                    Collections.singletonList((String) null),
-                    null,
-                    Collections.singletonList("null-d"),
-                    Collections.singletonList("D-d")
+            final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+                null,
+                null,
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null", 3L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a", 4L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a", 5L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b", 6L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b", 7L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 9L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c", 10L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null", 11L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
+                null,
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d", 14L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d", 15L))
             );
 
             leftTable.outerJoin(rightTable, valueJoiner, materialized).toStream().to(OUTPUT_TOPIC);
@@ -197,22 +198,29 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         } else {
             // FIXME: the duplicate below for all the multi-joins
             //        are due to KAFKA-6443, should be updated once it is fixed.
-            final List<List<String>> expectedResult = Arrays.asList(
-                    null,
-                    null,
-                    null,
-                    Arrays.asList("A-a-a", "A-a-a"),
-                    Collections.singletonList("B-a-a"),
-                    Arrays.asList("B-b-b", "B-b-b"),
-                    Collections.singletonList((String) null),
-                    null,
-                    null,
-                    Arrays.asList("C-c-c", "C-c-c"),
-                    null,
-                    null,
-                    null,
-                    null,
-                    Collections.singletonList("D-d-d")
+            final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+                null, // correct would be -> new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)
+                      // we don't get correct value, because of self-join of `rightTable`
+                null,
+                null,
+                null,
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
             );
 
             leftTable.join(rightTable, valueJoiner)
@@ -235,22 +243,28 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     .to(OUTPUT_TOPIC);
             runTest(expectedFinalMultiJoinResult, storeName);
         } else {
-            final List<List<String>> expectedResult = Arrays.asList(
-                    null,
-                    null,
-                    null,
-                    Arrays.asList("A-a-a", "A-a-a"),
-                    Collections.singletonList("B-a-a"),
-                    Arrays.asList("B-b-b", "B-b-b"),
-                    Collections.singletonList((String) null),
-                    null,
-                    null,
-                    Arrays.asList("C-c-c", "C-c-c"),
-                    Collections.singletonList((String) null),
-                    null,
-                    null,
-                    null,
-                    Collections.singletonList("D-d-d")
+            final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
+                null,
+                null,
+                null,
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
             );
 
             leftTable.join(rightTable, valueJoiner)
@@ -274,22 +288,33 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     .to(OUTPUT_TOPIC);
             runTest(expectedFinalMultiJoinResult, storeName);
         } else {
-            final List<List<String>> expectedResult = Arrays.asList(
-                    null,
-                    null,
-                    null,
-                    Arrays.asList("A-a-a", "A-a-a"),
-                    Collections.singletonList("B-a-a"),
-                    Arrays.asList("B-b-b", "B-b-b"),
-                    Collections.singletonList("null-b"),
-                    Collections.singletonList((String) null),
-                    null,
-                    Arrays.asList("C-c-c", "C-c-c"),
-                    Arrays.asList((String) null, null),
-                    null,
-                    null,
-                    null,
-                    Arrays.asList("null-d", "D-d-d")
+            final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b", 7L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
+                null,
+                null,
+                null,
+                Arrays.asList(
+                    // incorrect result `null-d` is caused by self-join of `rightTable`
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d", 14L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
             );
 
             leftTable.join(rightTable, valueJoiner)
@@ -312,22 +337,28 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     .to(OUTPUT_TOPIC);
             runTest(expectedFinalMultiJoinResult, storeName);
         } else {
-            final List<List<String>> expectedResult = Arrays.asList(
-                    null,
-                    null,
-                    null,
-                    Arrays.asList("A-a-a", "A-a-a"),
-                    Collections.singletonList("B-a-a"),
-                    Arrays.asList("B-b-b", "B-b-b"),
-                    Collections.singletonList((String) null),
-                    null,
-                    null,
-                    Arrays.asList("C-c-c", "C-c-c"),
-                    Collections.singletonList((String) null),
-                    null,
-                    null,
-                    null,
-                    Collections.singletonList("D-d-d")
+            final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
+                null,
+                null,
+                null,
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
             );
 
             leftTable.leftJoin(rightTable, valueJoiner)
@@ -351,22 +382,32 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     .to(OUTPUT_TOPIC);
             runTest(expectedFinalMultiJoinResult, storeName);
         } else {
-            final List<List<String>> expectedResult = Arrays.asList(
-                    null,
-                    null,
-                    null,
-                    Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
-                    Collections.singletonList("B-a-a"),
-                    Arrays.asList("B-b-b", "B-b-b"),
-                    Collections.singletonList((String) null),
-                    null,
-                    null,
-                    Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
-                    Arrays.asList("C-null-null", "C-null-null"),
-                    Collections.singletonList((String) null),
-                    null,
-                    null,
-                    Collections.singletonList("D-d-d")
+            final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 7L)),
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
+                null,
+                null,
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
             );
 
             leftTable.leftJoin(rightTable, valueJoiner)
@@ -390,22 +431,34 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     .to(OUTPUT_TOPIC);
             runTest(expectedFinalMultiJoinResult, storeName);
         } else {
-            final List<List<String>> expectedResult = Arrays.asList(
-                    null,
-                    null,
-                    null,
-                    Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
-                    Collections.singletonList("B-a-a"),
-                    Arrays.asList("B-b-b", "B-b-b"),
-                    Collections.singletonList("null-b"),
-                    Collections.singletonList((String) null),
-                    null,
-                    Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
-                    Arrays.asList("C-null-null", "C-null-null"),
-                    Collections.singletonList((String) null),
-                    null,
-                    null,
-                    Arrays.asList("null-d", "D-d-d")
+            final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b", 7L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d", 14L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
             );
 
             leftTable.leftJoin(rightTable, valueJoiner)
@@ -428,22 +481,30 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     .to(OUTPUT_TOPIC);
             runTest(expectedFinalMultiJoinResult, storeName);
         } else {
-            final List<List<String>> expectedResult = Arrays.asList(
-                    null,
-                    null,
-                    null,
-                    Arrays.asList("A-a-a", "A-a-a"),
-                    Collections.singletonList("B-a-a"),
-                    Arrays.asList("B-b-b", "B-b-b"),
-                    Collections.singletonList("null-b-b"),
-                    null,
-                    null,
-                    Arrays.asList("C-c-c", "C-c-c"),
-                    Collections.singletonList((String) null),
-                    null,
-                    null,
-                    Arrays.asList("null-d-d", "null-d-d"),
-                    Collections.singletonList("D-d-d")
+            final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b-b", 7L)),
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 11L)),
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
             );
 
             leftTable.outerJoin(rightTable, valueJoiner)
@@ -467,22 +528,34 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     .to(OUTPUT_TOPIC);
             runTest(expectedFinalMultiJoinResult, storeName);
         } else {
-            final List<List<String>> expectedResult = Arrays.asList(
-                    null,
-                    null,
-                    null,
-                    Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
-                    Collections.singletonList("B-a-a"),
-                    Arrays.asList("B-b-b", "B-b-b"),
-                    Collections.singletonList("null-b-b"),
-                    Collections.singletonList((String) null),
-                    null,
-                    Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
-                    Arrays.asList("C-null-null", "C-null-null"),
-                    Collections.singletonList((String) null),
-                    null,
-                    Arrays.asList("null-d-d", "null-d-d"),
-                    Collections.singletonList("D-d-d")
+            final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b-b", 7L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
             );
 
             leftTable.outerJoin(rightTable, valueJoiner)
@@ -506,22 +579,36 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
                     .to(OUTPUT_TOPIC);
             runTest(expectedFinalMultiJoinResult, storeName);
         } else {
-            final List<List<String>> expectedResult = Arrays.asList(
-                    null,
-                    null,
-                    null,
-                    Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
-                    Collections.singletonList("B-a-a"),
-                    Arrays.asList("B-b-b", "B-b-b"),
-                    Collections.singletonList("null-b-b"),
-                    Arrays.asList((String) null, null),
-                    null,
-                    Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
-                    Arrays.asList("C-null-null", "C-null-null"),
-                    Collections.singletonList((String) null),
-                    null,
-                    null,
-                    Arrays.asList("null-d-d", "null-d-d", "D-d-d")
+            final List<List<KeyValueTimestamp<Long, String>>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-null-null", 3L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "A-a-a", 4L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-a-a", 5L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "B-b-b", 6L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-b-b", 7L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 8L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 9L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-c-c", 10L)),
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "C-null-null", 11L)),
+                Collections.singletonList(new KeyValueTimestamp<>(ANY_UNIQUE_KEY, null, 12L)),
+                null,
+                null,
+                Arrays.asList(
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "null-d-d", 14L),
+                    new KeyValueTimestamp<>(ANY_UNIQUE_KEY, "D-d-d", 15L))
             );
 
             leftTable.outerJoin(rightTable, valueJoiner)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index e6cf85d..46515aa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -241,12 +241,14 @@ public class IntegrationTestUtils {
      * @param <K>                 Key type of the data records
      * @param <V>                 Value type of the data records
      */
+    @SuppressWarnings("WeakerAccess")
     public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                          final Collection<KeyValue<K, V>> records,
                                                                          final Properties producerConfig,
                                                                          final Long timestamp,
                                                                          final boolean enableTransactions)
-        throws ExecutionException, InterruptedException {
+            throws ExecutionException, InterruptedException {
+
         produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions);
     }
 
@@ -260,13 +262,15 @@ public class IntegrationTestUtils {
      * @param <K>                 Key type of the data records
      * @param <V>                 Value type of the data records
      */
+    @SuppressWarnings("WeakerAccess")
     public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic,
                                                                          final Collection<KeyValue<K, V>> records,
                                                                          final Properties producerConfig,
                                                                          final Headers headers,
                                                                          final Long timestamp,
                                                                          final boolean enableTransactions)
-        throws ExecutionException, InterruptedException {
+            throws ExecutionException, InterruptedException {
+
         try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) {
             if (enableTransactions) {
                 producer.initTransactions();
@@ -369,6 +373,7 @@ public class IntegrationTestUtils {
      * @param enableTransactions  Send messages in a transaction
      * @param <V>                 Value type of the data records
      */
+    @SuppressWarnings("WeakerAccess")
     public static <V> void produceValuesSynchronously(final String topic,
                                                       final Collection<V> records,
                                                       final Properties producerConfig,
@@ -427,6 +432,7 @@ public class IntegrationTestUtils {
      * @param <V>                 Value type of the data records
      * @return All the records consumed, or null if no records are consumed
      */
+    @SuppressWarnings("WeakerAccess")
     public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
                                                                                 final String topic,
                                                                                 final int expectedNumRecords) throws InterruptedException {
@@ -444,6 +450,7 @@ public class IntegrationTestUtils {
      * @param <V>                 Value type of the data records
      * @return All the records consumed, or null if no records are consumed
      */
+    @SuppressWarnings("WeakerAccess")
     public static <K, V> List<ConsumerRecord<K, V>> waitUntilMinRecordsReceived(final Properties consumerConfig,
                                                                                 final String topic,
                                                                                 final int expectedNumRecords,
@@ -519,14 +526,14 @@ public class IntegrationTestUtils {
      * @param <K>                Key type of the data records
      * @param <V>                Value type of the data records
      */
-    public static <K, V> List<KeyValue<K, KeyValue<V, Long>>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
+    public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilMinKeyValueWithTimestampRecordsReceived(final Properties consumerConfig,
                                                                                                                final String topic,
                                                                                                                final int expectedNumRecords,
                                                                                                                final long waitTime) throws InterruptedException {
-        final List<KeyValue<K, KeyValue<V, Long>>> accumData = new ArrayList<>();
+        final List<KeyValueTimestamp<K, V>> accumData = new ArrayList<>();
         try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
             final TestCondition valuesRead = () -> {
-                final List<KeyValue<K, KeyValue<V, Long>>> readData =
+                final List<KeyValueTimestamp<K, V>> readData =
                     readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedNumRecords);
                 accumData.addAll(readData);
                 return accumData.size() >= expectedNumRecords;
@@ -559,33 +566,74 @@ public class IntegrationTestUtils {
      * @param consumerConfig     Kafka Consumer configuration
      * @param topic              Kafka topic to consume from
      * @param expectedRecords    Expected key-value mappings
+     * @param <K>                Key type of the data records
+     * @param <V>                Value type of the data records
+     * @return All the mappings consumed, or null if no records are consumed
+     */
+    public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,
+                                                                                                      final String topic,
+                                                                                                      final List<KeyValueTimestamp<K, V>> expectedRecords) throws InterruptedException {
+        return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, DEFAULT_TIMEOUT, true);
+    }
+
+    /**
+     * Wait until final key-value mappings have been consumed.
+     *
+     * @param consumerConfig     Kafka Consumer configuration
+     * @param topic              Kafka topic to consume from
+     * @param expectedRecords    Expected key-value mappings
      * @param waitTime           Upper bound of waiting time in milliseconds
      * @param <K>                Key type of the data records
      * @param <V>                Value type of the data records
      * @return All the mappings consumed, or null if no records are consumed
      */
+    @SuppressWarnings("WeakerAccess")
     public static <K, V> List<KeyValue<K, V>> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
                                                                                     final String topic,
                                                                                     final List<KeyValue<K, V>> expectedRecords,
                                                                                     final long waitTime) throws InterruptedException {
-        final List<KeyValue<K, V>> accumData = new ArrayList<>();
+        return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false);
+    }
+
+    public static <K, V> List<KeyValueTimestamp<K, V>> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig,
+                                                                                                      final String topic,
+                                                                                                      final List<KeyValueTimestamp<K, V>> expectedRecords,
+                                                                                                      final long waitTime) throws InterruptedException {
+        return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, true);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <K, V, T> List<T> waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig,
+                                                                           final String topic,
+                                                                           final List<T> expectedRecords,
+                                                                           final long waitTime,
+                                                                           final boolean withTimestamp) throws InterruptedException {
+        final List<T> accumData = new ArrayList<>();
         try (final Consumer<K, V> consumer = createConsumer(consumerConfig)) {
             final TestCondition valuesRead = () -> {
-                final List<KeyValue<K, V>> readData =
-                        readKeyValues(topic, consumer, waitTime, expectedRecords.size());
+                final List<T> readData;
+                if (withTimestamp) {
+                    readData = (List<T>) readKeyValuesWithTimestamp(topic, consumer, waitTime, expectedRecords.size());
+                } else {
+                    readData = (List<T>) readKeyValues(topic, consumer, waitTime, expectedRecords.size());
+                }
                 accumData.addAll(readData);
 
                 // filter out all intermediate records we don't want
-                final List<KeyValue<K, V>> accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList());
+                final List<T> accumulatedActual = accumData.stream().filter(expectedRecords::contains).collect(Collectors.toList());
 
                 // still need to check that for each key, the ordering is expected
-                final Map<K, List<KeyValue<K, V>>> finalAccumData = new HashMap<>();
-                for (final KeyValue<K, V> kv : accumulatedActual) {
-                    finalAccumData.computeIfAbsent(kv.key, key -> new ArrayList<>()).add(kv);
+                final Map<K, List<T>> finalAccumData = new HashMap<>();
+                for (final T kv : accumulatedActual) {
+                    finalAccumData.computeIfAbsent(
+                        (K) (withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key),
+                        key -> new ArrayList<>()).add(kv);
                 }
-                final Map<K, List<KeyValue<K, V>>> finalExpected = new HashMap<>();
-                for (final KeyValue<K, V> kv : expectedRecords) {
-                    finalExpected.computeIfAbsent(kv.key, key -> new ArrayList<>()).add(kv);
+                final Map<K, List<T>> finalExpected = new HashMap<>();
+                for (final T kv : expectedRecords) {
+                    finalExpected.computeIfAbsent(
+                        (K) (withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key),
+                        key -> new ArrayList<>()).add(kv);
                 }
 
                 // returns true only if the remaining records in both lists are the same and in the same order
@@ -643,6 +691,7 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
+    @SuppressWarnings("WeakerAccess")
     public static void waitForTopicPartitions(final List<KafkaServer> servers,
                                               final List<TopicPartition> partitions,
                                               final long timeout) throws InterruptedException {
@@ -863,14 +912,14 @@ public class IntegrationTestUtils {
      * @param maxMessages    Maximum number of messages to read via the consumer
      * @return The KeyValue elements retrieved via the consumer
      */
-    private static <K, V> List<KeyValue<K, KeyValue<V, Long>>> readKeyValuesWithTimestamp(final String topic,
-                                                                                          final Consumer<K, V> consumer,
-                                                                                          final long waitTime,
-                                                                                          final int maxMessages) {
-        final List<KeyValue<K, KeyValue<V, Long>>> consumedValues = new ArrayList<>();
+    private static <K, V> List<KeyValueTimestamp<K, V>> readKeyValuesWithTimestamp(final String topic,
+                                                                                   final Consumer<K, V> consumer,
+                                                                                   final long waitTime,
+                                                                                   final int maxMessages) {
+        final List<KeyValueTimestamp<K, V>> consumedValues = new ArrayList<>();
         final List<ConsumerRecord<K, V>> records = readRecords(topic, consumer, waitTime, maxMessages);
         for (final ConsumerRecord<K, V> record : records) {
-            consumedValues.add(new KeyValue<>(record.key(), KeyValue.pair(record.value(), record.timestamp())));
+            consumedValues.add(new KeyValueTimestamp<>(record.key(), record.value(), record.timestamp()));
         }
         return consumedValues;
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
index 0bd22d4..a6959e1 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorSupplier.java
@@ -56,7 +56,11 @@ public class MockProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
         return capturedProcessors(1).get(0);
     }
 
-    // get the captured processors with the expected number
+    public int capturedProcessorsCount() {
+        return processors.size();
+    }
+
+        // get the captured processors with the expected number
     public List<MockProcessor<K, V>> capturedProcessors(final int expectedNumberOfProcessors) {
         assertEquals(expectedNumberOfProcessors, processors.size());