You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2020/05/12 18:20:09 UTC

[kafka] branch trunk updated: KAFKA-8770: KIP-557: Drop idempotent KTable source updates (#8254)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f54cece  KAFKA-8770: KIP-557: Drop idempotent KTable source updates (#8254)
f54cece is described below

commit f54cece73e6116566979bcf6a865d803b7c18974
Author: Richard Yu <yo...@gmail.com>
AuthorDate: Tue May 12 11:19:32 2020 -0700

    KAFKA-8770: KIP-557: Drop idempotent KTable source updates (#8254)
    
    Drops idempotent updates from KTable source operators.
    Specifically, drop updates in which the value is unchanged,
    and the timestamp is the same or larger.
    
    Implements: KIP-557
    Reviewers: Bruno Cadonna <br...@confluent.io>, John Roesler <vv...@apache.org>
---
 .../streams/kstream/internals/KTableSource.java    | 37 +++++++++++++---
 .../internals/metrics/ProcessorNodeMetrics.java    | 22 ++++++++++
 .../state/internals/MeteredKeyValueStore.java      |  8 ++--
 .../internals/MeteredTimestampedKeyValueStore.java | 51 +++++++++++++++++++++-
 .../internals/ValueAndTimestampSerializer.java     | 45 +++++++++++++++++++
 .../KTableKTableForeignKeyJoinIntegrationTest.java | 18 ++++++--
 .../KTableSourceTopicRestartIntegrationTest.java   |  1 +
 .../integration/RestoreIntegrationTest.java        | 13 +++---
 .../kstream/internals/KTableSourceTest.java        | 43 ++++++++++++++++++
 .../internals/KTableTransformValuesTest.java       |  4 +-
 .../kstream/internals/SuppressScenarioTest.java    | 19 ++++----
 .../metrics/ProcessorNodeMetricsTest.java          | 22 ++++++++++
 .../MeteredTimestampedKeyValueStoreTest.java       | 48 ++++++++++++++++++++
 .../internals/ValueAndTimestampSerializerTest.java | 24 ++++++++++
 14 files changed, 322 insertions(+), 33 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index b9f3580..f6756f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -21,15 +21,20 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.RawAndDeserializedValue;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 
 import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.skippedIdempotentUpdatesSensor;
 
 public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
     private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
@@ -74,10 +79,11 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
 
     private class KTableSourceProcessor extends AbstractProcessor<K, V> {
 
-        private TimestampedKeyValueStore<K, V> store;
+        private MeteredTimestampedKeyValueStore<K, V> store;
         private TimestampedTupleForwarder<K, V> tupleForwarder;
         private StreamsMetricsImpl metrics;
         private Sensor droppedRecordsSensor;
+        private Sensor skippedIdempotentUpdatesSensor = null;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -86,12 +92,24 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
             metrics = (StreamsMetricsImpl) context.metrics();
             droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
             if (queryableName != null) {
-                store = (TimestampedKeyValueStore<K, V>) context.getStateStore(queryableName);
+                final StateStore stateStore = context.getStateStore(queryableName);
+                try {
+                    store = ((WrappedStateStore<MeteredTimestampedKeyValueStore<K, V>, K, V>) stateStore).wrapped();
+                } catch (final ClassCastException e) {
+                    throw new IllegalStateException("Unexpected store type: " + stateStore.getClass() + " for store: " + queryableName, e);
+                }
                 tupleForwarder = new TimestampedTupleForwarder<>(
                     store,
                     context,
                     new TimestampedCacheFlushListener<>(context),
                     sendOldValues);
+                skippedIdempotentUpdatesSensor = skippedIdempotentUpdatesSensor(
+                    Thread.currentThread().getName(), 
+                    context.taskId().toString(), 
+                    ((InternalProcessorContext) context).currentNode().name(), 
+                    metrics
+                );
+
             }
         }
 
@@ -108,7 +126,8 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
             }
 
             if (queryableName != null) {
-                final ValueAndTimestamp<V> oldValueAndTimestamp = store.get(key);
+                final RawAndDeserializedValue<V> tuple = store.getWithBinary(key);
+                final ValueAndTimestamp<V> oldValueAndTimestamp = tuple.value;
                 final V oldValue;
                 if (oldValueAndTimestamp != null) {
                     oldValue = oldValueAndTimestamp.value();
@@ -119,8 +138,14 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
                 } else {
                     oldValue = null;
                 }
-                store.put(key, ValueAndTimestamp.make(value, context().timestamp()));
-                tupleForwarder.maybeForward(key, value, oldValue);
+                final ValueAndTimestamp<V> newValueAndTimestamp = ValueAndTimestamp.make(value, context().timestamp());
+                final boolean isDifferentValue = 
+                    store.putIfDifferentValues(key, newValueAndTimestamp, tuple.serializedValue);
+                if (isDifferentValue) {
+                    tupleForwarder.maybeForward(key, value, oldValue);
+                }  else {
+                    skippedIdempotentUpdatesSensor.record();
+                }
             } else {
                 context().forward(key, new Change<>(value, null));
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
index b495f7f..c157d83 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java
@@ -47,6 +47,12 @@ public class ProcessorNodeMetrics {
     private static final String SUPPRESSION_EMIT_RATE_DESCRIPTION =
         RATE_DESCRIPTION_PREFIX + SUPPRESSION_EMIT_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
 
+    private static final String IDEMPOTENT_UPDATE_SKIP = "idempotent-update-skip";
+    private static final String IDEMPOTENT_UPDATE_SKIP_DESCRIPTION = "skipped idempotent updates";
+    private static final String IDEMPOTENT_UPDATE_SKIP_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + IDEMPOTENT_UPDATE_SKIP_DESCRIPTION;
+    private static final String IDEMPOTENT_UPDATE_SKIP_RATE_DESCRIPTION =
+            RATE_DESCRIPTION_PREFIX + IDEMPOTENT_UPDATE_SKIP_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
+
     private static final String PROCESS = "process";
     private static final String PROCESS_DESCRIPTION = "calls to process";
     private static final String PROCESS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + PROCESS_DESCRIPTION;
@@ -108,6 +114,22 @@ public class ProcessorNodeMetrics {
         );
     }
 
+    public static Sensor skippedIdempotentUpdatesSensor(final String threadId,
+            final String taskId,
+            final String processorNodeId,
+            final StreamsMetricsImpl streamsMetrics) {
+        return throughputSensor(
+            threadId,
+            taskId,
+            processorNodeId,
+            IDEMPOTENT_UPDATE_SKIP,
+            IDEMPOTENT_UPDATE_SKIP_RATE_DESCRIPTION,
+            IDEMPOTENT_UPDATE_SKIP_TOTAL_DESCRIPTION,
+            RecordingLevel.DEBUG,
+            streamsMetrics
+        );
+    }
+
     public static Sensor processSensor(final String threadId,
                                        final String taskId,
                                        final String processorNodeId,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 6076702..c844e03 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -54,9 +54,9 @@ public class MeteredKeyValueStore<K, V>
 
     private final String metricsScope;
     protected final Time time;
-    private Sensor putSensor;
+    protected Sensor putSensor;
     private Sensor putIfAbsentSensor;
-    private Sensor getSensor;
+    protected Sensor getSensor;
     private Sensor deleteSensor;
     private Sensor putAllSensor;
     private Sensor allSensor;
@@ -206,11 +206,11 @@ public class MeteredKeyValueStore<K, V>
         }
     }
 
-    private V outerValue(final byte[] value) {
+    protected V outerValue(final byte[] value) {
         return value != null ? serdes.valueFrom(value) : null;
     }
 
-    private Bytes keyBytes(final K key) {
+    protected Bytes keyBytes(final K key) {
         return Bytes.wrap(serdes.rawKey(key));
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 468b554..d1446dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -16,9 +16,12 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -35,7 +38,7 @@ import org.apache.kafka.streams.state.ValueAndTimestamp;
  * @param <V>
  */
 public class MeteredTimestampedKeyValueStore<K, V>
-    extends MeteredKeyValueStore<K, ValueAndTimestamp<V>>
+    extends MeteredKeyValueStore<K, ValueAndTimestamp<V>> 
     implements TimestampedKeyValueStore<K, V> {
 
     MeteredTimestampedKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
@@ -53,4 +56,48 @@ public class MeteredTimestampedKeyValueStore<K, V>
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
             valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.valueSerde()) : valueSerde);
     }
-}
\ No newline at end of file
+
+    public RawAndDeserializedValue<V> getWithBinary(final K key) {
+        try {
+            return maybeMeasureLatency(() -> { 
+                final byte[] serializedValue = wrapped().get(keyBytes(key));
+                return new RawAndDeserializedValue<V>(serializedValue, outerValue(serializedValue));
+            }, time, getSensor);
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    public boolean putIfDifferentValues(final K key,
+                                        final ValueAndTimestamp<V> newValue,
+                                        final byte[] oldSerializedValue) {
+        try {
+            return maybeMeasureLatency(
+                () -> {
+                    final byte[] newSerializedValue = serdes.rawValue(newValue);
+                    if (ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue)) {
+                        return false;
+                    } else {
+                        wrapped().put(keyBytes(key), newSerializedValue);
+                        return true;
+                    }
+                },
+                time,
+                putSensor
+            );
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key, newValue);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
+    public static class RawAndDeserializedValue<ValueType> {
+        public final byte[] serializedValue;
+        public final ValueAndTimestamp<ValueType> value;
+        public RawAndDeserializedValue(final byte[] serializedValue, final ValueAndTimestamp<ValueType> value) {
+            this.serializedValue = serializedValue;
+            this.value = value;
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
index 614a0f5..3b2663d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java
@@ -34,6 +34,51 @@ public class ValueAndTimestampSerializer<V> implements Serializer<ValueAndTimest
         timestampSerializer = new LongSerializer();
     }
 
+    private static boolean skipTimestampAndCompareValues(final byte[] left, final byte[] right) {
+        for (int i = Long.BYTES; i < left.length; i++) {
+            if (left[i] != right[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static long extractTimestamp(final byte[] bytes) {
+        final byte[] timestampBytes = new byte[Long.BYTES];
+        for (int i = 0; i < Long.BYTES; i++) {
+            timestampBytes[i] = bytes[i];
+        }
+        return ByteBuffer.wrap(timestampBytes).getLong();
+    }
+
+    /**
+     * @param left  the serialized byte array of the old record in state store
+     * @param right the serialized byte array of the new record being processed
+     * @return true if the two serialized values are the same (excluding timestamp) and
+     *              the timestamp of right is not less than that of left (an idempotent update);
+     *         false otherwise, including when right is an out-of-order record
+     */
+    public static boolean compareValuesAndCheckForIncreasingTimestamp(final byte[] left, final byte[] right) {
+        if (left == right) {
+            return true;
+        }
+        if (left == null || right == null) {
+            return false;
+        }
+
+        final int length = left.length;
+        if (right.length != length) {
+            return false;
+        }
+
+        final long leftTimestamp = extractTimestamp(left);
+        final long rightTimestamp = extractTimestamp(right);
+        if (rightTimestamp < leftTimestamp) {
+            return false;
+        }
+        return skipTimestampAndCompareValues(left, right);
+    }
+
     @Override
     public void configure(final Map<String, ?> configs,
                           final boolean isKey) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 34cf1c1..5ebd9ef 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -49,6 +49,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.Function;
+import static java.util.Collections.singletonMap;
+
 
 import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -386,12 +388,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 
             // Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
             // it's not possible to know whether a result was previously emitted.
+            // HOWEVER, when the final join result is materialized (either explicitly or
+            // implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
             // For the left join, the tombstone is necessary.
             left.pipeInput("lhs1", (String) null);
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
-                    is(mkMap(mkEntry("lhs1", null)))
+                    is(leftJoin || !(materialized || rejoin)
+                           ? mkMap(mkEntry("lhs1", null))
+                           : emptyMap())
                 );
                 if (materialized) {
                     assertThat(
@@ -468,11 +474,15 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             // "moving" our subscription to another non-existent FK results in an unnecessary tombstone for inner join,
             // since it impossible to know whether the prior FK existed or not (and thus whether any results have
             // previously been emitted)
+            // HOWEVER, when the final join result is materialized (either explicitly or
+            // implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
             // The left join emits a _necessary_ update (since the lhs record has actually changed)
             left.pipeInput("lhs1", "lhsValue1|rhs2");
             assertThat(
                 outputTopic.readKeyValuesToMap(),
-                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)))
+                is(leftJoin
+                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)"))
+                       : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
             );
             if (materialized) {
                 assertThat(
@@ -484,7 +494,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             left.pipeInput("lhs1", "lhsValue1|rhs3");
             assertThat(
                 outputTopic.readKeyValuesToMap(),
-                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)))
+                is(leftJoin
+                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)"))
+                       : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
             );
             if (materialized) {
                 assertThat(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 1cb154a..0a67f41 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -107,6 +107,7 @@ public class KTableSourceTopicRestartIntegrationTest {
     @After
     public void after() throws Exception {
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+        CLUSTER.deleteAllTopicsAndWait(60000L);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 9cb0b85..160313a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -127,7 +127,7 @@ public class RestoreIntegrationTest {
         // restoring from 1000 to 4000 (committed), and then process from 4000 to 5000 on each of the two partitions
         final int offsetLimitDelta = 1000;
         final int offsetCheckpointed = 1000;
-        createStateForRestoration(INPUT_STREAM);
+        createStateForRestoration(INPUT_STREAM, 0);
         setCommittedOffset(INPUT_STREAM, offsetLimitDelta);
 
         final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true);
@@ -143,7 +143,7 @@ public class RestoreIntegrationTest {
         builder.table(INPUT_STREAM, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("store").withKeySerde(Serdes.Integer()).withValueSerde(Serdes.Integer()))
                 .toStream()
                 .foreach((key, value) -> {
-                    if (numReceived.incrementAndGet() == 2 * offsetLimitDelta) {
+                    if (numReceived.incrementAndGet() == offsetLimitDelta * 2) {
                         shutdownLatch.countDown();
                     }
                 });
@@ -190,8 +190,8 @@ public class RestoreIntegrationTest {
 
         // restoring from 1000 to 5000, and then process from 5000 to 10000 on each of the two partitions
         final int offsetCheckpointed = 1000;
-        createStateForRestoration(APPID + "-store-changelog");
-        createStateForRestoration(INPUT_STREAM);
+        createStateForRestoration(APPID + "-store-changelog", 0);
+        createStateForRestoration(INPUT_STREAM, 10000);
 
         final StateDirectory stateDirectory = new StateDirectory(new StreamsConfig(props), new MockTime(), true);
         // note here the checkpointed offset is the last processed record's offset, so without control message we should write this offset - 1
@@ -345,7 +345,7 @@ public class RestoreIntegrationTest {
         public void close() { }
     }
 
-    private void createStateForRestoration(final String changelogTopic) {
+    private void createStateForRestoration(final String changelogTopic, final int startingOffset) {
         final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
 
@@ -353,7 +353,8 @@ public class RestoreIntegrationTest {
                      new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer())) {
 
             for (int i = 0; i < numberOfKeys; i++) {
-                producer.send(new ProducerRecord<>(changelogTopic, i, i));
+                final int offset = startingOffset + i;
+                producer.send(new ProducerRecord<>(changelogTopic, offset, offset));
             }
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 236ea58..ae512b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -16,8 +16,10 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KeyValueTimestamp;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -33,6 +35,9 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.streams.TestOutputTopic;
+
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -87,6 +92,44 @@ public class KTableSourceTest {
     }
 
     @Test
+    public void testKTableSourceEmitOnChange() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        builder.table(topic1, Consumed.with(Serdes.String(), Serdes.Integer()), Materialized.as("store"))
+               .toStream()
+               .to("output");
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, Integer> inputTopic =
+                driver.createInputTopic(topic1, new StringSerializer(), new IntegerSerializer());
+            final TestOutputTopic<String, Integer> outputTopic =
+                driver.createOutputTopic("output", new StringDeserializer(), new IntegerDeserializer());
+
+            inputTopic.pipeInput("A", 1, 10L);
+            inputTopic.pipeInput("B", 2, 11L);
+            inputTopic.pipeInput("A", 1, 12L);
+            inputTopic.pipeInput("B", 3, 13L);
+            // this record should be kept even though its value is unchanged: it is out of order
+            // (older timestamp than the stored one), so it is stored and forwarded with its own timestamp
+            inputTopic.pipeInput("A", 1, 9L);
+
+            assertEquals(
+                1.0,
+                getMetricByName(driver.metrics(), "idempotent-update-skip-total", "stream-processor-node-metrics").metricValue()
+            );
+
+            assertEquals(
+                asList(new TestRecord<>("A", 1, Instant.ofEpochMilli(10L)),
+                           new TestRecord<>("B", 2, Instant.ofEpochMilli(11L)),
+                           new TestRecord<>("B", 3, Instant.ofEpochMilli(13L)),
+                           new TestRecord<>("A", 1, Instant.ofEpochMilli(9L))),
+                outputTopic.readRecordsToList()
+            );
+        }
+    }
+
+    @Test
     public void kTableShouldLogAndMeterOnSkippedRecordsWithBuiltInMetrics0100To24() {
         kTableShouldLogAndMeterOnSkippedRecords(StreamsConfig.METRICS_0100_TO_24);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 80cdae4..a48dcdb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -398,8 +398,8 @@ public class KTableTransformValuesTest {
                 driver.createInputTopic(INPUT_TOPIC, new StringSerializer(), new StringSerializer());
 
         inputTopic.pipeInput("A", "ignored", 5L);
-        inputTopic.pipeInput("A", "ignored", 15L);
-        inputTopic.pipeInput("A", "ignored", 10L);
+        inputTopic.pipeInput("A", "ignored1", 15L);
+        inputTopic.pipeInput("A", "ignored2", 10L);
 
         assertThat(output(), hasItems(new KeyValueTimestamp<>("A", "1", 5),
                 new KeyValueTimestamp<>("A", "0", 15),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 55f8670..29eb539 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -138,19 +138,19 @@ public class SuppressScenarioTest {
                     new KeyValueTimestamp<>("x", 1L, 3L)
                 )
             );
-            inputTopic.pipeInput("x", "x", 4L);
+            inputTopic.pipeInput("x", "y", 4L);
             verify(
                 drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
                     new KeyValueTimestamp<>("x", 0L, 4L),
-                    new KeyValueTimestamp<>("x", 1L, 4L)
+                    new KeyValueTimestamp<>("y", 1L, 4L)
                 )
             );
             verify(
                 drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
                     new KeyValueTimestamp<>("x", 0L, 4L),
-                    new KeyValueTimestamp<>("x", 1L, 4L)
+                    new KeyValueTimestamp<>("y", 1L, 4L)
                 )
             );
         }
@@ -209,12 +209,12 @@ public class SuppressScenarioTest {
             );
 
 
-            inputTopic.pipeInput("tick", "tick", 4L);
+            inputTopic.pipeInput("tick", "tock", 4L);
             verify(
                 drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
                     new KeyValueTimestamp<>("tick", 0L, 4L),
-                    new KeyValueTimestamp<>("tick", 1L, 4L)
+                    new KeyValueTimestamp<>("tock", 1L, 4L)
                 )
             );
             // tick is still buffered, since it was first inserted at time 3, and it is only time 4 right now.
@@ -614,11 +614,11 @@ public class SuppressScenarioTest {
             );
 
 
-            inputTopicRight.pipeInput("tick", "tick", 21L);
+            inputTopicRight.pipeInput("tick", "tick1", 21L);
             verify(
                 drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                 asList(
-                    new KeyValueTimestamp<>("tick", "(null,tick)", 21), // just a testing artifact
+                    new KeyValueTimestamp<>("tick", "(null,tick1)", 21), // just a testing artifact
                     new KeyValueTimestamp<>("A", "(b,2)", 13L)
                 )
             );
@@ -644,7 +644,6 @@ public class SuppressScenarioTest {
             .to("output", Produced.with(Serdes.String(), Serdes.String()));
 
         final Topology topology = builder.build();
-        System.out.println(topology.describe());
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
             final TestInputTopic<String, String> inputTopicRight =
                 driver.createInputTopic("right", STRING_SERIALIZER, STRING_SERIALIZER);
@@ -703,11 +702,11 @@ public class SuppressScenarioTest {
             );
 
 
-            inputTopicLeft.pipeInput("tick", "tick", 21L);
+            inputTopicLeft.pipeInput("tick", "tick1", 21L);
             verify(
                 drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                 asList(
-                    new KeyValueTimestamp<>("tick", "(tick,null)", 21), // just a testing artifact
+                    new KeyValueTimestamp<>("tick", "(tick1,null)", 21), // just a testing artifact
                     new KeyValueTimestamp<>("A", "(2,b)", 13L)
                 )
             );
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
index 9778db8..8563167 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
@@ -98,6 +98,28 @@ public class ProcessorNodeMetricsTest {
     }
 
     @Test
+    public void shouldGetIdempotentUpdateSkipSensor() {
+        final String metricNamePrefix = "idempotent-update-skip";
+        final String descriptionOfCount = "The total number of skipped idempotent updates";
+        final String descriptionOfRate = "The average number of skipped idempotent updates per second";
+        expect(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, metricNamePrefix, RecordingLevel.DEBUG))
+            .andReturn(expectedSensor);
+        expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap);
+        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
+            expectedSensor,
+            StreamsMetricsImpl.PROCESSOR_NODE_LEVEL_GROUP,
+            tagMap,
+            metricNamePrefix,
+            descriptionOfRate,
+            descriptionOfCount
+        );
+
+        verifySensor(
+            () -> ProcessorNodeMetrics.skippedIdempotentUpdatesSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics)
+        );
+    }
+
+    @Test
     public void shouldGetProcessSensor() {
         final String metricNamePrefix = "process";
         final String descriptionOfCount = "The total number of calls to process";
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 0f1795c..b32c331 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.RawAndDeserializedValue;
 import org.apache.kafka.test.KeyValueIteratorStub;
 import org.easymock.EasyMockRule;
 import org.easymock.Mock;
@@ -65,6 +66,7 @@ import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -182,6 +184,52 @@ public class MeteredTimestampedKeyValueStoreTest {
     }
 
     @Test
+    public void shouldGetWithBinary() {
+        expect(inner.get(keyBytes)).andReturn(valueAndTimestampBytes);
+
+        inner.put(eq(keyBytes), aryEq(valueAndTimestampBytes));
+        expectLastCall();
+        init();
+
+        final RawAndDeserializedValue<String> valueWithBinary = metered.getWithBinary(key);
+        assertEquals(valueWithBinary.value, valueAndTimestamp);
+        assertEquals(valueWithBinary.serializedValue, valueAndTimestampBytes);
+    }
+
+    @SuppressWarnings("resource")
+    @Test
+    public void shouldNotPutIfSameValuesAndGreaterTimestamp() {
+        init();
+
+        metered.put(key, valueAndTimestamp);
+        final ValueAndTimestampSerde<String> stringSerde = new ValueAndTimestampSerde<>(Serdes.String());
+        final byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", valueAndTimestamp);
+
+        final ValueAndTimestamp<String> newValueAndTimestamp = ValueAndTimestamp.make("value", 98L);
+        assertFalse(metered.putIfDifferentValues(key,
+                                                 newValueAndTimestamp,
+                                                 encodedOldValue));
+        verify(inner);
+    }
+
+    @SuppressWarnings("resource")
+    @Test
+    public void shouldPutIfOutOfOrder() {
+        inner.put(eq(keyBytes), aryEq(valueAndTimestampBytes));
+        expectLastCall();
+        init();
+
+        metered.put(key, valueAndTimestamp);
+
+        final ValueAndTimestampSerde<String> stringSerde = new ValueAndTimestampSerde<>(Serdes.String());
+        final byte[] encodedOldValue = stringSerde.serializer().serialize("TOPIC", valueAndTimestamp);
+
+        final ValueAndTimestamp<String> outOfOrderValueAndTimestamp = ValueAndTimestamp.make("value", 95L);
+        assertTrue(metered.putIfDifferentValues(key, outOfOrderValueAndTimestamp, encodedOldValue));
+        verify(inner);
+    }
+
+    @Test
     public void shouldGetBytesFromInnerStoreAndReturnGetMetric() {
         expect(inner.get(keyBytes)).andReturn(valueAndTimestampBytes);
         init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
index 63a5661..c22221e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializerTest.java
@@ -25,6 +25,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class ValueAndTimestampSerializerTest {
     private static final String TOPIC = "some-topic";
@@ -51,6 +53,28 @@ public class ValueAndTimestampSerializerTest {
     }
 
     @Test
+    public void shouldDropSerializedValueIfEqualWithGreaterTimestamp() {
+        final String value = "food";
+
+        final ValueAndTimestamp<String> oldValueAndTimestamp = ValueAndTimestamp.make(value, TIMESTAMP);
+        final byte[] oldSerializedValue = STRING_SERDE.serializer().serialize(TOPIC, oldValueAndTimestamp);
+        final ValueAndTimestamp<String> newValueAndTimestamp = ValueAndTimestamp.make(value, TIMESTAMP + 1);
+        final byte[] newSerializedValue = STRING_SERDE.serializer().serialize(TOPIC, newValueAndTimestamp);
+        assertTrue(ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, newSerializedValue));
+    }
+
+    @Test
+    public void shouldKeepSerializedValueIfOutOfOrder() {
+        final String value = "balls";
+
+        final ValueAndTimestamp<String> oldValueAndTimestamp = ValueAndTimestamp.make(value, TIMESTAMP);
+        final byte[] oldSerializedValue = STRING_SERDE.serializer().serialize(TOPIC, oldValueAndTimestamp);
+        final ValueAndTimestamp<String> outOfOrderValueAndTimestamp = ValueAndTimestamp.make(value, TIMESTAMP - 1);
+        final byte[] outOfOrderSerializedValue = STRING_SERDE.serializer().serialize(TOPIC, outOfOrderValueAndTimestamp);
+        assertFalse(ValueAndTimestampSerializer.compareValuesAndCheckForIncreasingTimestamp(oldSerializedValue, outOfOrderSerializedValue));
+    }
+
+    @Test
     public void shouldSerializeNullDataAsNull() {
         final byte[] serialized =
                 STRING_SERDE.serializer().serialize(TOPIC, ValueAndTimestamp.make(null, TIMESTAMP));