Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/05 19:36:22 UTC

[kafka] branch 2.1 updated: KAFKA-7895: fix stream-time reckoning for Suppress (2.1) (#6286) (#6325)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 0953cd8  KAFKA-7895: fix stream-time reckoning for Suppress (2.1) (#6286) (#6325)
0953cd8 is described below

commit 0953cd8c056e8eb0dd329a4b2d6ebee0d0e8362c
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Tue Mar 5 13:36:07 2019 -0600

    KAFKA-7895: fix stream-time reckoning for Suppress (2.1) (#6286) (#6325)
    
    Even within a Task, different Processors have different perceptions
    of time, due to record caching on stores and in suppression itself,
    and in general, due to any processor logic that may hold onto
    records arbitrarily and emit them later. Because of this, we can't rely
    on the whole task existing in the same "instant" of stream-time. The
    solution is for each processor node that cares about stream-time to
    track it independently, as sketched below.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bb...@gmail.com>
---
 .../internals/KStreamSessionWindowAggregate.java   |   5 +-
 .../kstream/internals/KStreamWindowAggregate.java  |   5 +-
 .../suppress/KTableSuppressProcessor.java          |  13 ++-
 .../internals/suppress/SuppressedInternal.java     |  39 +++++--
 .../internals/GlobalProcessorContextImpl.java      |   7 +-
 .../internals/InternalProcessorContext.java        |   2 -
 .../processor/internals/ProcessorContextImpl.java  |  10 --
 .../streams/processor/internals/RecordQueue.java   |   2 +-
 .../processor/internals/StandbyContextImpl.java    |  12 ---
 .../streams/processor/internals/StandbyTask.java   |   8 +-
 .../streams/processor/internals/StreamTask.java    |   1 -
 .../processor/internals/TimestampSupplier.java     |  21 ----
 .../internals/RocksDBSegmentedBytesStore.java      |  24 +++--
 .../kafka/streams/state/internals/Segments.java    |   8 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |   6 +-
 .../suppress/KTableSuppressProcessorTest.java      |  19 ----
 .../internals/AbstractProcessorContextTest.java    |   5 -
 .../internals/RocksDBSegmentedBytesStoreTest.java  |  20 +++-
 .../state/internals/RocksDBWindowStoreTest.java    |   5 +-
 .../streams/state/internals/SegmentsTest.java      |  72 ++++++-------
 .../kafka/streams/tests/SmokeTestClient.java       | 120 +++++++++------------
 .../kafka/streams/tests/SmokeTestDriver.java       |  58 +++++++++-
 .../kafka/test/InternalMockProcessorContext.java   |  14 +--
 .../kafka/test/MockInternalProcessorContext.java   |  10 --
 .../apache/kafka/test/NoOpProcessorContext.java    |   5 -
 25 files changed, 244 insertions(+), 247 deletions(-)
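
Before the diff itself, here is a minimal, hypothetical sketch (not taken from the commit; the class name and use of AbstractProcessor are illustrative only) of the pattern this change applies throughout: instead of asking the task-wide context for stream time, each processor keeps its own high-water mark of observed record timestamps.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.AbstractProcessor;

    public class StreamTimeTrackingProcessor extends AbstractProcessor<String, String> {
        // each processor node keeps its own notion of stream time,
        // starting from "unknown" and only ever moving forward
        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;

        @Override
        public void process(final String key, final String value) {
            // advance this node's stream time from the current record's timestamp
            observedStreamTime = Math.max(observedStreamTime, context().timestamp());
            // downstream logic (grace periods, suppression expiry, segment retention)
            // can now reckon against observedStreamTime rather than a shared task clock
        }
    }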

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index b89399b..f7802d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -82,6 +83,7 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
         private StreamsMetricsImpl metrics;
         private InternalProcessorContext internalProcessorContext;
         private Sensor lateRecordDropSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -108,7 +110,8 @@ public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProce
                 return;
             }
 
-            final long closeTime = internalProcessorContext.streamTime() - windows.gracePeriodMs();
+            observedStreamTime = Math.max(observedStreamTime, context().timestamp());
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
 
             final long timestamp = context().timestamp();
             final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
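
As a worked illustration (made-up timestamps and grace period, not from the commit) of the closeTime computation that both windowed aggregators now perform against their own observedStreamTime:

    public class CloseTimeExample {
        public static void main(final String[] args) {
            final long gracePeriodMs = 60_000L;           // hypothetical 1-minute grace period
            long observedStreamTime = -1L;                // ConsumerRecord.NO_TIMESTAMP

            // records arrive with these timestamps (ms); the last one is late
            for (final long timestamp : new long[] {100_000L, 170_000L, 95_000L}) {
                observedStreamTime = Math.max(observedStreamTime, timestamp);
                final long closeTime = observedStreamTime - gracePeriodMs;
                // windows ending before closeTime are closed; e.g. with 10-second
                // tumbling windows, the window containing 95_000 ends at 100_000,
                // which is before closeTime (110_000), so that record is dropped as late
                System.out.println("ts=" + timestamp + " closeTime=" + closeTime);
            }
        }
    }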
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index f292515..c5b2483 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -75,6 +76,7 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
         private StreamsMetricsImpl metrics;
         private InternalProcessorContext internalProcessorContext;
         private Sensor lateRecordDropSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
         @SuppressWarnings("unchecked")
         @Override
@@ -103,7 +105,8 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
 
             // first get the matching windows
             final long timestamp = context().timestamp();
-            final long closeTime = internalProcessorContext.streamTime() - windows.gracePeriodMs();
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+            final long closeTime = observedStreamTime - windows.gracePeriodMs();
 
             final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 50e74a3..4058083 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals.suppress;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
@@ -40,14 +41,17 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     private final long suppressDurationMillis;
     private final TimeDefinition<K> bufferTimeDefinition;
     private final BufferFullStrategy bufferFullStrategy;
-    private final boolean shouldSuppressTombstones;
+    private final boolean safeToDropTombstones;
     private final String storeName;
+
     private TimeOrderedKeyValueBuffer buffer;
     private InternalProcessorContext internalProcessorContext;
 
     private Serde<K> keySerde;
     private FullChangeSerde<V> valueSerde;
 
+    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
     public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
                                    final String storeName,
                                    final Serde<K> keySerde,
@@ -61,7 +65,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
         suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis();
         bufferTimeDefinition = suppress.timeDefinition();
         bufferFullStrategy = suppress.bufferConfig().bufferFullStrategy();
-        shouldSuppressTombstones = suppress.shouldSuppressTombstones();
+        safeToDropTombstones = suppress.safeToDropTombstones();
     }
 
     @SuppressWarnings("unchecked")
@@ -75,6 +79,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
 
     @Override
     public void process(final K key, final Change<V> value) {
+        observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp());
         buffer(key, value);
         enforceConstraints();
     }
@@ -90,7 +95,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     }
 
     private void enforceConstraints() {
-        final long streamTime = internalProcessorContext.streamTime();
+        final long streamTime = observedStreamTime;
         final long expiryTime = streamTime - suppressDurationMillis;
 
         buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit);
@@ -130,7 +135,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     }
 
     private boolean shouldForward(final Change<V> value) {
-        return !(value.newValue == null && shouldSuppressTombstones);
+        return value.newValue != null || !safeToDropTombstones;
     }
 
     @Override
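
The eviction rule in enforceConstraints() above can be pictured with a toy buffer (a plain TreeMap standing in for the real TimeOrderedKeyValueBuffer; all numbers are made up):

    import java.util.TreeMap;

    public class SuppressEvictionSketch {
        public static void main(final String[] args) {
            final long suppressDurationMillis = 10_000L;
            final long observedStreamTime = 25_000L;
            final long expiryTime = observedStreamTime - suppressDurationMillis; // 15_000

            // buffered updates keyed by record timestamp
            final TreeMap<Long, String> buffer = new TreeMap<>();
            buffer.put(5_000L, "A");
            buffer.put(14_000L, "B");
            buffer.put(20_000L, "C");

            // evict (emit) while the earliest buffered timestamp has expired
            while (!buffer.isEmpty() && buffer.firstKey() <= expiryTime) {
                System.out.println("emit " + buffer.pollFirstEntry().getValue()); // A, then B
            }
            // "C" (timestamp 20_000) stays buffered until observedStreamTime advances further
        }
    }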
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
index 7453475..c387700 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
@@ -30,23 +30,36 @@ public class SuppressedInternal<K> implements Suppressed<K> {
     private final BufferConfigInternal bufferConfig;
     private final Duration timeToWaitForMoreEvents;
     private final TimeDefinition<K> timeDefinition;
-    private final boolean suppressTombstones;
+    private final boolean safeToDropTombstones;
 
+    /**
+     * @param safeToDropTombstones Note: it's *only* safe to drop tombstones for windowed KTables in "final results" mode.
+     *                             In that case, we have a priori knowledge that we have never before emitted any
+     *                             results for a given key, and therefore the tombstone is unnecessary (albeit
+     *                             idempotent and correct). We decided that the unnecessary tombstones would not be
+     *                             desirable in the output stream, though, hence the ability to drop them.
+     *
+     *                             An alternative is to remember whether a result has previously been emitted
+     *                             for a key and drop tombstones in that case, but it would be a little complicated to
+     *                             figure out when to forget the fact that we have emitted some result (currently, the
+     *                             buffer immediately forgets all about a key when we emit, which helps to keep it
+     *                             compact).
+     */
     public SuppressedInternal(final String name,
                               final Duration suppressionTime,
                               final BufferConfig bufferConfig,
                               final TimeDefinition<K> timeDefinition,
-                              final boolean suppressTombstones) {
+                              final boolean safeToDropTombstones) {
         this.name = name;
         this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime;
         this.timeDefinition = timeDefinition == null ? TimeDefinitions.RecordTimeDefintion.instance() : timeDefinition;
         this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : (BufferConfigInternal) bufferConfig;
-        this.suppressTombstones = suppressTombstones;
+        this.safeToDropTombstones = safeToDropTombstones;
     }
 
     @Override
     public Suppressed<K> withName(final String name) {
-        return new SuppressedInternal<>(name, timeToWaitForMoreEvents, bufferConfig, timeDefinition, suppressTombstones);
+        return new SuppressedInternal<>(name, timeToWaitForMoreEvents, bufferConfig, timeDefinition, safeToDropTombstones);
     }
 
     public String name() {
@@ -65,16 +78,20 @@ public class SuppressedInternal<K> implements Suppressed<K> {
         return timeToWaitForMoreEvents == null ? Duration.ZERO : timeToWaitForMoreEvents;
     }
 
-    boolean shouldSuppressTombstones() {
-        return suppressTombstones;
+    boolean safeToDropTombstones() {
+        return safeToDropTombstones;
     }
 
     @Override
     public boolean equals(final Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
         final SuppressedInternal<?> that = (SuppressedInternal<?>) o;
-        return suppressTombstones == that.suppressTombstones &&
+        return safeToDropTombstones == that.safeToDropTombstones &&
             Objects.equals(name, that.name) &&
             Objects.equals(bufferConfig, that.bufferConfig) &&
             Objects.equals(timeToWaitForMoreEvents, that.timeToWaitForMoreEvents) &&
@@ -83,7 +100,7 @@ public class SuppressedInternal<K> implements Suppressed<K> {
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, bufferConfig, timeToWaitForMoreEvents, timeDefinition, suppressTombstones);
+        return Objects.hash(name, bufferConfig, timeToWaitForMoreEvents, timeDefinition, safeToDropTombstones);
     }
 
     @Override
@@ -92,7 +109,7 @@ public class SuppressedInternal<K> implements Suppressed<K> {
             ", bufferConfig=" + bufferConfig +
             ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents +
             ", timeDefinition=" + timeDefinition +
-            ", suppressTombstones=" + suppressTombstones +
+            ", safeToDropTombstones=" + safeToDropTombstones +
             '}';
     }
 }
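
For context on when safeToDropTombstones is actually set: only the "final results" form of suppression (untilWindowCloses), as in this hedged DSL sketch (topic names and default serdes are placeholders; the smoke test further down in this diff wires up the same operator):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.TimeWindows;

    import java.time.Duration;

    public class FinalResultsExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("input")
                   .groupByKey()
                   .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
                   .count()
                   // emit exactly one result per window key, once the window closes;
                   // since nothing was emitted earlier, tombstones can safely be dropped
                   .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                   .toStream()
                   .to("final-counts");
        }
    }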
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 5c5b84f..a39fafd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -105,9 +105,4 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
     public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
     }
-
-    @Override
-    public long streamTime() {
-        throw new RuntimeException("Stream time is not implemented for the global processor context.");
-    }
-}
+}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 98511fd..0f67dff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -63,6 +63,4 @@ public interface InternalProcessorContext extends ProcessorContext {
      * Mark this context as being uninitialized
      */
     void uninitialize();
-
-    long streamTime();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 21e1c17..0ed82de 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -35,7 +35,6 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
     private final StreamTask task;
     private final RecordCollector collector;
-    private TimestampSupplier streamTimeSupplier;
     private final ToInternal toInternal = new ToInternal();
     private final static To SEND_TO_ALL = To.all();
 
@@ -165,13 +164,4 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         return schedule(interval.toMillis(), type, callback);
     }
 
-    void setStreamTimeSupplier(final TimestampSupplier streamTimeSupplier) {
-        this.streamTimeSupplier = streamTimeSupplier;
-    }
-
-    @Override
-    public long streamTime() {
-        return streamTimeSupplier.get();
-    }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index d06d7f3..572e629 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -36,7 +36,7 @@ import java.util.ArrayDeque;
  */
 public class RecordQueue {
 
-    static final long UNKNOWN = -1L;
+    static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP;
 
     private final Logger log;
     private final SourceNode source;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 6b835d9..ee69373 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -75,8 +75,6 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
         }
     };
 
-    private long streamTime = RecordQueue.UNKNOWN;
-
     StandbyContextImpl(final TaskId id,
                        final StreamsConfig config,
                        final ProcessorStateManager stateMgr,
@@ -231,14 +229,4 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     public ProcessorNode currentNode() {
         throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks.");
     }
-
-    void updateStreamTime(final long streamTime) {
-        this.streamTime = Math.max(this.streamTime, streamTime);
-    }
-
-    @Override
-    public long streamTime() {
-        return streamTime;
-    }
-
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 45f06b2..bd8ba34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -37,7 +37,6 @@ import java.util.Map;
 public class StandbyTask extends AbstractTask {
 
     private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
-    private final StandbyContextImpl standbyContext;
 
     /**
      * Create {@link StandbyTask} with its assigned partitions
@@ -60,7 +59,7 @@ public class StandbyTask extends AbstractTask {
                 final StateDirectory stateDirectory) {
         super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
 
-        processorContext = standbyContext = new StandbyContextImpl(id, config, stateMgr, metrics);
+        processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
     }
 
     @Override
@@ -120,7 +119,7 @@ public class StandbyTask extends AbstractTask {
 
     private void flushAndCheckpointState() {
         stateMgr.flush();
-        stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
+        stateMgr.checkpoint(Collections.emptyMap());
     }
 
     /**
@@ -177,9 +176,6 @@ public class StandbyTask extends AbstractTask {
             if (record.offset() < limit) {
                 restoreRecords.add(record);
                 lastOffset = record.offset();
-                // ideally, we'd use the stream time at the time of the change logging, but we'll settle for
-                // record timestamp for now.
-                standbyContext.updateStreamTime(record.timestamp());
             } else {
                 remainingRecords.add(record);
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 247a156..c4fecef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -236,7 +236,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
         recordInfo = new PartitionGroup.RecordInfo();
         partitionGroup = new PartitionGroup(partitionQueues, recordLatenessSensor(processorContextImpl));
-        processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp);
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java
deleted file mode 100644
index a6a7a42..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-public interface TimestampSupplier {
-    long get();
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 17079b9..c4fce72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
@@ -51,6 +52,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     private volatile boolean open;
     private Set<Segment> bulkLoadSegments;
     private Sensor expiredRecordSensor;
+    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
 
     RocksDBSegmentedBytesStore(final String name,
                                final String metricScope,
@@ -108,7 +110,9 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
 
     @Override
     public void remove(final Bytes key) {
-        final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
+        final long timestamp = keySchema.segmentTimestamp(key);
+        observedStreamTime = Math.max(observedStreamTime, timestamp);
+        final Segment segment = segments.getSegmentForTimestamp(timestamp);
         if (segment == null) {
             return;
         }
@@ -118,8 +122,9 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     @Override
     public void put(final Bytes key, final byte[] value) {
         final long timestamp = keySchema.segmentTimestamp(key);
+        observedStreamTime = Math.max(observedStreamTime, timestamp);
         final long segmentId = segments.segmentId(timestamp);
-        final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
+        final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
         if (segment == null) {
             expiredRecordSensor.record();
             LOG.debug("Skipping record for expired segment.");
@@ -163,7 +168,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
             "expired-window-record-drop"
         );
 
-        segments.openExisting(this.context);
+        segments.openExisting(this.context, observedStreamTime);
 
         bulkLoadSegments = new HashSet<>(segments.allSegments());
 
@@ -215,10 +220,17 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
 
     // Visible for testing
     Map<Segment, WriteBatch> getWriteBatches(final Collection<KeyValue<byte[], byte[]>> records) {
+        // advance stream time to the max timestamp in the batch
+        for (final KeyValue<byte[], byte[]> record : records) {
+            final long timestamp = keySchema.segmentTimestamp(Bytes.wrap(record.key));
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+        }
+
         final Map<Segment, WriteBatch> writeBatchMap = new HashMap<>();
         for (final KeyValue<byte[], byte[]> record : records) {
-            final long segmentId = segments.segmentId(keySchema.segmentTimestamp(Bytes.wrap(record.key)));
-            final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
+            final long timestamp = keySchema.segmentTimestamp(Bytes.wrap(record.key));
+            final long segmentId = segments.segmentId(timestamp);
+            final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
             if (segment != null) {
                 // This handles the case that state store is moved to a new client and does not
                 // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
@@ -247,7 +259,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     }
 
     private void toggleForBulkLoading(final boolean prepareForBulkload) {
-        for (final Segment segment: segments.allSegments()) {
+        for (final Segment segment : segments.allSegments()) {
             segment.toggleDbForBulkLoading(prepareForBulkload);
         }
     }
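
A small, self-contained illustration (hypothetical interval and retention values) of why getWriteBatches above scans the whole restore batch for its maximum timestamp before creating any segments: liveness is judged against that batch-wide stream time, so records whose segments are already expired relative to the newest record in the batch do not resurrect old segments. The segmentId arithmetic here mirrors the usual timestamp / segmentInterval division in Segments.

    public class SegmentLivenessExample {
        static final long SEGMENT_INTERVAL = 60_000L;
        static final long RETENTION_PERIOD = 120_000L;

        static long segmentId(final long timestamp) {
            return timestamp / SEGMENT_INTERVAL;
        }

        public static void main(final String[] args) {
            final long[] batchTimestamps = {300_000L, 50_000L}; // a restore batch, newest first

            // pre-scan: stream time for the whole batch is its max timestamp
            long observedStreamTime = Long.MIN_VALUE;
            for (final long t : batchTimestamps) {
                observedStreamTime = Math.max(observedStreamTime, t);
            }

            final long minLiveSegment = segmentId(observedStreamTime - RETENTION_PERIOD); // segment 3
            for (final long t : batchTimestamps) {
                // segment 5 (ts 300_000) is live; segment 0 (ts 50_000) is expired and skipped
                System.out.println("ts=" + t + " live=" + (segmentId(t) >= minLiveSegment));
            }
        }
    }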
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index cf858b0..96bbfd4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -71,8 +71,8 @@ class Segments {
         return segments.get(segmentId(timestamp));
     }
 
-    Segment getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context) {
-        final long minLiveTimestamp = context.streamTime() - retentionPeriod;
+    Segment getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context, final long streamTime) {
+        final long minLiveTimestamp = streamTime - retentionPeriod;
         final long minLiveSegment = segmentId(minLiveTimestamp);
 
         final Segment toReturn;
@@ -103,7 +103,7 @@ class Segments {
         }
     }
 
-    void openExisting(final InternalProcessorContext context) {
+    void openExisting(final InternalProcessorContext context, final long streamTime) {
         try {
             final File dir = new File(context.stateDir(), name);
             if (dir.exists()) {
@@ -130,7 +130,7 @@ class Segments {
             // ignore
         }
 
-        final long minLiveSegment = segmentId(context.streamTime() - retentionPeriod);
+        final long minLiveSegment = segmentId(streamTime - retentionPeriod);
         cleanupEarlierThan(minLiveSegment);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 1074f02f..e5e74d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -333,7 +333,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
         initStore(false);
         processor.init(context);
-        context.setStreamTime(20);
+
+        // dummy record to advance stream time
+        context.setRecordContext(new ProcessorRecordContext(20, -2, -3, "topic", null));
+        processor.process("dummy", "dummy");
+
         context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
         processor.process("A", "1");
         context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 002ace2..43c3f40 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -97,7 +97,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = ARBITRARY_LONG;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final String key = "hey";
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
@@ -117,7 +116,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = ARBITRARY_LONG;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
@@ -137,14 +135,12 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 0L;
         context.setRecordMetadata("topic", 0, 0, null, timestamp);
-        context.setStreamTime(timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, 1L);
         processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
         context.setRecordMetadata("topic", 0, 1, null, 1L);
-        context.setStreamTime(1L);
         processor.process("tick", new Change<>(null, null));
 
         assertThat(context.forwarded(), hasSize(1));
@@ -164,7 +160,6 @@ public class KTableSuppressProcessorTest {
         final long recordTime = 99L;
         final long windowEnd = 100L;
         context.setRecordMetadata("topic", 0, 0, null, recordTime);
-        context.setStreamTime(recordTime);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(windowStart, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
@@ -176,7 +171,6 @@ public class KTableSuppressProcessorTest {
         final long recordTime2 = 100L;
         final long windowEnd2 = 101L;
         context.setRecordMetadata("topic", 0, 1, null, recordTime2);
-        context.setStreamTime(recordTime2);
         processor.process(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE);
         assertThat(context.forwarded(), hasSize(0));
 
@@ -185,7 +179,6 @@ public class KTableSuppressProcessorTest {
         final long recordTime3 = 101L;
         final long windowEnd3 = 102L;
         context.setRecordMetadata("topic", 0, 1, null, recordTime3);
-        context.setStreamTime(recordTime3);
         processor.process(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
@@ -212,14 +205,12 @@ public class KTableSuppressProcessorTest {
         final long streamTime = 99L;
         final long windowEnd = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(streamTime);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
 
         context.setRecordMetadata("", 0, 1L, null, windowEnd);
-        context.setStreamTime(windowEnd);
         processor.process(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE);
 
         assertThat(context.forwarded(), hasSize(1));
@@ -237,7 +228,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
@@ -257,7 +247,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
@@ -274,7 +263,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
@@ -291,7 +279,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
@@ -311,7 +298,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L));
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
@@ -331,7 +317,6 @@ public class KTableSuppressProcessorTest {
 
         final long timestamp = 100L;
         context.setRecordMetadata("", 0, 0L, null, timestamp);
-        context.setStreamTime(timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
@@ -350,7 +335,6 @@ public class KTableSuppressProcessorTest {
         final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
-        context.setStreamTime(timestamp);
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
@@ -373,7 +357,6 @@ public class KTableSuppressProcessorTest {
         final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
-        context.setStreamTime(timestamp);
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         final String key = "hey";
         final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
@@ -396,7 +379,6 @@ public class KTableSuppressProcessorTest {
         final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
-        context.setStreamTime(timestamp);
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setCurrentNode(new ProcessorNode("testNode"));
         final String key = "hey";
@@ -420,7 +402,6 @@ public class KTableSuppressProcessorTest {
         final KTableSuppressProcessor<String, Long> processor = harness.processor;
 
         final long timestamp = 100L;
-        context.setStreamTime(timestamp);
         context.setRecordMetadata("", 0, 0L, null, timestamp);
         context.setCurrentNode(new ProcessorNode("testNode"));
         final String key = "hey";
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 4ce9a9f..43df1d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -224,10 +224,5 @@ public class AbstractProcessorContextTest {
 
         @Override
         public void commit() {}
-
-        @Override
-        public long streamTime() {
-            throw new RuntimeException("not implemented");
-        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index ee0069a..0ee7188 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -81,6 +81,7 @@ public class RocksDBSegmentedBytesStoreTest {
     private RocksDBSegmentedBytesStore bytesStore;
     private File stateDir;
     private final Window[] windows = new Window[4];
+    private Window nextSegmentWindow;
 
     @Parameter
     public SegmentedBytesStore.KeySchema schema;
@@ -98,12 +99,22 @@ public class RocksDBSegmentedBytesStoreTest {
             windows[1] = new SessionWindow(500L, 1000L);
             windows[2] = new SessionWindow(1_000L, 1_500L);
             windows[3] = new SessionWindow(30_000L, 60_000L);
+            // All four of the previous windows will go into segment 1.
+            // The nextSegmentWindow is computed to be a high enough time that when it gets written
+            // to the segment store, it will advance stream time past the first segment's retention time and
+            // expire it.
+            nextSegmentWindow = new SessionWindow(segmentInterval + retention, segmentInterval + retention);
         }
         if (schema instanceof WindowKeySchema) {
             windows[0] = timeWindowForSize(10L, windowSizeForTimeWindow);
             windows[1] = timeWindowForSize(500L, windowSizeForTimeWindow);
             windows[2] = timeWindowForSize(1_000L, windowSizeForTimeWindow);
             windows[3] = timeWindowForSize(60_000L, windowSizeForTimeWindow);
+            // All four of the previous windows will go into segment 1.
+            // The nextSegmentWindow is computed to be a high enough time that when it gets written
+            // to the segment store, it will advance stream time past the first segment's retention time and
+            // expire it.
+            nextSegmentWindow = timeWindowForSize(segmentInterval + retention, windowSizeForTimeWindow);
         }
 
 
@@ -415,8 +426,13 @@ public class RocksDBSegmentedBytesStoreTest {
         LogCaptureAppender.setClassLoggerToDebug(RocksDBSegmentedBytesStore.class);
         final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
 
-        context.setStreamTime(Math.max(retention, segmentInterval) * 2);
-        bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(5));
+        // write a record to advance stream time, with a high enough timestamp
+        // that the subsequent record in windows[0] will already be expired.
+        bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0));
+
+        final Bytes key = serializeKey(new Windowed<>("a", windows[0]));
+        final byte[] value = serializeValue(5);
+        bytesStore.put(key, value);
 
         LogCaptureAppender.unregister(appender);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index c41b094..588f12f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -159,7 +159,6 @@ public class RocksDBWindowStoreTest {
 
     private void setCurrentTime(final long currentTime) {
         context.setRecordContext(createRecordContext(currentTime));
-        context.setStreamTime(currentTime);
     }
 
     private ProcessorRecordContext createRecordContext(final long time) {
@@ -727,9 +726,11 @@ public class RocksDBWindowStoreTest {
         new File(storeDir, segments.segmentName(6L)).mkdir();
         windowStore.close();
 
-        context.setStreamTime(segmentInterval * 6L);
         windowStore = createWindowStore(context, false);
 
+        // put something in the store to advance its stream time and expire the old segments
+        windowStore.put(1, "v", 6L * segmentInterval);
+
         final List<String> expected = Utils.mkList(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L));
         expected.sort(String::compareTo);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index efed24f..0c16457 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -94,9 +94,9 @@ public class SegmentsTest {
 
     @Test
     public void shouldCreateSegments() {
-        final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context);
-        final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
-        final Segment segment3 = segments.getOrCreateSegmentIfLive(2, context);
+        final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final Segment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);
         assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
         assertTrue(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).isDirectory());
         assertTrue(new File(context.stateDir(), "test/test." + 2 * SEGMENT_INTERVAL).isDirectory());
@@ -107,17 +107,16 @@ public class SegmentsTest {
 
     @Test
     public void shouldNotCreateSegmentThatIsAlreadyExpired() {
-        updateStreamTimeAndCreateSegment(7);
-        assertNull(segments.getOrCreateSegmentIfLive(0, context));
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
         assertFalse(new File(context.stateDir(), "test/test.0").exists());
     }
 
     @Test
     public void shouldCleanupSegmentsThatHaveExpired() {
-        final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context);
-        final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
-        context.setStreamTime(SEGMENT_INTERVAL * 7);
-        final Segment segment3 = segments.getOrCreateSegmentIfLive(7, context);
+        final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final Segment segment3 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
         assertFalse(segment1.isOpen());
         assertFalse(segment2.isOpen());
         assertTrue(segment3.isOpen());
@@ -128,22 +127,22 @@ public class SegmentsTest {
 
     @Test
     public void shouldGetSegmentForTimestamp() {
-        final Segment segment = segments.getOrCreateSegmentIfLive(0, context);
-        segments.getOrCreateSegmentIfLive(1, context);
+        final Segment segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.getOrCreateSegmentIfLive(1, context, -1L);
         assertEquals(segment, segments.getSegmentForTimestamp(0L));
     }
 
     @Test
     public void shouldGetCorrectSegmentString() {
-        final Segment segment = segments.getOrCreateSegmentIfLive(0, context);
+        final Segment segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
         assertEquals("Segment(id=0, name=test.0)", segment.toString());
     }
 
     @Test
     public void shouldCloseAllOpenSegments() {
-        final Segment first = segments.getOrCreateSegmentIfLive(0, context);
-        final Segment second = segments.getOrCreateSegmentIfLive(1, context);
-        final Segment third = segments.getOrCreateSegmentIfLive(2, context);
+        final Segment first = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final Segment second = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final Segment third = segments.getOrCreateSegmentIfLive(2, context, -1L);
         segments.close();
 
         assertFalse(first.isOpen());
@@ -154,16 +153,16 @@ public class SegmentsTest {
     @Test
     public void shouldOpenExistingSegments() {
         segments = new Segments("test", 4, 1);
-        segments.getOrCreateSegmentIfLive(0, context);
-        segments.getOrCreateSegmentIfLive(1, context);
-        segments.getOrCreateSegmentIfLive(2, context);
-        segments.getOrCreateSegmentIfLive(3, context);
-        segments.getOrCreateSegmentIfLive(4, context);
+        segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.getOrCreateSegmentIfLive(1, context, -1L);
+        segments.getOrCreateSegmentIfLive(2, context, -1L);
+        segments.getOrCreateSegmentIfLive(3, context, -1L);
+        segments.getOrCreateSegmentIfLive(4, context, -1L);
         // close existing.
         segments.close();
 
         segments = new Segments("test", 4, 1);
-        segments.openExisting(context);
+        segments.openExisting(context, -1L);
 
         assertTrue(segments.getSegmentForTimestamp(0).isOpen());
         assertTrue(segments.getSegmentForTimestamp(1).isOpen());
@@ -178,12 +177,12 @@ public class SegmentsTest {
         updateStreamTimeAndCreateSegment(1);
         updateStreamTimeAndCreateSegment(2);
         updateStreamTimeAndCreateSegment(3);
-        updateStreamTimeAndCreateSegment(4);
-        segments.getOrCreateSegmentIfLive(0, context);
-        segments.getOrCreateSegmentIfLive(1, context);
-        segments.getOrCreateSegmentIfLive(2, context);
-        segments.getOrCreateSegmentIfLive(3, context);
-        segments.getOrCreateSegmentIfLive(4, context);
+        final long streamTime = updateStreamTimeAndCreateSegment(4);
+        segments.getOrCreateSegmentIfLive(0, context, streamTime);
+        segments.getOrCreateSegmentIfLive(1, context, streamTime);
+        segments.getOrCreateSegmentIfLive(2, context, streamTime);
+        segments.getOrCreateSegmentIfLive(3, context, streamTime);
+        segments.getOrCreateSegmentIfLive(4, context, streamTime);
 
         final List<Segment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
         assertEquals(3, segments.size());
@@ -235,17 +234,18 @@ public class SegmentsTest {
         verifyCorrectSegments(0, 3);
         updateStreamTimeAndCreateSegment(3);
         verifyCorrectSegments(0, 4);
-        updateStreamTimeAndCreateSegment(4);
+        final long streamTime = updateStreamTimeAndCreateSegment(4);
         verifyCorrectSegments(0, 5);
-        segments.getOrCreateSegmentIfLive(5, context);
+        segments.getOrCreateSegmentIfLive(5, context, streamTime);
         verifyCorrectSegments(0, 6);
-        segments.getOrCreateSegmentIfLive(6, context);
+        segments.getOrCreateSegmentIfLive(6, context, streamTime);
         verifyCorrectSegments(0, 7);
     }
 
-    private void updateStreamTimeAndCreateSegment(final int segment) {
-        context.setStreamTime(SEGMENT_INTERVAL * segment);
-        segments.getOrCreateSegmentIfLive(segment, context);
+    private long updateStreamTimeAndCreateSegment(final int segment) {
+        final long streamTime = SEGMENT_INTERVAL * segment;
+        segments.getOrCreateSegmentIfLive(segment, context, streamTime);
+        return streamTime;
     }
 
     @Test
@@ -268,7 +268,7 @@ public class SegmentsTest {
             oldSegment.createNewFile();
         }
 
-        segments.openExisting(context);
+        segments.openExisting(context, -1L);
 
         for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
             final String segmentName = storeName + "." + (long) segmentId * segmentInterval;
@@ -290,7 +290,7 @@ public class SegmentsTest {
             oldSegment.createNewFile();
         }
 
-        segments.openExisting(context);
+        segments.openExisting(context, -1L);
 
         for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) {
             final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1)));
@@ -300,7 +300,7 @@ public class SegmentsTest {
 
     @Test
     public void shouldClearSegmentsOnClose() {
-        segments.getOrCreateSegmentIfLive(0, context);
+        segments.getOrCreateSegmentIfLive(0, context, -1L);
         segments.close();
         assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index ddff7a8..d70c7b9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -16,30 +16,28 @@
  */
 package org.apache.kafka.streams.tests;
 
-import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 
+import java.time.Duration;
 import java.util.Properties;
 
 public class SmokeTestClient extends SmokeTestUtil {
@@ -115,85 +113,76 @@ public class SmokeTestClient extends SmokeTestUtil {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
         final KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
-        source.to("echo", Produced.with(stringSerde, intSerde));
-        final KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
-            @Override
-            public boolean test(final String key, final Integer value) {
-                return value == null || value != END;
-            }
-        });
+        source.filterNot((k, v) -> k.equals("flush"))
+              .to("echo", Produced.with(stringSerde, intSerde));
+        final KStream<String, Integer> data = source.filter((key, value) -> value == null || value != END);
         data.process(SmokeTestUtil.printProcessorSupplier("data"));
 
         // min
         final KGroupedStream<String, Integer> groupedData =
             data.groupByKey(Serialized.with(stringSerde, intSerde));
 
-        groupedData
-            .windowedBy(TimeWindows.of(Duration.ofDays(1)))
+        final KTable<Windowed<String>, Integer> minAggregation = groupedData
+            .windowedBy(TimeWindows.of(Duration.ofDays(1)).grace(Duration.ofMinutes(1)))
             .aggregate(
-                new Initializer<Integer>() {
-                    public Integer apply() {
-                        return Integer.MAX_VALUE;
-                    }
-                },
-                new Aggregator<String, Integer, Integer>() {
-                    @Override
-                    public Integer apply(final String aggKey, final Integer value, final Integer aggregate) {
-                        return (value < aggregate) ? value : aggregate;
-                    }
-                },
-                Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min").withValueSerde(intSerde))
-            .toStream(new Unwindow<String, Integer>())
+                () -> Integer.MAX_VALUE,
+                (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
+                Materialized
+                    .<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min")
+                    .withValueSerde(intSerde)
+                    .withRetention(Duration.ofHours(25))
+            );
+
+        minAggregation
+            .toStream()
+            .filterNot((k, v) -> k.key().equals("flush"))
+            .map((key, value) -> new KeyValue<>(key.toString(), value))
+            .to("min-raw", Produced.with(stringSerde, intSerde));
+
+        minAggregation
+            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
+            .toStream()
+            .filterNot((k, v) -> k.key().equals("flush"))
+            .map((key, value) -> new KeyValue<>(key.toString(), value))
+            .to("min-suppressed", Produced.with(stringSerde, intSerde));
+
+        minAggregation
+            .toStream(new Unwindow<>())
+            .filterNot((k, v) -> k.equals("flush"))
             .to("min", Produced.with(stringSerde, intSerde));
 
         final KTable<String, Integer> minTable = builder.table(
             "min",
             Consumed.with(stringSerde, intSerde),
-            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("minStoreName"));
+            Materialized.as("minStoreName"));
         minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"));
 
         // max
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(2)))
             .aggregate(
-                new Initializer<Integer>() {
-                    public Integer apply() {
-                        return Integer.MIN_VALUE;
-                    }
-                },
-                new Aggregator<String, Integer, Integer>() {
-                    @Override
-                    public Integer apply(final String aggKey, final Integer value, final Integer aggregate) {
-                        return (value > aggregate) ? value : aggregate;
-                    }
-                },
+                () -> Integer.MIN_VALUE,
+                (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
                 Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-max").withValueSerde(intSerde))
-            .toStream(new Unwindow<String, Integer>())
+            .toStream(new Unwindow<>())
+            .filterNot((k, v) -> k.equals("flush"))
             .to("max", Produced.with(stringSerde, intSerde));
 
         final KTable<String, Integer> maxTable = builder.table(
             "max",
             Consumed.with(stringSerde, intSerde),
-            Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("maxStoreName"));
+            Materialized.as("maxStoreName"));
         maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"));
 
         // sum
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(2)))
             .aggregate(
-                new Initializer<Long>() {
-                    public Long apply() {
-                        return 0L;
-                    }
-                },
-                new Aggregator<String, Integer, Long>() {
-                    @Override
-                    public Long apply(final String aggKey, final Integer value, final Long aggregate) {
-                        return (long) value + aggregate;
-                    }
-                },
+                () -> 0L,
+                (aggKey, value, aggregate) -> (long) value + aggregate,
                 Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("win-sum").withValueSerde(longSerde))
-            .toStream(new Unwindow<String, Long>())
+            .toStream(new Unwindow<>())
+            .filterNot((k, v) -> k.equals("flush"))
             .to("sum", Produced.with(stringSerde, longSerde));
 
         final Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
@@ -203,38 +192,33 @@ public class SmokeTestClient extends SmokeTestUtil {
         // cnt
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(2)))
-            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("uwin-cnt"))
-            .toStream(new Unwindow<String, Long>())
+            .count(Materialized.as("uwin-cnt"))
+            .toStream(new Unwindow<>())
+            .filterNot((k, v) -> k.equals("flush"))
             .to("cnt", Produced.with(stringSerde, longSerde));
 
         final KTable<String, Long> cntTable = builder.table(
             "cnt",
             Consumed.with(stringSerde, longSerde),
-            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("cntStoreName"));
+            Materialized.as("cntStoreName"));
         cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"));
 
         // dif
         maxTable
             .join(
                 minTable,
-                new ValueJoiner<Integer, Integer, Integer>() {
-                    public Integer apply(final Integer value1, final Integer value2) {
-                        return value1 - value2;
-                    }
-                })
+                (value1, value2) -> value1 - value2)
             .toStream()
+            .filterNot((k, v) -> k.equals("flush"))
             .to("dif", Produced.with(stringSerde, intSerde));
 
         // avg
         sumTable
             .join(
                 cntTable,
-                new ValueJoiner<Long, Long, Double>() {
-                    public Double apply(final Long value1, final Long value2) {
-                        return (double) value1 / (double) value2;
-                    }
-                })
+                (value1, value2) -> (double) value1 / (double) value2)
             .toStream()
+            .filterNot((k, v) -> k.equals("flush"))
             .to("avg", Produced.with(stringSerde, doubleSerde));
 
         // test repartition
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 7087298..aef7fa7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -41,6 +41,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -214,6 +215,19 @@ public class SmokeTestDriver extends SmokeTestUtil {
             }
         }
 
+        // Now that we've sent everything, send one final record to every partition of the "data" topic with a
+        // timestamp far enough in the future to close all open windows (plus grace) and flush out all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor("data");
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            ));
+        }
+
         producer.close();
         return Collections.unmodifiableMap(allData);
     }
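
For readers unfamiliar with the flush trick above, here is a minimal, self-contained sketch of the idea (not part of this patch; the TopologyTestDriver usage, the "input"/"output" topic names, and the 5-minute window with 1-minute grace are assumptions made for illustration). A record whose timestamp lies past a window's end plus its grace period advances stream-time for the suppression processor, which then emits the buffered result for that now-closed window:

    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Serialized;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.state.WindowStore;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;

    import java.time.Duration;
    import java.util.Properties;

    public class SuppressFlushSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input", Consumed.with(Serdes.String(), Serdes.Integer()))
                   .groupByKey(Serialized.with(Serdes.String(), Serdes.Integer()))
                   .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
                   .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withValueSerde(Serdes.Long()))
                   // hold each window's count until stream-time passes window end + grace
                   .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                   .toStream()
                   .map((windowedKey, count) -> new KeyValue<>(windowedKey.toString(), count))
                   .to("output", Produced.with(Serdes.String(), Serdes.Long()));

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-flush-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

            try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
                final ConsumerRecordFactory<String, Integer> factory =
                    new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());

                // A record inside the window [0, 5min) is buffered; nothing is emitted yet.
                driver.pipeInput(factory.create("input", "A", 1, 0L));
                System.out.println(driver.readOutput("output", new StringDeserializer(), new LongDeserializer())); // null

                // A "flush" record far in the future advances stream-time past the window close,
                // so the suppressed count for key "A" is finally emitted.
                driver.pipeInput(factory.create("input", "flush", 0, Duration.ofDays(2).toMillis()));
                System.out.println(driver.readOutput("output", new StringDeserializer(), new LongDeserializer()));
            }
        }
    }

The smoke test applies the same idea against real topics: the per-partition "flush" records carry a timestamp two days in the future, well past the one-day windows and the one-minute grace period used for the min aggregation.
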
@@ -262,7 +276,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
 
         final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
+        final List<TopicPartition> partitions = getAllPartitions(consumer, "echo", "max", "min", "min-suppressed", "dif", "sum", "cnt", "avg", "wcnt", "tagg");
         consumer.assign(partitions);
         consumer.seekToBeginning(partitions);
 
@@ -271,6 +285,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
 
         final HashMap<String, Integer> max = new HashMap<>();
         final HashMap<String, Integer> min = new HashMap<>();
+        final HashMap<String, List<Integer>> minSuppressed = new HashMap<>();
         final HashMap<String, Integer> dif = new HashMap<>();
         final HashMap<String, Long> sum = new HashMap<>();
         final HashMap<String, Long> cnt = new HashMap<>();
@@ -290,6 +305,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
             final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                 if (verifyMin(min, allData, false)
+                    && verifyMinSuppressed(minSuppressed, allData, false)
                     && verifyMax(max, allData, false)
                     && verifyDif(dif, allData, false)
                     && verifySum(sum, allData, false)
@@ -316,6 +332,10 @@ public class SmokeTestDriver extends SmokeTestUtil {
                         case "min":
                             min.put(key, intSerde.deserializer().deserialize("", record.value()));
                             break;
+                        case "min-suppressed":
+                            minSuppressed.computeIfAbsent(key, k -> new LinkedList<>())
+                                         .add(intSerde.deserializer().deserialize("", record.value()));
+                            break;
                         case "max":
                             max.put(key, intSerde.deserializer().deserialize("", record.value()));
                             break;
@@ -372,6 +392,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         }
 
         success &= verifyMin(min, allData, true);
+        success &= verifyMinSuppressed(minSuppressed, allData, true);
         success &= verifyMax(max, allData, true);
         success &= verifyDif(dif, allData, true);
         success &= verifySum(sum, allData, true);
@@ -412,6 +433,41 @@ public class SmokeTestDriver extends SmokeTestUtil {
         return true;
     }
 
+    private static boolean verifyMinSuppressed(final Map<String, List<Integer>> map,
+                                               final Map<String, Set<Integer>> allData,
+                                               final boolean print) {
+        if (map.isEmpty()) {
+            maybePrint(print, "min-suppressed is empty");
+            return false;
+        } else {
+            maybePrint(print, "verifying min-suppressed");
+
+            if (map.size() != allData.size()) {
+                maybePrint(print, "fail: resultCount=" + map.size() + " expectedCount=" + allData.size());
+                return false;
+            }
+            for (final Map.Entry<String, List<Integer>> entry : map.entrySet()) {
+                final String key = entry.getKey();
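+                // Windowed keys arrive as strings like "[A@0/86400000]"; strip the brackets and the window bounds.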
+                final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
+                final int expected = getMin(unwindowedKey);
+                if (entry.getValue().size() != 1) {
+                    maybePrint(print, "fail: key=" + entry.getKey() + " non-unique value: " + entry.getValue());
+                    return false;
+                } else if (expected != entry.getValue().get(0)) {
+                    maybePrint(print, "fail: key=" + entry.getKey() + " min=" + entry.getValue().get(0) + " expected=" + expected);
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
+    private static void maybePrint(final boolean print, final String s) {
+        if (print) {
+            System.out.println(s);
+        }
+    }
+
     private static boolean verifyMax(final Map<String, Integer> map, final Map<String, Set<Integer>> allData, final boolean print) {
         if (map.isEmpty()) {
             if (print) {
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 2f356bf..f483adf 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.test;
 
-import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -33,10 +32,10 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
-import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.ToInternal;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -44,6 +43,7 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -64,7 +64,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
     private Serde<?> keySerde;
     private Serde<?> valSerde;
     private long timestamp = -1L;
-    private long streamTime = -1L;
 
     public InternalMockProcessorContext() {
         this(null,
@@ -175,15 +174,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
     @Override
     public void initialize() {}
 
-    public void setStreamTime(final long currentTime) {
-        streamTime = currentTime;
-    }
-
-    @Override
-    public long streamTime() {
-        return streamTime;
-    }
-
     @Override
     public File stateDir() {
         if (stateDir == null) {
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 14f8561..62a8491 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -25,7 +25,6 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 
 public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext {
     private ProcessorNode currentNode;
-    private long streamTime;
 
     @Override
     public StreamsMetricsImpl metrics() {
@@ -72,13 +71,4 @@ public class MockInternalProcessorContext extends MockProcessorContext implement
     public void uninitialize() {
 
     }
-
-    @Override
-    public long streamTime() {
-        return streamTime;
-    }
-
-    public void setStreamTime(final long streamTime) {
-        this.streamTime = streamTime;
-    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 36d049c..6503408 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -95,11 +95,6 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     }
 
     @Override
-    public long streamTime() {
-        throw new RuntimeException("streamTime is not implemented for NoOpProcessorContext");
-    }
-
-    @Override
     public void register(final StateStore store,
                          final StateRestoreCallback stateRestoreCallback) {
         // no-op