You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/02 06:11:00 UTC

[jira] [Commented] (KAFKA-7223) KIP-328: Add in-memory Suppression

    [ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16635064#comment-16635064 ] 

ASF GitHub Bot commented on KAFKA-7223:
---------------------------------------

mjsax closed pull request #5693: KAFKA-7223: In-Memory Suppression Buffering
URL: https://github.com/apache/kafka/pull/5693
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index 49fe96ba20c..6db7a708541 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -19,7 +19,7 @@
 import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl;
 import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
 import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl;
-import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 
 import java.time.Duration;
 
@@ -155,6 +155,6 @@ static StrictBufferConfig unbounded() {
      * @return a suppression configuration
      */
     static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig) {
-        return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, null, false);
+        return new SuppressedInternal<>(timeToWaitForMoreEvents, bufferConfig, null, false);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
index 8a2e619b7b5..9bb83733c82 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.ByteBufferDeserializer;
-import org.apache.kafka.common.serialization.ByteBufferSerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serializer;
@@ -30,6 +28,17 @@
 public class FullChangeSerde<T> implements Serde<Change<T>> {
     private final Serde<T> inner;
 
+    @SuppressWarnings("unchecked")
+    public static <T> FullChangeSerde<T> castOrWrap(final Serde<?> serde) {
+        if (serde == null) {
+            return null;
+        } else if (serde instanceof FullChangeSerde) {
+            return (FullChangeSerde<T>) serde;
+        } else {
+            return new FullChangeSerde<T>((Serde<T>) serde);
+        }
+    }
+
     public FullChangeSerde(final Serde<T> inner) {
         this.inner = requireNonNull(inner);
     }
@@ -47,7 +56,6 @@ public void close() {
     @Override
     public Serializer<Change<T>> serializer() {
         final Serializer<T> innerSerializer = inner.serializer();
-        final ByteBufferSerializer byteBufferSerializer = new ByteBufferSerializer();
 
         return new Serializer<Change<T>>() {
             @Override
@@ -65,8 +73,8 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {
                 final byte[] newBytes = data.newValue == null ? null : innerSerializer.serialize(topic, data.newValue);
                 final int newSize = newBytes == null ? -1 : newBytes.length;
 
-                final ByteBuffer buffer = ByteBuffer.allocate(
-                    4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize)
+                final ByteBuffer buffer = ByteBuffer.wrap(
+                    new byte[4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize)]
                 );
                 buffer.putInt(oldSize);
                 if (oldBytes != null) {
@@ -76,7 +84,7 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {
                 if (newBytes != null) {
                     buffer.put(newBytes);
                 }
-                return byteBufferSerializer.serialize(null, buffer);
+                return buffer.array();
             }
 
             @Override
@@ -89,7 +97,6 @@ public void close() {
     @Override
     public Deserializer<Change<T>> deserializer() {
         final Deserializer<T> innerDeserializer = inner.deserializer();
-        final ByteBufferDeserializer byteBufferDeserializer = new ByteBufferDeserializer();
         return new Deserializer<Change<T>>() {
             @Override
             public void configure(final Map<String, ?> configs, final boolean isKey) {
@@ -101,7 +108,7 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {
                 if (data == null) {
                     return null;
                 }
-                final ByteBuffer buffer = byteBufferDeserializer.deserialize(null, data);
+                final ByteBuffer buffer = ByteBuffer.wrap(data);
 
                 final int oldSize = buffer.getInt();
                 final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullTimeWindowedSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullTimeWindowedSerde.java
new file mode 100644
index 00000000000..a69002f9900
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullTimeWindowedSerde.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
+import org.apache.kafka.streams.kstream.Windowed;
+
+class FullTimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> {
+    FullTimeWindowedSerde(final Serde<T> inner, final long windowSize) {
+        super(
+            new TimeWindowedSerializer<>(inner.serializer()),
+            new TimeWindowedDeserializer<>(inner.deserializer(), windowSize)
+        );
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index c5b29702c7c..9c76766d9c8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -39,7 +39,7 @@
 import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
 import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
 import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
-import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -356,12 +356,11 @@ public String queryableStoreName() {
     public KTable<K, V> suppress(final Suppressed<K> suppressed) {
         final String name = builder.newProcessorName(SUPPRESS_NAME);
 
-        // TODO: follow-up pr to forward the k/v serdes
         final ProcessorSupplier<K, Change<V>> suppressionSupplier =
             () -> new KTableSuppressProcessor<>(
                 buildSuppress(suppressed),
-                null,
-                null
+                keySerde,
+                valSerde == null ? null : new FullChangeSerde<>(valSerde)
             );
 
         final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(
@@ -387,18 +386,18 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    private SuppressedImpl<K> buildSuppress(final Suppressed<K> suppress) {
+    private SuppressedInternal<K> buildSuppress(final Suppressed<K> suppress) {
         if (suppress instanceof FinalResultsSuppressionBuilder) {
             final long grace = findAndVerifyWindowGrace(streamsGraphNode);
 
             final FinalResultsSuppressionBuilder<?> builder = (FinalResultsSuppressionBuilder) suppress;
 
-            final SuppressedImpl<? extends Windowed> finalResultsSuppression =
+            final SuppressedInternal<? extends Windowed> finalResultsSuppression =
                 builder.buildFinalResultsSuppression(Duration.ofMillis(grace));
 
-            return (SuppressedImpl<K>) finalResultsSuppression;
-        } else if (suppress instanceof SuppressedImpl) {
-            return (SuppressedImpl<K>) suppress;
+            return (SuppressedInternal<K>) finalResultsSuppression;
+        } else if (suppress instanceof SuppressedInternal) {
+            return (SuppressedInternal<K>) suppress;
         } else {
             throw new IllegalArgumentException("Custom subclasses of Suppressed are not allowed.");
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 2ee8f7c5958..753ef0f85a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -27,7 +27,6 @@
 import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -93,7 +92,7 @@
             materialize(materializedInternal),
             new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
             materializedInternal.isQueryable(),
-            materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
             materializedInternal.valueSerde());
     }
 
@@ -120,7 +119,7 @@
             materialize(materializedInternal),
             new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator),
             materializedInternal.isQueryable(),
-            materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
             materializedInternal.valueSerde());
     }
 
@@ -149,7 +148,7 @@
             materialize(materializedInternal),
             new KStreamWindowReduce<>(windows, materializedInternal.storeName(), reducer),
             materializedInternal.isQueryable(),
-            materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null,
+            materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null,
             materializedInternal.valueSerde());
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
similarity index 84%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
index e731dc6f5e1..67d3783867c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigInternal.java
@@ -20,8 +20,8 @@
 
 import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
 
-abstract class BufferConfigImpl<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> {
-    public abstract long maxKeys();
+abstract class BufferConfigInternal<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> {
+    public abstract long maxRecords();
 
     public abstract long maxBytes();
 
@@ -39,12 +39,12 @@
 
     @Override
     public Suppressed.StrictBufferConfig shutDownWhenFull() {
-        return new StrictBufferConfigImpl(maxKeys(), maxBytes(), SHUT_DOWN);
+        return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN);
     }
 
     @Override
     public Suppressed.BufferConfig emitEarlyWhenFull() {
-        return new EagerBufferConfigImpl(maxKeys(), maxBytes());
+        return new EagerBufferConfigImpl(maxRecords(), maxBytes());
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
index 0c2c883e18a..161f934f3a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
@@ -20,13 +20,13 @@
 
 import java.util.Objects;
 
-public class EagerBufferConfigImpl extends BufferConfigImpl {
+public class EagerBufferConfigImpl extends BufferConfigInternal {
 
-    private final long maxKeys;
+    private final long maxRecords;
     private final long maxBytes;
 
-    public EagerBufferConfigImpl(final long maxKeys, final long maxBytes) {
-        this.maxKeys = maxKeys;
+    public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) {
+        this.maxRecords = maxRecords;
         this.maxBytes = maxBytes;
     }
 
@@ -37,12 +37,12 @@ public EagerBufferConfigImpl(final long maxKeys, final long maxBytes) {
 
     @Override
     public Suppressed.BufferConfig withMaxBytes(final long byteLimit) {
-        return new EagerBufferConfigImpl(maxKeys, byteLimit);
+        return new EagerBufferConfigImpl(maxRecords, byteLimit);
     }
 
     @Override
-    public long maxKeys() {
-        return maxKeys;
+    public long maxRecords() {
+        return maxRecords;
     }
 
     @Override
@@ -60,17 +60,17 @@ public boolean equals(final Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
-        return maxKeys == that.maxKeys &&
+        return maxRecords == that.maxRecords &&
             maxBytes == that.maxBytes;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(maxKeys, maxBytes);
+        return Objects.hash(maxRecords, maxBytes);
     }
 
     @Override
     public String toString() {
-        return "EagerBufferConfigImpl{maxKeys=" + maxKeys + ", maxBytes=" + maxBytes + '}';
+        return "EagerBufferConfigImpl{maxKeys=" + maxRecords + ", maxBytes=" + maxBytes + '}';
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
index db09307d48c..523ae0602c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
@@ -18,7 +18,6 @@
 
 import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
 
 import java.time.Duration;
 import java.util.Objects;
@@ -30,11 +29,11 @@ public FinalResultsSuppressionBuilder(final Suppressed.StrictBufferConfig buffer
         this.bufferConfig = bufferConfig;
     }
 
-    public SuppressedImpl<K> buildFinalResultsSuppression(final Duration gracePeriod) {
-        return new SuppressedImpl<>(
+    public SuppressedInternal<K> buildFinalResultsSuppression(final Duration gracePeriod) {
+        return new SuppressedInternal<>(
             gracePeriod,
             bufferConfig,
-            (ProcessorContext context, K key) -> key.window().end(),
+            TimeDefinitions.WindowEndTimeDefinition.instance(),
             true
         );
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java
new file mode 100644
index 00000000000..677a662f79d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.internals.ContextualRecord;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer {
+    private final Map<Bytes, TimeKey> index = new HashMap<>();
+    private final TreeMap<TimeKey, ContextualRecord> sortedMap = new TreeMap<>();
+    private long memBufferSize = 0L;
+    private long minTimestamp = Long.MAX_VALUE;
+
+    @Override
+    public void evictWhile(final Supplier<Boolean> predicate,
+                           final Consumer<KeyValue<Bytes, ContextualRecord>> callback) {
+        final Iterator<Map.Entry<TimeKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator();
+
+        if (predicate.get()) {
+            Map.Entry<TimeKey, ContextualRecord> next = null;
+            if (delegate.hasNext()) {
+                next = delegate.next();
+            }
+
+            // predicate being true means we read one record, call the callback, and then remove it
+            while (next != null && predicate.get()) {
+                callback.accept(new KeyValue<>(next.getKey().key(), next.getValue()));
+
+                delegate.remove();
+                index.remove(next.getKey().key());
+
+                memBufferSize = memBufferSize - computeRecordSize(next.getKey().key(), next.getValue());
+
+                // peek at the next record so we can update the minTimestamp
+                if (delegate.hasNext()) {
+                    next = delegate.next();
+                    minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time();
+                } else {
+                    next = null;
+                    minTimestamp = Long.MAX_VALUE;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void put(final long time,
+                    final Bytes key,
+                    final ContextualRecord value) {
+        // non-resetting semantics:
+        // if there was a previous version of the same record,
+        // then insert the new record in the same place in the priority queue
+
+        final TimeKey previousKey = index.get(key);
+        if (previousKey == null) {
+            final TimeKey nextKey = new TimeKey(time, key);
+            index.put(key, nextKey);
+            sortedMap.put(nextKey, value);
+            minTimestamp = Math.min(minTimestamp, time);
+            memBufferSize = memBufferSize + computeRecordSize(key, value);
+        } else {
+            final ContextualRecord removedValue = sortedMap.put(previousKey, value);
+            memBufferSize =
+                memBufferSize
+                    + computeRecordSize(key, value)
+                    - (removedValue == null ? 0 : computeRecordSize(key, removedValue));
+        }
+    }
+
+    @Override
+    public int numRecords() {
+        return index.size();
+    }
+
+    @Override
+    public long bufferSize() {
+        return memBufferSize;
+    }
+
+    @Override
+    public long minTimestamp() {
+        return minTimestamp;
+    }
+
+    private long computeRecordSize(final Bytes key, final ContextualRecord value) {
+        long size = 0L;
+        size += 8; // buffer time
+        size += key.get().length;
+        if (value != null) {
+            size += value.sizeBytes();
+        }
+        return size;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 6f0021fbc49..57e5066d09e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -17,70 +17,117 @@
 package org.apache.kafka.streams.kstream.internals.suppress;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-
-import java.time.Duration;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.state.internals.ContextualRecord;
 
 import static java.util.Objects.requireNonNull;
 
 public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
-    private final SuppressedImpl<K> suppress;
+    private final long maxRecords;
+    private final long maxBytes;
+    private final long suppressDurationMillis;
+    private final TimeOrderedKeyValueBuffer buffer;
+    private final TimeDefinition<K> bufferTimeDefinition;
+    private final BufferFullStrategy bufferFullStrategy;
+    private final boolean shouldSuppressTombstones;
     private InternalProcessorContext internalProcessorContext;
 
-    private final Serde<K> keySerde;
-    private final Serde<Change<V>> valueSerde;
+    private Serde<K> keySerde;
+    private Serde<Change<V>> valueSerde;
 
-    public KTableSuppressProcessor(final SuppressedImpl<K> suppress,
+    public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
                                    final Serde<K> keySerde,
-                                   final Serde<Change<V>> valueSerde) {
-        this.suppress = requireNonNull(suppress);
+                                   final FullChangeSerde<V> valueSerde) {
+        requireNonNull(suppress);
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
+        maxRecords = suppress.getBufferConfig().maxRecords();
+        maxBytes = suppress.getBufferConfig().maxBytes();
+        suppressDurationMillis = suppress.getTimeToWaitForMoreEvents().toMillis();
+        buffer = new InMemoryTimeOrderedKeyValueBuffer();
+        bufferTimeDefinition = suppress.getTimeDefinition();
+        bufferFullStrategy = suppress.getBufferConfig().bufferFullStrategy();
+        shouldSuppressTombstones = suppress.shouldSuppressTombstones();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext context) {
         internalProcessorContext = (InternalProcessorContext) context;
+        this.keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde;
+        this.valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde;
     }
 
     @Override
     public void process(final K key, final Change<V> value) {
-        if (suppress.getTimeToWaitForMoreEvents() == Duration.ZERO && definedRecordTime(key) <= internalProcessorContext.streamTime()) {
-            if (shouldForward(value)) {
-                internalProcessorContext.forward(key, value);
-            } // else skip
-        } else {
-            throw new NotImplementedException();
-        }
+        buffer(key, value);
+        enforceConstraints();
     }
 
-    private boolean shouldForward(final Change<V> value) {
-        return !(value.newValue == null && suppress.suppressTombstones());
-    }
+    private void buffer(final K key, final Change<V> value) {
+        final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
+        final ProcessorRecordContext recordContext = internalProcessorContext.recordContext();
 
-    private long definedRecordTime(final K key) {
-        return suppress.getTimeDefinition().time(internalProcessorContext, key);
+        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, key));
+        final byte[] serializedValue = valueSerde.serializer().serialize(null, value);
+
+        buffer.put(bufferTime, serializedKey, new ContextualRecord(serializedValue, recordContext));
     }
 
-    @Override
-    public void close() {
+    private void enforceConstraints() {
+        final long streamTime = internalProcessorContext.streamTime();
+        final long expiryTime = streamTime - suppressDurationMillis;
+
+        buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit);
+
+        if (overCapacity()) {
+            switch (bufferFullStrategy) {
+                case EMIT:
+                    buffer.evictWhile(this::overCapacity, this::emit);
+                    return;
+                case SHUT_DOWN:
+                    throw new StreamsException(String.format(
+                        "%s buffer exceeded its max capacity. Currently [%d/%d] records and [%d/%d] bytes.",
+                        internalProcessorContext.currentNode().name(),
+                        buffer.numRecords(), maxRecords,
+                        buffer.bufferSize(), maxBytes
+                    ));
+            }
+        }
     }
 
-    @Override
-    public String toString() {
-        return "KTableSuppressProcessor{" +
-            "suppress=" + suppress +
-            ", keySerde=" + keySerde +
-            ", valueSerde=" + valueSerde +
-            '}';
+    private boolean overCapacity() {
+        return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes;
     }
 
-    public static class NotImplementedException extends RuntimeException {
-        NotImplementedException() {
-            super();
+    private void emit(final KeyValue<Bytes, ContextualRecord> toEmit) {
+        final Change<V> value = valueSerde.deserializer().deserialize(null, toEmit.value.value());
+        if (shouldForward(value)) {
+            final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext();
+            internalProcessorContext.setRecordContext(toEmit.value.recordContext());
+            try {
+                final K key = keySerde.deserializer().deserialize(null, toEmit.key.get());
+                internalProcessorContext.forward(key, value);
+            } finally {
+                internalProcessorContext.setRecordContext(prevRecordContext);
+            }
         }
     }
+
+    private boolean shouldForward(final Change<V> value) {
+        return !(value.newValue == null && shouldSuppressTombstones);
+    }
+
+    @Override
+    public void close() {
+    }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
index 0634a748a5b..ef754ec6fc2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java
@@ -22,22 +22,22 @@
 
 import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
 
-public class StrictBufferConfigImpl extends BufferConfigImpl<Suppressed.StrictBufferConfig> implements Suppressed.StrictBufferConfig {
+public class StrictBufferConfigImpl extends BufferConfigInternal<Suppressed.StrictBufferConfig> implements Suppressed.StrictBufferConfig {
 
-    private final long maxKeys;
+    private final long maxRecords;
     private final long maxBytes;
     private final BufferFullStrategy bufferFullStrategy;
 
-    public StrictBufferConfigImpl(final long maxKeys,
+    public StrictBufferConfigImpl(final long maxRecords,
                                   final long maxBytes,
                                   final BufferFullStrategy bufferFullStrategy) {
-        this.maxKeys = maxKeys;
+        this.maxRecords = maxRecords;
         this.maxBytes = maxBytes;
         this.bufferFullStrategy = bufferFullStrategy;
     }
 
     public StrictBufferConfigImpl() {
-        this.maxKeys = Long.MAX_VALUE;
+        this.maxRecords = Long.MAX_VALUE;
         this.maxBytes = Long.MAX_VALUE;
         this.bufferFullStrategy = SHUT_DOWN;
     }
@@ -49,12 +49,12 @@ public StrictBufferConfigImpl() {
 
     @Override
     public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) {
-        return new StrictBufferConfigImpl(maxKeys, byteLimit, bufferFullStrategy);
+        return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy);
     }
 
     @Override
-    public long maxKeys() {
-        return maxKeys;
+    public long maxRecords() {
+        return maxRecords;
     }
 
     @Override
@@ -72,19 +72,19 @@ public boolean equals(final Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o;
-        return maxKeys == that.maxKeys &&
+        return maxRecords == that.maxRecords &&
             maxBytes == that.maxBytes &&
             bufferFullStrategy == that.bufferFullStrategy;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(maxKeys, maxBytes, bufferFullStrategy);
+        return Objects.hash(maxRecords, maxBytes, bufferFullStrategy);
     }
 
     @Override
     public String toString() {
-        return "StrictBufferConfigImpl{maxKeys=" + maxKeys +
+        return "StrictBufferConfigImpl{maxKeys=" + maxRecords +
             ", maxBytes=" + maxBytes +
             ", bufferFullStrategy=" + bufferFullStrategy + '}';
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
similarity index 71%
rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java
rename to streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
index a3bf2db63a2..99245dae544 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
@@ -17,32 +17,32 @@
 package org.apache.kafka.streams.kstream.internals.suppress;
 
 import org.apache.kafka.streams.kstream.Suppressed;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions.TimeDefinition;
 
 import java.time.Duration;
 import java.util.Objects;
 
-public class SuppressedImpl<K> implements Suppressed<K> {
+public class SuppressedInternal<K> implements Suppressed<K> {
     private static final Duration DEFAULT_SUPPRESSION_TIME = Duration.ofMillis(Long.MAX_VALUE);
-    private static final StrictBufferConfig DEFAULT_BUFFER_CONFIG = BufferConfig.unbounded();
+    private static final StrictBufferConfigImpl DEFAULT_BUFFER_CONFIG = (StrictBufferConfigImpl) BufferConfig.unbounded();
 
-    private final BufferConfig bufferConfig;
+    private final BufferConfigInternal bufferConfig;
     private final Duration timeToWaitForMoreEvents;
     private final TimeDefinition<K> timeDefinition;
     private final boolean suppressTombstones;
 
-    public SuppressedImpl(final Duration suppressionTime,
-                          final BufferConfig bufferConfig,
-                          final TimeDefinition<K> timeDefinition,
-                          final boolean suppressTombstones) {
+    public SuppressedInternal(final Duration suppressionTime,
+                              final BufferConfig bufferConfig,
+                              final TimeDefinition<K> timeDefinition,
+                              final boolean suppressTombstones) {
         this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime;
-        this.timeDefinition = timeDefinition == null ? (context, anyKey) -> context.timestamp() : timeDefinition;
-        this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : bufferConfig;
+        this.timeDefinition = timeDefinition == null ? TimeDefinitions.RecordTimeDefintion.instance() : timeDefinition;
+        this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : (BufferConfigInternal) bufferConfig;
         this.suppressTombstones = suppressTombstones;
     }
 
-    interface TimeDefinition<K> {
-        long time(final ProcessorContext context, final K key);
+    BufferConfigInternal getBufferConfig() {
+        return bufferConfig;
     }
 
     TimeDefinition<K> getTimeDefinition() {
@@ -53,11 +53,15 @@ Duration getTimeToWaitForMoreEvents() {
         return timeToWaitForMoreEvents == null ? Duration.ZERO : timeToWaitForMoreEvents;
     }
 
+    boolean shouldSuppressTombstones() {
+        return suppressTombstones;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-        final SuppressedImpl<?> that = (SuppressedImpl<?>) o;
+        final SuppressedInternal<?> that = (SuppressedInternal<?>) o;
         return Objects.equals(bufferConfig, that.bufferConfig) &&
             Objects.equals(getTimeToWaitForMoreEvents(), that.getTimeToWaitForMoreEvents()) &&
             Objects.equals(getTimeDefinition(), that.getTimeDefinition());
@@ -70,14 +74,10 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        return "SuppressedImpl{" +
+        return "SuppressedInternal{" +
             ", bufferConfig=" + bufferConfig +
             ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents +
             ", timeDefinition=" + timeDefinition +
             '}';
     }
-
-    boolean suppressTombstones() {
-        return suppressTombstones;
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java
new file mode 100644
index 00000000000..b37bcf663fd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+final class TimeDefinitions {
+    private TimeDefinitions() {}
+
+    enum TimeDefinitionType {
+        RECORD_TIME, WINDOW_END_TIME;
+    }
+
+    /**
+     * This interface should never be instantiated outside of this class.
+     */
+    interface TimeDefinition<K> {
+        long time(final ProcessorContext context, final K key);
+
+        TimeDefinitionType type();
+    }
+
+    public static class RecordTimeDefintion<K> implements TimeDefinition<K> {
+        private static final RecordTimeDefintion INSTANCE = new RecordTimeDefintion();
+
+        private RecordTimeDefintion() {}
+
+        @SuppressWarnings("unchecked")
+        public static <K> RecordTimeDefintion<K> instance() {
+            return RecordTimeDefintion.INSTANCE;
+        }
+
+        @Override
+        public long time(final ProcessorContext context, final K key) {
+            return context.timestamp();
+        }
+
+        @Override
+        public TimeDefinitionType type() {
+            return TimeDefinitionType.RECORD_TIME;
+        }
+    }
+
+    public static class WindowEndTimeDefinition<K extends Windowed> implements TimeDefinition<K> {
+        private static final WindowEndTimeDefinition INSTANCE = new WindowEndTimeDefinition();
+
+        private WindowEndTimeDefinition() {}
+
+        @SuppressWarnings("unchecked")
+        public static <K extends Windowed> WindowEndTimeDefinition<K> instance() {
+            return WindowEndTimeDefinition.INSTANCE;
+        }
+
+        @Override
+        public long time(final ProcessorContext context, final K key) {
+            return key.window().end();
+        }
+
+        @Override
+        public TimeDefinitionType type() {
+            return TimeDefinitionType.WINDOW_END_TIME;
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java
new file mode 100644
index 00000000000..d3ad350686a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Objects;
+
+class TimeKey implements Comparable<TimeKey> {
+    private final long time;
+    private final Bytes key;
+
+    TimeKey(final long time, final Bytes key) {
+        this.time = time;
+        this.key = key;
+    }
+
+    Bytes key() {
+        return key;
+    }
+
+    long time() {
+        return time;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final TimeKey timeKey = (TimeKey) o;
+        return time == timeKey.time &&
+            Objects.equals(key, timeKey.key);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(time, key);
+    }
+
+    @Override
+    public int compareTo(final TimeKey o) {
+        // ordering of keys within a time uses hashCode.
+        final int timeComparison = Long.compare(time, o.time);
+        return timeComparison == 0 ? key.compareTo(o.key) : timeComparison;
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java
new file mode 100644
index 00000000000..98a4f63c83f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.internals.ContextualRecord;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+interface TimeOrderedKeyValueBuffer {
+    void evictWhile(final Supplier<Boolean> predicate, final Consumer<KeyValue<Bytes, ContextualRecord>> callback);
+
+    void put(final long time, final Bytes key, final ContextualRecord value);
+
+    int numRecords();
+
+    long bufferSize();
+
+    long minTimestamp();
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index dd572649765..cd4657bdcd8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.processor.RecordContext;
 
@@ -78,6 +79,23 @@ public Headers headers() {
         return headers;
     }
 
+    public long sizeBytes() {
+        long size = 0L;
+        size += 8; // value.context.timestamp
+        size += 8; // value.context.offset
+        if (topic != null) {
+            size += topic.toCharArray().length;
+        }
+        size += 4; // partition
+        if (headers != null) {
+            for (final Header header : headers) {
+                size += header.key().toCharArray().length;
+                size += header.value().length;
+            }
+        }
+        return size;
+    }
+
     @Override
     public boolean equals(final Object o) {
         if (this == o) return true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index c016f640cb2..a6a24ea098f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -89,7 +89,7 @@ public void apply(final List<ThreadCache.DirtyEntry> entries) {
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
         final ProcessorRecordContext current = context.recordContext();
         try {
-            context.setRecordContext(entry.recordContext());
+            context.setRecordContext(entry.entry().context());
             if (flushListener != null) {
                 V oldValue = null;
                 if (sendOldValues) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index 2da5ab98550..cbcb7490efb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -169,7 +169,7 @@ public void put(final Windowed<Bytes> key, final byte[] value) {
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
         final Bytes binaryKey = cacheFunction.key(entry.key());
         final ProcessorRecordContext current = context.recordContext();
-        context.setRecordContext(entry.recordContext());
+        context.setRecordContext(entry.entry().context());
         try {
             final Windowed<K> key = SessionKeySchema.from(binaryKey.get(), serdes.keyDeserializer(), topic);
             final Bytes rawKey = Bytes.wrap(serdes.rawKey(key.key()));
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 688e88962a2..f8d9ad590a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -108,7 +108,7 @@ private void maybeForward(final ThreadCache.DirtyEntry entry,
                               final InternalProcessorContext context) {
         if (flushListener != null) {
             final ProcessorRecordContext current = context.recordContext();
-            context.setRecordContext(entry.recordContext());
+            context.setRecordContext(entry.entry().context());
             try {
                 final V oldValue = sendOldValues ? fetchPrevious(key, windowedKey.window().start()) : null;
                 flushListener.apply(windowedKey, serdes.valueFrom(entry.newValue()), oldValue);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
new file mode 100644
index 00000000000..89935c09d8f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ContextualRecord.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+public class ContextualRecord {
+    private final byte[] value;
+    private final ProcessorRecordContext recordContext;
+
+    public ContextualRecord(final byte[] value, final ProcessorRecordContext recordContext) {
+        this.value = value;
+        this.recordContext = recordContext;
+    }
+
+    public ProcessorRecordContext recordContext() {
+        return recordContext;
+    }
+
+    public byte[] value() {
+        return value;
+    }
+
+    public long sizeBytes() {
+        return (value == null ? 0 : value.length) + recordContext.sizeBytes();
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final ContextualRecord that = (ContextualRecord) o;
+        return Arrays.equals(value, that.value) &&
+            Objects.equals(recordContext, that.recordContext);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(value, recordContext);
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index 0ac0b77dd37..53436358ded 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -19,18 +19,17 @@
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 
-import java.util.Arrays;
 import java.util.Objects;
 
 /**
  * A cache entry
  */
-class LRUCacheEntry extends ProcessorRecordContext {
-
-    private final byte[] value;
+class LRUCacheEntry {
+    private final ContextualRecord record;
     private final long sizeBytes;
     private boolean isDirty;
 
+
     LRUCacheEntry(final byte[] value) {
         this(value, null, false, -1, -1, -1, "");
     }
@@ -42,15 +41,16 @@
                   final long timestamp,
                   final int partition,
                   final String topic) {
-        super(timestamp, offset, partition, topic, headers);
-        this.value = value;
+        final ProcessorRecordContext context = new ProcessorRecordContext(timestamp, offset, partition, topic, headers);
+
+        this.record = new ContextualRecord(
+            value,
+            context
+        );
+
         this.isDirty = isDirty;
-        this.sizeBytes = (value == null ? 0 : value.length) +
-                1 + // isDirty
-                8 + // timestamp
-                8 + // offset
-                4 + // partition
-                (topic == null ? 0 : topic.length());
+        this.sizeBytes = 1 + // isDirty
+            record.sizeBytes();
     }
 
     void markClean() {
@@ -66,7 +66,11 @@ long size() {
     }
 
     byte[] value() {
-        return value;
+        return record.value();
+    }
+
+    public ProcessorRecordContext context() {
+        return record.recordContext();
     }
 
     @Override
@@ -74,17 +78,13 @@ public boolean equals(final Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         final LRUCacheEntry that = (LRUCacheEntry) o;
-        return timestamp() == that.timestamp() &&
-                offset() == that.offset() &&
-                partition() == that.partition() &&
-                Objects.equals(topic(), that.topic()) &&
-                Objects.equals(headers(), that.headers()) &&
-                Arrays.equals(this.value, that.value()) &&
-                this.isDirty == that.isDirty();
+        return sizeBytes == that.sizeBytes &&
+            isDirty() == that.isDirty() &&
+            Objects.equals(record, that.record);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(timestamp(), offset(), topic(), partition(), headers(), value, isDirty);
+        return Objects.hash(record, sizeBytes, isDirty());
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 27270e6b51d..941b5221524 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -19,7 +19,6 @@
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.slf4j.Logger;
 
@@ -332,9 +331,9 @@ public void close() {
     static class DirtyEntry {
         private final Bytes key;
         private final byte[] newValue;
-        private final ProcessorRecordContext recordContext;
+        private final LRUCacheEntry recordContext;
 
-        DirtyEntry(final Bytes key, final byte[] newValue, final ProcessorRecordContext recordContext) {
+        DirtyEntry(final Bytes key, final byte[] newValue, final LRUCacheEntry recordContext) {
             this.key = key;
             this.newValue = newValue;
             this.recordContext = recordContext;
@@ -348,7 +347,7 @@ public Bytes key() {
             return newValue;
         }
 
-        public ProcessorRecordContext recordContext() {
+        public LRUCacheEntry entry() {
             return recordContext;
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index af91abaf2b1..a9920e3a6f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -49,7 +49,6 @@
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -75,6 +74,9 @@
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
 import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 @Category({IntegrationTest.class})
 public class SuppressionIntegrationTest {
@@ -88,7 +90,6 @@
     private static final int SCALE_FACTOR = COMMIT_INTERVAL * 2;
     private static final long TIMEOUT_MS = 30_000L;
 
-    @Ignore
     @Test
     public void shouldSuppressIntermediateEventsWithEmitAfter() throws InterruptedException {
         final String testId = "-shouldSuppressIntermediateEventsWithEmitAfter";
@@ -220,10 +221,9 @@ public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws Interr
         }
     }
 
-    @Ignore
     @Test
     public void shouldSuppressIntermediateEventsWithRecordLimit() throws InterruptedException {
-        final String testId = "-shouldSuppressIntermediateEventsWithKeyLimit";
+        final String testId = "-shouldSuppressIntermediateEventsWithRecordLimit";
         final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
         final String outputSuppressed = "output-suppressed" + testId;
@@ -279,7 +279,46 @@ public void shouldSuppressIntermediateEventsWithRecordLimit() throws Interrupted
         }
     }
 
-    @Ignore
+    @Test
+    public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedException {
+        final String testId = "-shouldShutdownWhenRecordConstraintIsViolated";
+        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
+
+        valueCounts
+            .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).shutDownWhenFull()))
+            .toStream()
+            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        try {
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", "x", scaledTime(3L))
+                )
+            );
+            verifyErrorShutdown(driver);
+        } finally {
+            driver.close();
+            cleanStateAfterTest(driver);
+        }
+    }
+
     @Test
     public void shouldSuppressIntermediateEventsWithBytesLimit() throws InterruptedException {
         final String testId = "-shouldSuppressIntermediateEventsWithBytesLimit";
@@ -339,7 +378,47 @@ public void shouldSuppressIntermediateEventsWithBytesLimit() throws InterruptedE
         }
     }
 
-    @Ignore
+    @Test
+    public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedException {
+        final String testId = "-shouldShutdownWhenBytesConstraintIsViolated";
+        final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);
+
+        valueCounts
+            // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size.
+            .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).shutDownWhenFull()))
+            .toStream()
+            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        try {
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", "x", scaledTime(3L))
+                )
+            );
+            verifyErrorShutdown(driver);
+        } finally {
+            driver.close();
+            cleanStateAfterTest(driver);
+        }
+    }
+
     @Test
     public void shouldSupportFinalResultsForTimeWindows() throws InterruptedException {
         final String testId = "-shouldSupportFinalResultsForTimeWindows";
@@ -479,6 +558,11 @@ private void produceSynchronously(final String topic, final List<KeyValueTimesta
         }
     }
 
+    private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException {
+        waitForCondition(() -> !driver.state().isRunning(), TIMEOUT_MS, "Streams didn't shut down.");
+        assertThat(driver.state(), is(KafkaStreams.State.ERROR));
+    }
+
     private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> expected) {
         final List<ConsumerRecord<String, Long>> results;
         try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
index 7650c59759e..fcb5ba8ef3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
@@ -19,7 +19,7 @@
 import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl;
 import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
 import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl;
-import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.junit.Test;
 
 import static java.lang.Long.MAX_VALUE;
@@ -61,31 +61,31 @@ public void intermediateEventsShouldAcceptAnyBufferAndSetBounds() {
         assertThat(
             "time alone should be set",
             untilTimeLimit(ofMillis(2), unbounded()),
-            is(new SuppressedImpl<>(ofMillis(2), unbounded(), null, false))
+            is(new SuppressedInternal<>(ofMillis(2), unbounded(), null, false))
         );
 
         assertThat(
             "time and unbounded buffer should be set",
             untilTimeLimit(ofMillis(2), unbounded()),
-            is(new SuppressedImpl<>(ofMillis(2), unbounded(), null, false))
+            is(new SuppressedInternal<>(ofMillis(2), unbounded(), null, false))
         );
 
         assertThat(
             "time and keys buffer should be set",
             untilTimeLimit(ofMillis(2), maxRecords(2)),
-            is(new SuppressedImpl<>(ofMillis(2), maxRecords(2), null, false))
+            is(new SuppressedInternal<>(ofMillis(2), maxRecords(2), null, false))
         );
 
         assertThat(
             "time and size buffer should be set",
             untilTimeLimit(ofMillis(2), maxBytes(2)),
-            is(new SuppressedImpl<>(ofMillis(2), maxBytes(2), null, false))
+            is(new SuppressedInternal<>(ofMillis(2), maxBytes(2), null, false))
         );
 
         assertThat(
             "all constraints should be set",
             untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)),
-            is(new SuppressedImpl<>(ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null, false))
+            is(new SuppressedInternal<>(ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null, false))
         );
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index d98a15e093b..222e1d63982 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
@@ -32,7 +31,6 @@
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -41,7 +39,6 @@
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.WindowStore;
@@ -60,6 +57,7 @@
 import static java.time.Duration.ZERO;
 import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
@@ -159,7 +157,7 @@ public void shouldImmediatelyEmitEventsWithZeroEmitAfter() {
         }
     }
 
-    @Test(expected = ProcessorStateException.class)
+    @Test
     public void shouldSuppressIntermediateEventsWithTimeLimit() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = builder
@@ -198,11 +196,9 @@ public void shouldSuppressIntermediateEventsWithTimeLimit() {
                     new KeyValueTimestamp<>("v1", 1L, 2L)
                 )
             );
-            // note that the current stream time is 2, which causes v1 to age out of the buffer, since
-            // it has been buffered since time 0 (even though the current version of it in the buffer has timestamp 1)
             verify(
                 drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
-                singletonList(new KeyValueTimestamp<>("v1", 0L, 1L))
+                singletonList(new KeyValueTimestamp<>("v1", 1L, 2L))
             );
             // inserting a dummy "tick" record just to advance stream time
             driver.pipeInput(recordFactory.create("input", "tick", "tick", 3L));
@@ -225,36 +221,15 @@ public void shouldSuppressIntermediateEventsWithTimeLimit() {
                     new KeyValueTimestamp<>("tick", 1L, 4L)
                 )
             );
+            // tick is still buffered, since it was first inserted at time 3, and it is only time 4 right now.
             verify(
                 drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
-                singletonList(
-                    new KeyValueTimestamp<>("v1", 1L, 2L)
-                )
-            );
-            driver.pipeInput(recordFactory.create("input", "tick", "tick", 5L));
-            verify(
-                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
-                asList(
-                    new KeyValueTimestamp<>("tick", 0L, 5L),
-                    new KeyValueTimestamp<>("tick", 1L, 5L)
-                )
-            );
-            // Note that because the punctuate runs before the process call, the tick at time 5 causes
-            // the previous tick to age out of the buffer, so at this point, we see the prior value emitted
-            // and the new value is still buffered.
-
-            // Also worth noting is that "tick" ages out because it has been buffered since time 3, even though
-            // the current timestamp of the buffered record is 4.
-            verify(
-                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
-                singletonList(
-                    new KeyValueTimestamp<>("tick", 1L, 4L)
-                )
+                emptyList()
             );
         }
     }
 
-    @Test(expected = ProcessorStateException.class)
+    @Test
     public void shouldSuppressIntermediateEventsWithRecordLimit() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = builder
@@ -320,7 +295,7 @@ public void shouldSuppressIntermediateEventsWithRecordLimit() {
         }
     }
 
-    @Test(expected = ProcessorStateException.class)
+    @Test
     public void shouldSuppressIntermediateEventsWithBytesLimit() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> valueCounts = builder
@@ -351,8 +326,7 @@ public void shouldSuppressIntermediateEventsWithBytesLimit() {
         try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
             driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
             driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
-            final ConsumerRecord<byte[], byte[]> consumerRecord = recordFactory.create("input", "k2", "v1", 2L);
-            driver.pipeInput(consumerRecord);
+            driver.pipeInput(recordFactory.create("input", "k2", "v1", 2L));
             verify(
                 drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                 asList(
@@ -388,7 +362,7 @@ public void shouldSuppressIntermediateEventsWithBytesLimit() {
         }
     }
 
-    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    @Test
     public void shouldSupportFinalResultsForTimeWindows() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Windowed<String>, Long> valueCounts = builder
@@ -442,7 +416,7 @@ public void shouldSupportFinalResultsForTimeWindows() {
         }
     }
 
-    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    @Test
     public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Windowed<String>, Long> valueCounts = builder
@@ -501,7 +475,7 @@ public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() {
         }
     }
 
-    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    @Test
     public void shouldSupportFinalResultsForSessionWindows() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Windowed<String>, Long> valueCounts = builder
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index a38d1d58f43..bb7f49ce7a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -16,15 +16,20 @@
  */
 package org.apache.kafka.streams.kstream.internals.suppress;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Suppressed;
+import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
+import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.test.MockInternalProcessorContext;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -38,25 +43,23 @@
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.common.serialization.Serdes.Long;
 import static org.apache.kafka.common.serialization.Serdes.String;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
 import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
 import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 import static org.apache.kafka.streams.kstream.WindowedSerdes.sessionWindowedSerdeFrom;
-import static org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("PointlessArithmeticExpression")
 public class KTableSuppressProcessorTest {
     private static final long ARBITRARY_LONG = 5L;
 
-    private static final long ARBITRARY_TIMESTAMP = 1993L;
-
     private static final Change<Long> ARBITRARY_CHANGE = new Change<>(7L, 14L);
 
-    private static final TimeWindow ARBITRARY_WINDOW = new TimeWindow(0L, 100L);
-
     @Test
     public void zeroTimeLimitShouldImmediatelyEmit() {
         final KTableSuppressProcessor<String, Long> processor =
@@ -66,7 +69,7 @@ public void zeroTimeLimitShouldImmediatelyEmit() {
         processor.init(context);
 
         final long timestamp = ARBITRARY_LONG;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
         final String key = "hey";
         final Change<Long> value = ARBITRARY_CHANGE;
@@ -83,7 +86,7 @@ public void windowedZeroTimeLimitShouldImmediatelyEmit() {
         final KTableSuppressProcessor<Windowed<String>, Long> processor =
             new KTableSuppressProcessor<>(
                 getImpl(untilTimeLimit(ZERO, unbounded())),
-                timeWindowedSerdeFrom(String.class),
+                timeWindowedSerdeFrom(String.class, 100L),
                 new FullChangeSerde<>(Long())
             );
 
@@ -91,9 +94,9 @@ public void windowedZeroTimeLimitShouldImmediatelyEmit() {
         processor.init(context);
 
         final long timestamp = ARBITRARY_LONG;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
-        final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW);
+        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
 
@@ -103,7 +106,7 @@ public void windowedZeroTimeLimitShouldImmediatelyEmit() {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
-    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    @Test
     public void intermediateSuppressionShouldBufferAndEmitLater() {
         final KTableSuppressProcessor<String, Long> processor =
             new KTableSuppressProcessor<>(
@@ -117,13 +120,15 @@ public void intermediateSuppressionShouldBufferAndEmitLater() {
 
         final long timestamp = 0L;
         context.setRecordMetadata("topic", 0, 0, null, timestamp);
+        context.setStreamTime(timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, 1L);
         processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
-        assertThat(context.scheduledPunctuators(), hasSize(1));
-        context.scheduledPunctuators().get(0).getPunctuator().punctuate(1);
+        context.setRecordMetadata("topic", 0, 1, null, 1L);
+        context.setStreamTime(1L);
+        processor.process("tick", new Change<>(null, null));
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -131,38 +136,49 @@ public void intermediateSuppressionShouldBufferAndEmitLater() {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
-
-    @SuppressWarnings("unchecked")
-    private <K extends Windowed> SuppressedImpl<K> finalResults(final Duration grace) {
-        return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace);
-    }
-
-
-    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    @Test
     public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() {
         final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
             finalResults(ofMillis(1L)),
-            timeWindowedSerdeFrom(String.class),
+            timeWindowedSerdeFrom(String.class, 1L),
             new FullChangeSerde<>(Long())
         );
 
         final MockInternalProcessorContext context = new MockInternalProcessorContext();
         processor.init(context);
 
-        final long timestamp = ARBITRARY_TIMESTAMP;
-        context.setRecordMetadata("topic", 0, 0, null, timestamp);
-        final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW);
+        final long windowStart = 99L;
+        final long recordTime = 99L;
+        final long windowEnd = 100L;
+        context.setRecordMetadata("topic", 0, 0, null, recordTime);
+        context.setStreamTime(recordTime);
+        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(windowStart, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
-        assertThat(context.scheduledPunctuators(), hasSize(1));
-        context.scheduledPunctuators().get(0).getPunctuator().punctuate(timestamp + 1L);
+        // although the stream time is now 100, we have to wait 1 ms after the window *end* before we
+        // emit "hey", so we don't emit yet.
+        final long windowStart2 = 100L;
+        final long recordTime2 = 100L;
+        final long windowEnd2 = 101L;
+        context.setRecordMetadata("topic", 0, 1, null, recordTime2);
+        context.setStreamTime(recordTime2);
+        processor.process(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE);
+        assertThat(context.forwarded(), hasSize(0));
+
+        // ok, now it's time to emit "hey"
+        final long windowStart3 = 101L;
+        final long recordTime3 = 101L;
+        final long windowEnd3 = 102L;
+        context.setRecordMetadata("topic", 0, 1, null, recordTime3);
+        context.setStreamTime(recordTime3);
+        processor.process(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
         assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
-        assertThat(capturedForward.timestamp(), is(timestamp));
+        assertThat(capturedForward.timestamp(), is(recordTime));
     }
 
     /**
@@ -170,27 +186,32 @@ public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() {
      * it will still buffer events and emit only after the end of the window.
      * As opposed to emitting immediately the way regular suppresion would with a time limit of 0.
      */
-    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    @Test
     public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() {
         final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
             finalResults(ofMillis(0)),
-            timeWindowedSerdeFrom(String.class),
+            timeWindowedSerdeFrom(String.class, 100L),
             new FullChangeSerde<>(Long())
         );
 
         final MockInternalProcessorContext context = new MockInternalProcessorContext();
         processor.init(context);
 
+        // note the record is in the past, but the window end is in the future, so we still have to buffer,
+        // even though the grace period is 0.
         final long timestamp = 5L;
-        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        final long streamTime = 99L;
         final long windowEnd = 100L;
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setStreamTime(streamTime);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
-        assertThat(context.scheduledPunctuators(), hasSize(1));
-        context.scheduledPunctuators().get(0).getPunctuator().punctuate(windowEnd);
+        context.setRecordMetadata("", 0, 1L, null, windowEnd);
+        context.setStreamTime(windowEnd);
+        processor.process(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
@@ -202,7 +223,7 @@ public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() {
     public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
         final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
             finalResults(ofMillis(0)),
-            timeWindowedSerdeFrom(String.class),
+            timeWindowedSerdeFrom(String.class, 100L),
             new FullChangeSerde<>(Long())
         );
 
@@ -210,7 +231,7 @@ public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
@@ -226,7 +247,7 @@ public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
     public void finalResultsShouldSuppressTombstonesForTimeWindows() {
         final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
             finalResults(ofMillis(0)),
-            timeWindowedSerdeFrom(String.class),
+            timeWindowedSerdeFrom(String.class, 100L),
             new FullChangeSerde<>(Long())
         );
 
@@ -234,7 +255,7 @@ public void finalResultsShouldSuppressTombstonesForTimeWindows() {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
@@ -255,7 +276,7 @@ public void finalResultsShouldSuppressTombstonesForSessionWindows() {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
@@ -264,12 +285,11 @@ public void finalResultsShouldSuppressTombstonesForSessionWindows() {
         assertThat(context.forwarded(), hasSize(0));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void suppressShouldNotSuppressTombstonesForTimeWindows() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<Windowed<String>, Long>(
-            (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)),
-            timeWindowedSerdeFrom(String.class),
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))),
+            timeWindowedSerdeFrom(String.class, 100L),
             new FullChangeSerde<>(Long())
         );
 
@@ -277,7 +297,7 @@ public void suppressShouldNotSuppressTombstonesForTimeWindows() {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
@@ -289,11 +309,10 @@ public void suppressShouldNotSuppressTombstonesForTimeWindows() {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void suppressShouldNotSuppressTombstonesForSessionWindows() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<Windowed<String>, Long>(
-            (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)),
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))),
             sessionWindowedSerdeFrom(String.class),
             new FullChangeSerde<>(Long())
         );
@@ -302,7 +321,7 @@ public void suppressShouldNotSuppressTombstonesForSessionWindows() {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setTimestamp(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
@@ -314,11 +333,61 @@ public void suppressShouldNotSuppressTombstonesForSessionWindows() {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void suppressShouldNotSuppressTombstonesForKTable() {
-        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<String, Long>(
-            (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)),
+        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))),
+            Serdes.String(),
+            new FullChangeSerde<>(Long())
+        );
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setStreamTime(timestamp);
+        final String key = "hey";
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    @Test
+    public void suppressShouldEmitWhenOverRecordCapacity() {
+        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(Duration.ofDays(100), maxRecords(1))),
+            Serdes.String(),
+            new FullChangeSerde<>(Long())
+        );
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setStreamTime(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        final String key = "hey";
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
+        processor.process("dummyKey", value);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    @Test
+    public void suppressShouldEmitWhenOverByteCapacity() {
+        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(Duration.ofDays(100), maxBytes(60L))),
             Serdes.String(),
             new FullChangeSerde<>(Long())
         );
@@ -327,18 +396,82 @@ public void suppressShouldNotSuppressTombstonesForKTable() {
         processor.init(context);
 
         final long timestamp = 100L;
-        context.setTimestamp(timestamp);
         context.setStreamTime(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
 
+        context.setRecordMetadata("", 0, 1L, null, timestamp + 1);
+        processor.process("dummyKey", value);
+
         assertThat(context.forwarded(), hasSize(1));
         final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
         assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
+    @Test
+    public void suppressShouldShutDownWhenOverRecordCapacity() {
+        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(Duration.ofDays(100), maxRecords(1).shutDownWhenFull())),
+            Serdes.String(),
+            new FullChangeSerde<>(Long())
+        );
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setStreamTime(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setCurrentNode(new ProcessorNode("testNode"));
+        final String key = "hey";
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        context.setRecordMetadata("", 0, 1L, null, timestamp);
+        try {
+            processor.process("dummyKey", value);
+            fail("expected an exception");
+        } catch (final StreamsException e) {
+            assertThat(e.getMessage(), containsString("buffer exceeded its max capacity"));
+        }
+    }
+
+    @Test
+    public void suppressShouldShutDownWhenOverByteCapacity() {
+        final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>(
+            getImpl(untilTimeLimit(Duration.ofDays(100), maxBytes(60L).shutDownWhenFull())),
+            Serdes.String(),
+            new FullChangeSerde<>(Long())
+        );
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setStreamTime(timestamp);
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        context.setCurrentNode(new ProcessorNode("testNode"));
+        final String key = "hey";
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        context.setRecordMetadata("", 0, 1L, null, timestamp);
+        try {
+            processor.process("dummyKey", value);
+            fail("expected an exception");
+        } catch (final StreamsException e) {
+            assertThat(e.getMessage(), containsString("buffer exceeded its max capacity"));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private <K extends Windowed> SuppressedInternal<K> finalResults(final Duration grace) {
+        return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace);
+    }
+
     private static <E> Matcher<Collection<E>> hasSize(final int i) {
         return new BaseMatcher<Collection<E>>() {
             @Override
@@ -359,7 +492,15 @@ public boolean matches(final Object item) {
         };
     }
 
-    private static <K> SuppressedImpl<K> getImpl(final Suppressed<K> suppressed) {
-        return (SuppressedImpl<K>) suppressed;
+    private static <K> SuppressedInternal<K> getImpl(final Suppressed<K> suppressed) {
+        return (SuppressedInternal<K>) suppressed;
+    }
+
+    private <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawType, final long windowSize) {
+        final Serde<K> kSerde = Serdes.serdeFrom(rawType);
+        return new Serdes.WrapperSerde<>(
+            new TimeWindowedSerializer<>(kSerde.serializer()),
+            new TimeWindowedDeserializer<>(kSerde.deserializer(), windowSize)
+        );
     }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 0fdbdf76b3c..71a6ac21c10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -190,7 +190,7 @@ public void apply(final List<ThreadCache.DirtyEntry> dirty) {
 
         assertEquals(2, flushed.size());
         assertEquals(Bytes.wrap(new byte[] {0}), flushed.get(0).key());
-        assertEquals(headers, flushed.get(0).recordContext().headers());
+        assertEquals(headers, flushed.get(0).entry().context().headers());
         assertArrayEquals(new byte[] {10}, flushed.get(0).newValue());
         assertEquals(Bytes.wrap(new byte[] {2}), flushed.get(1).key());
         assertArrayEquals(new byte[] {30}, flushed.get(1).newValue());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> KIP-328: Add in-memory Suppression
> ----------------------------------
>
>                 Key: KAFKA-7223
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7223
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> As described in [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>  
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)