You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/21 01:40:59 UTC

[kafka] branch trunk updated: KAFKA-7072: clean up segments only after they expire (#5253)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6732593  KAFKA-7072: clean up segments only after they expire (#5253)
6732593 is described below

commit 6732593bbad3d1e07f1ec7c2d00a2e1eabf382f2
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Wed Jun 20 20:40:48 2018 -0500

    KAFKA-7072: clean up segments only after they expire (#5253)
    
    Significant refactor of Segments to use stream-time as the basis of segment expiration.
    Previously Segments assumed that the current record time was representative of stream time.
    
    In the event of a "future" event (one whose record time is greater than the stream time), this
    would inappropriately drop live segments. Now, Segments will provision the new segment
    to house the future event and drop old segments only after they expire.
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../internals/GlobalProcessorContextImpl.java      |   4 +
 .../internals/InternalProcessorContext.java        |   2 +
 .../processor/internals/ProcessorContextImpl.java  |  11 ++
 .../processor/internals/StandbyContextImpl.java    |   5 +
 .../streams/processor/internals/StreamTask.java    |   4 +-
 .../org/apache/kafka/streams/state/Stores.java     |   8 +-
 .../internals/RocksDBSegmentedBytesStore.java      |  20 ++-
 .../RocksDbSessionBytesStoreSupplier.java          |   8 +-
 .../internals/RocksDbWindowBytesStoreSupplier.java |  15 +-
 .../kafka/streams/state/internals/Segments.java    | 171 +++++++++------------
 .../internals/AbstractProcessorContextTest.java    |   5 +
 .../state/internals/CachingSessionStoreTest.java   |  20 +--
 .../state/internals/CachingWindowStoreTest.java    |  18 +--
 .../internals/RocksDBSegmentedBytesStoreTest.java  |  16 +-
 .../state/internals/RocksDBSessionStoreTest.java   |   2 +-
 .../state/internals/RocksDBWindowStoreTest.java    | 118 +++++++-------
 .../streams/state/internals/SegmentsTest.java      | 112 +++++++++-----
 .../kafka/test/InternalMockProcessorContext.java   |  10 ++
 .../apache/kafka/test/NoOpProcessorContext.java    |   5 +
 19 files changed, 305 insertions(+), 249 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 717e6a7..c469be9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -96,4 +96,8 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
         throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
     }
 
+    @Override
+    public Long streamTime() {
+        throw new RuntimeException("Stream time is not implemented for the global processor context.");
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 9439aba..cfc1970 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -63,4 +63,6 @@ public interface InternalProcessorContext extends ProcessorContext {
      * Mark this context as being uninitialized
      */
     void uninitialize();
+
+    Long streamTime();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index f1ee81f..36a309c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -28,11 +28,13 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.util.List;
+import java.util.function.Supplier;
 
 public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
 
     private final StreamTask task;
     private final RecordCollector collector;
+    private Supplier<Long> streamTimeSupplier;
     private final ToInternal toInternal = new ToInternal();
     private final static To SEND_TO_ALL = To.all();
 
@@ -153,4 +155,13 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
         return task.schedule(interval, type, callback);
     }
 
+    void setStreamTimeSupplier(final Supplier<Long> streamTimeSupplier) {
+        this.streamTimeSupplier = streamTimeSupplier;
+    }
+
+    @Override
+    public Long streamTime() {
+        return this.streamTimeSupplier.get();
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 5c278c9..58c2e2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -216,4 +216,9 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
         throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks.");
     }
 
+    @Override
+    public Long streamTime() {
+        throw new RuntimeException("Stream time is not implemented for the standby context.");
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f996958..6993ef8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -189,7 +189,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
 
         // initialize the topology with its own context
-        processorContext = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, metrics, cache);
+        final ProcessorContextImpl processorContextImpl = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, metrics, cache);
+        processorContext = processorContextImpl;
 
         final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
         final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
@@ -209,6 +210,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
         recordInfo = new PartitionGroup.RecordInfo();
         partitionGroup = new PartitionGroup(partitionQueues);
+        processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp);
 
         stateMgr.registerGlobalStateStores(topology.globalStateStores());
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 27b985b..eebd59f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -161,13 +161,15 @@ public class Stores {
         if (retentionPeriod < 0) {
             throw new IllegalArgumentException("retentionPeriod cannot be negative");
         }
-        if (numSegments < 1) {
-            throw new IllegalArgumentException("numSegments cannot must smaller than 1");
+        if (numSegments < 2) {
+            throw new IllegalArgumentException("numSegments cannot must smaller than 2");
         }
         if (windowSize < 0) {
             throw new IllegalArgumentException("windowSize cannot be negative");
         }
-        return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, numSegments, windowSize, retainDuplicates);
+        final long segmentIntervalMs = Math.max(retentionPeriod / (numSegments - 1), 60_000L);
+
+        return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentIntervalMs, windowSize, retainDuplicates);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index ec9e6f7..c5d15d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -20,26 +20,30 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
 class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
+    private final static Logger LOG = LoggerFactory.getLogger(RocksDBSegmentedBytesStore.class);
 
     private final String name;
     private final Segments segments;
     private final KeySchema keySchema;
-    private ProcessorContext context;
+    private InternalProcessorContext context;
     private volatile boolean open;
 
     RocksDBSegmentedBytesStore(final String name,
                                final long retention,
-                               final int numSegments,
+                               final long segmentInterval,
                                final KeySchema keySchema) {
         this.name = name;
         this.keySchema = keySchema;
-        this.segments = new Segments(name, retention, numSegments);
+        this.segments = new Segments(name, retention, segmentInterval);
     }
 
     @Override
@@ -97,8 +101,10 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     @Override
     public void put(final Bytes key, final byte[] value) {
         final long segmentId = segments.segmentId(keySchema.segmentTimestamp(key));
-        final Segment segment = segments.getOrCreateSegment(segmentId, context);
-        if (segment != null) {
+        final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
+        if (segment == null) {
+            LOG.debug("Skipping record for expired segment.");
+        } else {
             segment.put(key, value);
         }
     }
@@ -119,11 +125,11 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
 
     @Override
     public void init(ProcessorContext context, StateStore root) {
-        this.context = context;
+        this.context = (InternalProcessorContext) context;
 
         keySchema.init(ProcessorStateManager.storeChangelogTopic(context.applicationId(), root.name()));
 
-        segments.openExisting(context);
+        segments.openExisting(this.context);
 
         // register and possibly restore the state from the logs
         context.register(root, new StateRestoreCallback() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index 5a87bc5..c83ab59 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -25,8 +25,6 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
     private final String name;
     private final long retentionPeriod;
 
-    private static final int NUM_SEGMENTS = 3;
-
     public RocksDbSessionBytesStoreSupplier(final String name,
                                             final long retentionPeriod) {
         this.name = name;
@@ -43,7 +41,7 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
         final RocksDBSegmentedBytesStore segmented = new RocksDBSegmentedBytesStore(
             name,
             retentionPeriod,
-            NUM_SEGMENTS,
+            segmentIntervalMs(),
             new SessionKeySchema());
         return new RocksDBSessionStore<>(segmented, Serdes.Bytes(), Serdes.ByteArray());
     }
@@ -55,8 +53,6 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
 
     @Override
     public long segmentIntervalMs() {
-        return Segments.segmentInterval(
-            retentionPeriod,
-            NUM_SEGMENTS);
+        return Math.max(retentionPeriod / 2, 60_000L);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index 5fbf491..4a7bffc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -24,23 +24,18 @@ import org.apache.kafka.streams.state.WindowStore;
 public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier {
     private final String name;
     private final long retentionPeriod;
-    private final int segments;
+    private final long segmentInterval;
     private final long windowSize;
     private final boolean retainDuplicates;
 
-    private static final int MIN_SEGMENTS = 2;
-
     public RocksDbWindowBytesStoreSupplier(final String name,
                                            final long retentionPeriod,
-                                           final int segments,
+                                           final long segmentInterval,
                                            final long windowSize,
                                            final boolean retainDuplicates) {
-        if (segments < MIN_SEGMENTS) {
-            throw new IllegalArgumentException("numSegments must be >= " + MIN_SEGMENTS);
-        }
         this.name = name;
         this.retentionPeriod = retentionPeriod;
-        this.segments = segments;
+        this.segmentInterval = segmentInterval;
         this.windowSize = windowSize;
         this.retainDuplicates = retainDuplicates;
     }
@@ -55,7 +50,7 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
         final RocksDBSegmentedBytesStore segmentedBytesStore = new RocksDBSegmentedBytesStore(
                 name,
                 retentionPeriod,
-                segments,
+                segmentInterval,
                 new WindowKeySchema()
         );
         return new RocksDBWindowStore<>(segmentedBytesStore,
@@ -73,7 +68,7 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
 
     @Override
     public int segments() {
-        return segments;
+        return (int) (retentionPeriod / segmentInterval) + 1;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 7b3336a..8e6c524 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -16,9 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,35 +27,29 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.SimpleTimeZone;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.TreeMap;
 
 /**
  * Manages the {@link Segment}s that are used by the {@link RocksDBSegmentedBytesStore}
  */
 class Segments {
     private static final Logger log = LoggerFactory.getLogger(Segments.class);
-    static final long MIN_SEGMENT_INTERVAL = 60 * 1000L;
 
-    static long segmentInterval(long retentionPeriod, int numSegments) {
-        return Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
-    }
-
-    private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
+    private final TreeMap<Long, Segment> segments = new TreeMap<>();
     private final String name;
-    private final int numSegments;
+    private final long retentionPeriod;
     private final long segmentInterval;
     private final SimpleDateFormat formatter;
-    private long minSegmentId = Long.MAX_VALUE;
-    private long maxSegmentId = -1L;
 
-    Segments(final String name, final long retentionPeriod, final int numSegments) {
+    Segments(final String name, final long retentionPeriod, final long segmentInterval) {
         this.name = name;
-        this.numSegments = numSegments;
-        this.segmentInterval = segmentInterval(retentionPeriod, numSegments);
+        this.segmentInterval = segmentInterval;
+        this.retentionPeriod = retentionPeriod;
         // Create a date formatter. Formatted timestamps are used as segment name suffixes
         this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
         this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
@@ -75,42 +68,53 @@ class Segments {
     }
 
     Segment getSegmentForTimestamp(final long timestamp) {
-        return getSegment(segmentId(timestamp));
+        return segments.get(segmentId(timestamp));
     }
 
-    Segment getOrCreateSegment(final long segmentId, final ProcessorContext context) {
-        if (segmentId > maxSegmentId - numSegments) {
-            final long key = segmentId % numSegments;
-            final Segment segment = segments.get(key);
-            if (!isSegment(segment, segmentId)) {
-                cleanup(segmentId);
-            }
-            Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
-            Segment previousSegment = segments.putIfAbsent(key, newSegment);
-            if (previousSegment == null) {
-                newSegment.openDB(context);
-                maxSegmentId = segmentId > maxSegmentId ? segmentId : maxSegmentId;
-                minSegmentId = segmentId < minSegmentId ? segmentId : minSegmentId;
-            }
-            return previousSegment == null ? newSegment : previousSegment;
+    Segment getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context) {
+        final long minLiveSegment = segmentId(context.streamTime() - retentionPeriod);
+
+        final Segment toReturn;
+        if (segmentId >= minLiveSegment) {
+            // The segment is live. get it, ensure it's open, and return it.
+            toReturn = getOrCreateSegment(segmentId, context);
+        } else {
+            toReturn = null;
+        }
+
+        cleanupEarlierThan(minLiveSegment);
+        return toReturn;
+    }
+
+    private Segment getOrCreateSegment(final long segmentId, final InternalProcessorContext context) {
+        if (segments.containsKey(segmentId)) {
+            return segments.get(segmentId);
         } else {
-            return null;
+            final Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
+            final Segment shouldBeNull = segments.put(segmentId, newSegment);
+
+            if (shouldBeNull != null) {
+                throw new IllegalStateException("Segment already exists. Possible concurrent access.");
+            }
+
+            newSegment.openDB(context);
+            return newSegment;
         }
     }
 
-    void openExisting(final ProcessorContext context) {
+    void openExisting(final InternalProcessorContext context) {
         try {
-            File dir = new File(context.stateDir(), name);
+            final File dir = new File(context.stateDir(), name);
             if (dir.exists()) {
-                String[] list = dir.list();
+                final String[] list = dir.list();
                 if (list != null) {
-                    long[] segmentIds = new long[list.length];
+                    final long[] segmentIds = new long[list.length];
                     for (int i = 0; i < list.length; i++)
                         segmentIds[i] = segmentIdFromSegmentName(list[i], dir);
 
                     // open segments in the id order
                     Arrays.sort(segmentIds);
-                    for (long segmentId : segmentIds) {
+                    for (final long segmentId : segmentIds) {
                         if (segmentId >= 0) {
                             getOrCreateSegment(segmentId, context);
                         }
@@ -121,89 +125,66 @@ class Segments {
                     throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name));
                 }
             }
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             // ignore
         }
+
+        final long minLiveSegment = segmentId(context.streamTime() - retentionPeriod);
+        cleanupEarlierThan(minLiveSegment);
     }
 
     List<Segment> segments(final long timeFrom, final long timeTo) {
-        final long segFrom = Math.max(minSegmentId, segmentId(Math.max(0L, timeFrom)));
-        final long segTo = Math.min(maxSegmentId, segmentId(Math.min(maxSegmentId * segmentInterval, Math.max(0, timeTo))));
-
-        final List<Segment> segments = new ArrayList<>();
-        for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
-            Segment segment = getSegment(segmentId);
-            if (segment != null && segment.isOpen()) {
-                try {
-                    segments.add(segment);
-                } catch (InvalidStateStoreException ise) {
-                    // segment may have been closed by streams thread;
-                }
+        final List<Segment> result = new ArrayList<>();
+        final NavigableMap<Long, Segment> segmentsInRange = segments.subMap(
+            segmentId(timeFrom), true,
+            segmentId(timeTo), true
+        );
+        for (final Segment segment : segmentsInRange.values()) {
+            if (segment.isOpen()) {
+                result.add(segment);
             }
         }
-        return segments;
+        return result;
     }
 
     List<Segment> allSegments() {
-        final List<Segment> segments = new ArrayList<>();
-        for (Segment segment : this.segments.values()) {
+        final List<Segment> result = new ArrayList<>();
+        for (final Segment segment : segments.values()) {
             if (segment.isOpen()) {
-                try {
-                    segments.add(segment);
-                } catch (InvalidStateStoreException ise) {
-                    // segment may have been closed by streams thread;
-                }
+                result.add(segment);
             }
         }
-        Collections.sort(segments);
-        return segments;
+        return result;
     }
-    
+
     void flush() {
-        for (Segment segment : segments.values()) {
+        for (final Segment segment : segments.values()) {
             segment.flush();
         }
     }
 
     public void close() {
-        for (Segment segment : segments.values()) {
+        for (final Segment segment : segments.values()) {
             segment.close();
         }
         segments.clear();
     }
 
-    private Segment getSegment(long segmentId) {
-        final Segment segment = segments.get(segmentId % numSegments);
-        if (!isSegment(segment, segmentId)) {
-            return null;
-        }
-        return segment;
-    }
-
-    private boolean isSegment(final Segment store, long segmentId) {
-        return store != null && store.id == segmentId;
-    }
-
-    private void cleanup(final long segmentId) {
-        final long oldestSegmentId = maxSegmentId < segmentId
-                ? segmentId - numSegments
-                : maxSegmentId - numSegments;
+    private void cleanupEarlierThan(final long minLiveSegment) {
+        final Iterator<Map.Entry<Long, Segment>> toRemove =
+            segments.headMap(minLiveSegment, false).entrySet().iterator();
 
-        for (Map.Entry<Long, Segment> segmentEntry : segments.entrySet()) {
-            final Segment segment = segmentEntry.getValue();
-            if (segment != null && segment.id <= oldestSegmentId) {
-                segments.remove(segmentEntry.getKey());
-                segment.close();
-                try {
-                    segment.destroy();
-                } catch (IOException e) {
-                    log.error("Error destroying {}", segment, e);
-                }
+        while (toRemove.hasNext()) {
+            final Map.Entry<Long, Segment> next = toRemove.next();
+            toRemove.remove();
+            final Segment segment = next.getValue();
+            segment.close();
+            try {
+                segment.destroy();
+            } catch (final IOException e) {
+                log.error("Error destroying {}", segment, e);
             }
         }
-        if (oldestSegmentId > minSegmentId) {
-            minSegmentId = oldestSegmentId + 1;
-        }
     }
 
     private long segmentIdFromSegmentName(final String segmentName,
@@ -217,7 +198,7 @@ class Segments {
         if (segmentSeparator == '-') {
             try {
                 segmentId = formatter.parse(segmentIdString).getTime() / segmentInterval;
-            } catch (ParseException e) {
+            } catch (final ParseException e) {
                 log.warn("Unable to parse segmentName {} to a date. This segment will be skipped", segmentName);
                 return -1L;
             }
@@ -226,7 +207,7 @@ class Segments {
             // for both new formats (with : or .) parse segment ID identically
             try {
                 segmentId = Long.parseLong(segmentIdString) / segmentInterval;
-            } catch (NumberFormatException e) {
+            } catch (final NumberFormatException e) {
                 throw new ProcessorStateException("Unable to parse segment id as long from segmentName: " + segmentName);
             }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index d3f8dda..1ef3f5d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -216,5 +216,10 @@ public class AbstractProcessorContextTest {
 
         @Override
         public void commit() {}
+
+        @Override
+        public Long streamTime() {
+            throw new RuntimeException("not implemented");
+        }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
index baa9ee4..194edb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingSessionStoreTest.java
@@ -66,13 +66,13 @@ public class CachingSessionStoreTest {
         final SessionKeySchema schema = new SessionKeySchema();
         schema.init("topic");
         final int retention = 60000;
-        final int numSegments = 3;
-        underlying = new RocksDBSegmentedBytesStore("test", retention, numSegments, schema);
+        final int segmentInterval = 60_000;
+        underlying = new RocksDBSegmentedBytesStore("test", retention, segmentInterval, schema);
         final RocksDBSessionStore<Bytes, byte[]> sessionStore = new RocksDBSessionStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray());
         cachingStore = new CachingSessionStore<>(sessionStore,
                                                  Serdes.String(),
                                                  Serdes.String(),
-                                                 Segments.segmentInterval(retention, numSegments)
+                                                 segmentInterval
                                                  );
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, cache);
@@ -185,13 +185,13 @@ public class CachingSessionStoreTest {
     @Test
     public void shouldFetchCorrectlyAcrossSegments() {
         final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));
-        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL));
-        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(60_000, 60_000));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(120_000, 120_000));
         cachingStore.put(a1, "1".getBytes());
         cachingStore.put(a2, "2".getBytes());
         cachingStore.put(a3, "3".getBytes());
         cachingStore.flush();
-        final KeyValueIterator<Windowed<Bytes>, byte[]> results = cachingStore.findSessions(keyA, 0, Segments.MIN_SEGMENT_INTERVAL * 2);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> results = cachingStore.findSessions(keyA, 0, 60_000 * 2);
         assertEquals(a1, results.next().key);
         assertEquals(a2, results.next().key);
         assertEquals(a3, results.next().key);
@@ -202,9 +202,9 @@ public class CachingSessionStoreTest {
     public void shouldFetchRangeCorrectlyAcrossSegments() {
         final Windowed<Bytes> a1 = new Windowed<>(keyA, new SessionWindow(0, 0));
         final Windowed<Bytes> aa1 = new Windowed<>(keyAA, new SessionWindow(0, 0));
-        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(Segments.MIN_SEGMENT_INTERVAL, Segments.MIN_SEGMENT_INTERVAL));
-        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2));
-        final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(Segments.MIN_SEGMENT_INTERVAL * 2, Segments.MIN_SEGMENT_INTERVAL * 2));
+        final Windowed<Bytes> a2 = new Windowed<>(keyA, new SessionWindow(60_000, 60_000));
+        final Windowed<Bytes> a3 = new Windowed<>(keyA, new SessionWindow(60_000 * 2, 60_000 * 2));
+        final Windowed<Bytes> aa3 = new Windowed<>(keyAA, new SessionWindow(60_000 * 2, 60_000 * 2));
         cachingStore.put(a1, "1".getBytes());
         cachingStore.put(aa1, "1".getBytes());
         cachingStore.put(a2, "2".getBytes());
@@ -212,7 +212,7 @@ public class CachingSessionStoreTest {
         cachingStore.put(aa3, "3".getBytes());
         cachingStore.flush();
 
-        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, Segments.MIN_SEGMENT_INTERVAL * 2);
+        final KeyValueIterator<Windowed<Bytes>, byte[]> rangeResults = cachingStore.findSessions(keyA, keyAA, 0, 60_000 * 2);
         assertEquals(a1, rangeResults.next().key);
         assertEquals(aa1, rangeResults.next().key);
         assertEquals(a2, rangeResults.next().key);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
index b8808ca..118acec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingWindowStoreTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -39,6 +38,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.List;
 
+import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
 import static org.apache.kafka.test.StreamsTestUtils.toList;
 import static org.apache.kafka.test.StreamsTestUtils.verifyKeyValueList;
@@ -67,16 +67,16 @@ public class CachingWindowStoreTest {
     @Before
     public void setUp() {
         keySchema = new WindowKeySchema();
-        final int retention = 30000;
-        final int numSegments = 3;
-        underlying = new RocksDBSegmentedBytesStore("test", retention, numSegments, keySchema);
+        final int retention = 60_000;
+        final int segmentInterval = 60_000;
+        underlying = new RocksDBSegmentedBytesStore("test", retention, segmentInterval, keySchema);
         final RocksDBWindowStore<Bytes, byte[]> windowStore = new RocksDBWindowStore<>(underlying, Serdes.Bytes(), Serdes.ByteArray(), false, WINDOW_SIZE);
         cacheListener = new CachingKeyValueStoreTest.CacheFlushListenerStub<>();
         cachingStore = new CachingWindowStore<>(windowStore,
                                                 Serdes.String(),
                                                 Serdes.String(),
                                                 WINDOW_SIZE,
-                                                Segments.segmentInterval(retention, numSegments));
+                                                segmentInterval);
         cachingStore.setFlushListener(cacheListener, false);
         cache = new ThreadCache(new LogContext("testCache "), MAX_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
         topic = "topic";
@@ -313,7 +313,7 @@ public class CachingWindowStoreTest {
         cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1);
         cachingStore.put(bytesKey("a"), bytesValue("0005"), 60000);
 
-        final List<KeyValue<Long, byte[]>> expected = Utils.mkList(KeyValue.pair(0L, bytesValue("0001")), KeyValue.pair(1L, bytesValue("0003")), KeyValue.pair(60000L, bytesValue("0005")));
+        final List<KeyValue<Long, byte[]>> expected = mkList(KeyValue.pair(0L, bytesValue("0001")), KeyValue.pair(1L, bytesValue("0003")), KeyValue.pair(60000L, bytesValue("0005")));
         final List<KeyValue<Long, byte[]>> actual = toList(cachingStore.fetch(bytesKey("a"), 0, Long.MAX_VALUE));
         verifyKeyValueList(expected, actual);
     }
@@ -326,13 +326,13 @@ public class CachingWindowStoreTest {
         cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1);
         cachingStore.put(bytesKey("a"), bytesValue("0005"), 60000);
 
-        verifyKeyValueList(Utils.mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("a", "0005", 60000L)),
+        verifyKeyValueList(mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("a", "0005", 60000L)),
                            toList(cachingStore.fetch(bytesKey("a"), bytesKey("a"), 0, Long.MAX_VALUE)));
 
-        verifyKeyValueList(Utils.mkList(windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1)),
+        verifyKeyValueList(mkList(windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1)),
                            toList(cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 0, Long.MAX_VALUE)));
 
-        verifyKeyValueList(Utils.mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1), windowedPair("a", "0005", 60000L)),
+        verifyKeyValueList(mkList(windowedPair("a", "0001", 0), windowedPair("a", "0003", 1), windowedPair("aa", "0002", 0), windowedPair("aa", "0004", 1), windowedPair("a", "0005", 60000L)),
                            toList(cachingStore.fetch(bytesKey("a"), bytesKey("aa"), 0, Long.MAX_VALUE)));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index db6d1d1..d7a7283 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -53,7 +53,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.SimpleTimeZone;
 
-import static org.apache.kafka.streams.state.internals.Segments.segmentInterval;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -65,6 +64,7 @@ import static org.junit.Assert.assertTrue;
 public class RocksDBSegmentedBytesStoreTest {
 
     private final long retention = 1000;
+    private final long segmentInterval = 60_000;
     private final int numSegments = 3;
     private InternalMockProcessorContext context;
     private final String storeName = "bytes-store";
@@ -102,7 +102,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
         bytesStore = new RocksDBSegmentedBytesStore(storeName,
                 retention,
-                numSegments,
+                segmentInterval,
                 schema);
 
         stateDir = TestUtils.tempDirectory();
@@ -163,7 +163,7 @@ public class RocksDBSegmentedBytesStoreTest {
     @Test
     public void shouldRollSegments() {
         // just to validate directories
-        final Segments segments = new Segments(storeName, retention, numSegments);
+        final Segments segments = new Segments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50));
@@ -186,7 +186,7 @@ public class RocksDBSegmentedBytesStoreTest {
     @Test
     public void shouldGetAllSegments() {
         // just to validate directories
-        final Segments segments = new Segments(storeName, retention, numSegments);
+        final Segments segments = new Segments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -206,7 +206,7 @@ public class RocksDBSegmentedBytesStoreTest {
     @Test
     public void shouldFetchAllSegments() {
         // just to validate directories
-        final Segments segments = new Segments(storeName, retention, numSegments);
+        final Segments segments = new Segments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -225,7 +225,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
     @Test
     public void shouldLoadSegementsWithOldStyleDateFormattedName() {
-        final Segments segments = new Segments(storeName, retention, numSegments);
+        final Segments segments = new Segments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -237,7 +237,7 @@ public class RocksDBSegmentedBytesStoreTest {
         final Long segmentId = Long.parseLong(nameParts[1]);
         final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
         formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
-        final String formatted = formatter.format(new Date(segmentId * segmentInterval(retention, numSegments)));
+        final String formatted = formatter.format(new Date(segmentId * segmentInterval));
         final File parent = new File(stateDir, storeName);
         final File oldStyleName = new File(parent, nameParts[0] + "-" + formatted);
         assertTrue(new File(parent, firstSegmentName).renameTo(oldStyleName));
@@ -256,7 +256,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
     @Test
     public void shouldLoadSegementsWithOldStyleColonFormattedName() {
-        final Segments segments = new Segments(storeName, retention, numSegments);
+        final Segments segments = new Segments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index bcb411b..c95cbba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -154,7 +154,7 @@ public class RocksDBSessionStoreTest {
     @Test
     public void shouldFetchExactKeys() {
         final RocksDBSegmentedBytesStore bytesStore =
-                new RocksDBSegmentedBytesStore("session-store", 0x7a00000000000000L, 2, new SessionKeySchema());
+                new RocksDBSegmentedBytesStore("session-store", 0x7a00000000000000L, 0x7a00000000000000L, new SessionKeySchema());
 
         sessionStore = new RocksDBSessionStore<>(bytesStore,
                                                  Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index c436e9e..2a84a7b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -69,9 +69,9 @@ public class RocksDBWindowStoreTest {
     private final int numSegments = 3;
     private final long windowSize = 3L;
     private final String windowName = "window";
-    private final long segmentSize = Segments.MIN_SEGMENT_INTERVAL;
+    private final long segmentSize = 60_000;
     private final long retentionPeriod = segmentSize * (numSegments - 1);
-    private final Segments segments = new Segments(windowName, retentionPeriod, numSegments);
+    private final Segments segments = new Segments(windowName, retentionPeriod, segmentSize);
     private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());
 
     private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
@@ -120,10 +120,6 @@ public class RocksDBWindowStoreTest {
         return store;
     }
 
-    private WindowStore<Integer, String> createWindowStore(final ProcessorContext context) {
-        return createWindowStore(context, false);
-    }
-
     @After
     public void closeStore() {
         if (windowStore != null) {
@@ -133,24 +129,24 @@ public class RocksDBWindowStoreTest {
 
     @Test
     public void shouldOnlyIterateOpenSegments() {
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
         long currentTime = 0;
-        context.setRecordContext(createRecordContext(currentTime));
+        setCurrentTime(currentTime);
         windowStore.put(1, "one");
 
         currentTime = currentTime + segmentSize;
-        context.setRecordContext(createRecordContext(currentTime));
+        setCurrentTime(currentTime);
         windowStore.put(1, "two");
         currentTime = currentTime + segmentSize;
 
-        context.setRecordContext(createRecordContext(currentTime));
+        setCurrentTime(currentTime);
         windowStore.put(1, "three");
 
         final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, currentTime);
 
         // roll to the next segment that will close the first
         currentTime = currentTime + segmentSize;
-        context.setRecordContext(createRecordContext(currentTime));
+        setCurrentTime(currentTime);
         windowStore.put(1, "four");
 
         // should only have 2 values as the first segment is no longer open
@@ -159,13 +155,18 @@ public class RocksDBWindowStoreTest {
         assertFalse(iterator.hasNext());
     }
 
+    private void setCurrentTime(final long currentTime) {
+        context.setRecordContext(createRecordContext(currentTime));
+        context.setStreamTime(currentTime);
+    }
+
     private ProcessorRecordContext createRecordContext(final long time) {
         return new ProcessorRecordContext(time, 0, 0, "topic", null);
     }
 
     @Test
     public void testRangeAndSinglePointFetch() {
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
         final long startTime = segmentSize - 4L;
 
         putFirstBatch(windowStore, startTime, context);
@@ -224,7 +225,7 @@ public class RocksDBWindowStoreTest {
 
     @Test
     public void shouldGetAll() {
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
         final long startTime = segmentSize - 4L;
 
         putFirstBatch(windowStore, startTime, context);
@@ -243,7 +244,7 @@ public class RocksDBWindowStoreTest {
 
     @Test
     public void shouldFetchAllInTimeRange() {
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
         final long startTime = segmentSize - 4L;
 
         putFirstBatch(windowStore, startTime, context);
@@ -272,7 +273,7 @@ public class RocksDBWindowStoreTest {
 
     @Test
     public void testFetchRange() {
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
         final long startTime = segmentSize - 4L;
 
         putFirstBatch(windowStore, startTime, context);
@@ -320,7 +321,7 @@ public class RocksDBWindowStoreTest {
 
     @Test
     public void testPutAndFetchBefore() {
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
         final long startTime = segmentSize - 4L;
 
         putFirstBatch(windowStore, startTime, context);
@@ -366,7 +367,7 @@ public class RocksDBWindowStoreTest {
 
     @Test
     public void testPutAndFetchAfter() {
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
         final long startTime = segmentSize - 4L;
 
         putFirstBatch(windowStore, startTime, context);
@@ -415,7 +416,7 @@ public class RocksDBWindowStoreTest {
         windowStore = createWindowStore(context, true);
         final long startTime = segmentSize - 4L;
 
-        context.setRecordContext(createRecordContext(startTime));
+        setCurrentTime(startTime);
         windowStore.put(0, "zero");
 
         assertEquals(Utils.mkList("zero"), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
@@ -440,21 +441,20 @@ public class RocksDBWindowStoreTest {
 
     @Test
     public void testRolling() {
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
 
         // to validate segments
-        final Segments segments = new Segments(windowName, retentionPeriod, numSegments);
         final long startTime = segmentSize * 2;
         final long increment = segmentSize / 2;
-        context.setRecordContext(createRecordContext(startTime));
+        setCurrentTime(startTime);
         windowStore.put(0, "zero");
         assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
 
-        context.setRecordContext(createRecordContext(startTime + increment));
+        setCurrentTime(startTime + increment);
         windowStore.put(1, "one");
         assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
 
-        context.setRecordContext(createRecordContext(startTime + increment * 2));
+        setCurrentTime(startTime + increment * 2);
         windowStore.put(2, "two");
         assertEquals(
             Utils.mkSet(
@@ -464,7 +464,7 @@ public class RocksDBWindowStoreTest {
             segmentDirs(baseDir)
         );
 
-        context.setRecordContext(createRecordContext(startTime + increment * 4));
+        setCurrentTime(startTime + increment * 4);
         windowStore.put(4, "four");
         assertEquals(
             Utils.mkSet(
@@ -476,7 +476,7 @@ public class RocksDBWindowStoreTest {
         );
 
 
-        context.setRecordContext(createRecordContext(startTime + increment * 5));
+        setCurrentTime(startTime + increment * 5);
         windowStore.put(5, "five");
         assertEquals(
             Utils.mkSet(
@@ -494,7 +494,7 @@ public class RocksDBWindowStoreTest {
         assertEquals(Utils.mkList("four"), toList(windowStore.fetch(4, startTime + increment * 4 - windowSize, startTime + increment * 4 + windowSize)));
         assertEquals(Utils.mkList("five"), toList(windowStore.fetch(5, startTime + increment * 5 - windowSize, startTime + increment * 5 + windowSize)));
 
-        context.setRecordContext(createRecordContext(startTime + increment * 6));
+        setCurrentTime(startTime + increment * 6);
         windowStore.put(6, "six");
         assertEquals(
             Utils.mkSet(
@@ -515,7 +515,7 @@ public class RocksDBWindowStoreTest {
         assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + increment * 6 - windowSize, startTime + increment * 6 + windowSize)));
 
 
-        context.setRecordContext(createRecordContext(startTime + increment * 7));
+        setCurrentTime(startTime + increment * 7);
         windowStore.put(7, "seven");
         assertEquals(
             Utils.mkSet(
@@ -535,7 +535,7 @@ public class RocksDBWindowStoreTest {
         assertEquals(Utils.mkList("six"), toList(windowStore.fetch(6, startTime + increment * 6 - windowSize, startTime + increment * 6 + windowSize)));
         assertEquals(Utils.mkList("seven"), toList(windowStore.fetch(7, startTime + increment * 7 - windowSize, startTime + increment * 7 + windowSize)));
 
-        context.setRecordContext(createRecordContext(startTime + increment * 8));
+        setCurrentTime(startTime + increment * 8);
         windowStore.put(8, "eight");
         assertEquals(
             Utils.mkSet(
@@ -576,24 +576,24 @@ public class RocksDBWindowStoreTest {
         final long startTime = segmentSize * 2;
         final long increment = segmentSize / 2;
 
-        windowStore = createWindowStore(context);
-        context.setRecordContext(createRecordContext(startTime));
+        windowStore = createWindowStore(context, false);
+        setCurrentTime(startTime);
         windowStore.put(0, "zero");
-        context.setRecordContext(createRecordContext(startTime + increment));
+        setCurrentTime(startTime + increment);
         windowStore.put(1, "one");
-        context.setRecordContext(createRecordContext(startTime + increment * 2));
+        setCurrentTime(startTime + increment * 2);
         windowStore.put(2, "two");
-        context.setRecordContext(createRecordContext(startTime + increment * 3));
+        setCurrentTime(startTime + increment * 3);
         windowStore.put(3, "three");
-        context.setRecordContext(createRecordContext(startTime + increment * 4));
+        setCurrentTime(startTime + increment * 4);
         windowStore.put(4, "four");
-        context.setRecordContext(createRecordContext(startTime + increment * 5));
+        setCurrentTime(startTime + increment * 5);
         windowStore.put(5, "five");
-        context.setRecordContext(createRecordContext(startTime + increment * 6));
+        setCurrentTime(startTime + increment * 6);
         windowStore.put(6, "six");
-        context.setRecordContext(createRecordContext(startTime + increment * 7));
+        setCurrentTime(startTime + increment * 7);
         windowStore.put(7, "seven");
-        context.setRecordContext(createRecordContext(startTime + increment * 8));
+        setCurrentTime(startTime + increment * 8);
         windowStore.put(8, "eight");
         windowStore.flush();
 
@@ -602,7 +602,7 @@ public class RocksDBWindowStoreTest {
         // remove local store image
         Utils.delete(baseDir);
 
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
         assertEquals(Utils.mkList(), toList(windowStore.fetch(0, startTime - windowSize, startTime + windowSize)));
         assertEquals(Utils.mkList(), toList(windowStore.fetch(1, startTime + increment - windowSize, startTime + increment + windowSize)));
         assertEquals(Utils.mkList(), toList(windowStore.fetch(2, startTime + increment * 2 - windowSize, startTime + increment * 2 + windowSize)));
@@ -637,14 +637,14 @@ public class RocksDBWindowStoreTest {
     public void testSegmentMaintenance() {
         windowStore = createWindowStore(context, true);
         context.setTime(0L);
-        context.setRecordContext(createRecordContext(0));
+        setCurrentTime(0);
         windowStore.put(0, "v");
         assertEquals(
             Utils.mkSet(segments.segmentName(0L)),
             segmentDirs(baseDir)
         );
 
-        context.setRecordContext(createRecordContext(59999));
+        setCurrentTime(59999);
         windowStore.put(0, "v");
         windowStore.put(0, "v");
         assertEquals(
@@ -652,7 +652,7 @@ public class RocksDBWindowStoreTest {
             segmentDirs(baseDir)
         );
 
-        context.setRecordContext(createRecordContext(60000));
+        setCurrentTime(60000);
         windowStore.put(0, "v");
         assertEquals(
             Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
@@ -675,7 +675,7 @@ public class RocksDBWindowStoreTest {
             segmentDirs(baseDir)
         );
 
-        context.setRecordContext(createRecordContext(180000));
+        setCurrentTime(180000);
         windowStore.put(0, "v");
 
         iter = windowStore.fetch(0, 0L, 240000L);
@@ -691,7 +691,7 @@ public class RocksDBWindowStoreTest {
             segmentDirs(baseDir)
         );
 
-        context.setRecordContext(createRecordContext(300000));
+        setCurrentTime(300000);
         windowStore.put(0, "v");
 
         iter = windowStore.fetch(0, 240000L, 1000000L);
@@ -714,7 +714,7 @@ public class RocksDBWindowStoreTest {
     public void testInitialLoading() {
         final File storeDir = new File(baseDir, windowName);
 
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
 
         new File(storeDir, segments.segmentName(0L)).mkdir();
         new File(storeDir, segments.segmentName(1L)).mkdir();
@@ -725,12 +725,16 @@ public class RocksDBWindowStoreTest {
         new File(storeDir, segments.segmentName(6L)).mkdir();
         windowStore.close();
 
-        windowStore = createWindowStore(context);
+        context.setStreamTime(segmentSize * 6L);
+        windowStore = createWindowStore(context, false);
 
-        assertEquals(
-            Utils.mkSet(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)),
-            segmentDirs(baseDir)
-        );
+        final List<String> expected = Utils.mkList(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L));
+        expected.sort(String::compareTo);
+
+        final List<String> actual = Utils.toList(segmentDirs(baseDir).iterator());
+        actual.sort(String::compareTo);
+
+        assertEquals(expected, actual);
 
         try (final WindowStoreIterator iter = windowStore.fetch(0, 0L, 1000000L)) {
             while (iter.hasNext()) {
@@ -746,8 +750,8 @@ public class RocksDBWindowStoreTest {
 
     @Test
     public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() {
-        windowStore = createWindowStore(context);
-        context.setRecordContext(createRecordContext(0));
+        windowStore = createWindowStore(context, false);
+        setCurrentTime(0);
         windowStore.put(1, "one", 1L);
         windowStore.put(1, "two", 2L);
         windowStore.put(1, "three", 3L);
@@ -802,31 +806,31 @@ public class RocksDBWindowStoreTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerExceptionOnPutNullKey() {
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
         windowStore.put(null, "anyValue");
     }
 
     @Test
     public void shouldNotThrowNullPointerExceptionOnPutNullValue() {
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
         windowStore.put(1, null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerExceptionOnGetNullKey() {
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
         windowStore.fetch(null, 1L, 2L);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
         windowStore.fetch(null, 2, 1L, 2L);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
-        windowStore = createWindowStore(context);
+        windowStore = createWindowStore(context, false);
         windowStore.fetch(1, null, 1L, 2L);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index bfa317d..1fc0853 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -46,7 +46,7 @@ public class SegmentsTest {
     private static final int NUM_SEGMENTS = 5;
     private InternalMockProcessorContext context;
     private Segments segments;
-    private long segmentInterval;
+    private final long segmentInterval = 60_000L;
     private File stateDirectory;
     private String storeName = "test";
     private final int retentionPeriod =  4 * 60 * 1000;
@@ -59,8 +59,7 @@ public class SegmentsTest {
                                            Serdes.Long(),
                                            new NoOpRecordCollector(),
                                            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics())));
-        segments = new Segments(storeName, retentionPeriod, NUM_SEGMENTS);
-        segmentInterval = Segments.segmentInterval(retentionPeriod, NUM_SEGMENTS);
+        segments = new Segments(storeName, retentionPeriod, segmentInterval);
     }
 
     @After
@@ -78,7 +77,7 @@ public class SegmentsTest {
 
     @Test
     public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
-        final Segments segments = new Segments("test", 8 * 60 * 1000, 5);
+        final Segments segments = new Segments("test", 8 * 60 * 1000, 120_000);
         assertEquals(0, segments.segmentId(0));
         assertEquals(0, segments.segmentId(60000));
         assertEquals(1, segments.segmentId(120000));
@@ -93,9 +92,9 @@ public class SegmentsTest {
 
     @Test
     public void shouldCreateSegments() {
-        final Segment segment1 = segments.getOrCreateSegment(0, context);
-        final Segment segment2 = segments.getOrCreateSegment(1, context);
-        final Segment segment3 = segments.getOrCreateSegment(2, context);
+        final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context);
+        final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
+        final Segment segment3 = segments.getOrCreateSegmentIfLive(2, context);
         assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
         assertTrue(new File(context.stateDir(), "test/test." + segmentInterval).isDirectory());
         assertTrue(new File(context.stateDir(), "test/test." + 2 * segmentInterval).isDirectory());
@@ -106,16 +105,17 @@ public class SegmentsTest {
 
     @Test
     public void shouldNotCreateSegmentThatIsAlreadyExpired() {
-        segments.getOrCreateSegment(7, context);
-        assertNull(segments.getOrCreateSegment(0, context));
+        updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context));
         assertFalse(new File(context.stateDir(), "test/test.0").exists());
     }
 
     @Test
     public void shouldCleanupSegmentsThatHaveExpired() {
-        final Segment segment1 = segments.getOrCreateSegment(0, context);
-        final Segment segment2 = segments.getOrCreateSegment(1, context);
-        final Segment segment3 = segments.getOrCreateSegment(7, context);
+        final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context);
+        final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
+        context.setStreamTime(segmentInterval * 7);
+        final Segment segment3 = segments.getOrCreateSegmentIfLive(7, context);
         assertFalse(segment1.isOpen());
         assertFalse(segment2.isOpen());
         assertTrue(segment3.isOpen());
@@ -126,22 +126,22 @@ public class SegmentsTest {
 
     @Test
     public void shouldGetSegmentForTimestamp() {
-        final Segment segment = segments.getOrCreateSegment(0, context);
-        segments.getOrCreateSegment(1, context);
+        final Segment segment = segments.getOrCreateSegmentIfLive(0, context);
+        segments.getOrCreateSegmentIfLive(1, context);
         assertEquals(segment, segments.getSegmentForTimestamp(0L));
     }
 
     @Test
     public void shouldGetCorrectSegmentString() {
-        final Segment segment = segments.getOrCreateSegment(0, context);
+        final Segment segment = segments.getOrCreateSegmentIfLive(0, context);
         assertEquals("Segment(id=0, name=test.0)", segment.toString());
     }
 
     @Test
     public void shouldCloseAllOpenSegments() {
-        final Segment first = segments.getOrCreateSegment(0, context);
-        final Segment second = segments.getOrCreateSegment(1, context);
-        final Segment third = segments.getOrCreateSegment(2, context);
+        final Segment first = segments.getOrCreateSegmentIfLive(0, context);
+        final Segment second = segments.getOrCreateSegmentIfLive(1, context);
+        final Segment third = segments.getOrCreateSegmentIfLive(2, context);
         segments.close();
 
         assertFalse(first.isOpen());
@@ -151,15 +151,15 @@ public class SegmentsTest {
 
     @Test
     public void shouldOpenExistingSegments() {
-        segments.getOrCreateSegment(0, context);
-        segments.getOrCreateSegment(1, context);
-        segments.getOrCreateSegment(2, context);
-        segments.getOrCreateSegment(3, context);
-        segments.getOrCreateSegment(4, context);
+        segments.getOrCreateSegmentIfLive(0, context);
+        segments.getOrCreateSegmentIfLive(1, context);
+        segments.getOrCreateSegmentIfLive(2, context);
+        segments.getOrCreateSegmentIfLive(3, context);
+        segments.getOrCreateSegmentIfLive(4, context);
         // close existing.
         segments.close();
 
-        segments = new Segments("test", 4 * 60 * 1000, 5);
+        segments = new Segments("test", 4 * 60 * 1000, 60_000);
         segments.openExisting(context);
 
         assertTrue(segments.getSegmentForTimestamp(0).isOpen());
@@ -171,11 +171,16 @@ public class SegmentsTest {
 
     @Test
     public void shouldGetSegmentsWithinTimeRange() {
-        segments.getOrCreateSegment(0, context);
-        segments.getOrCreateSegment(1, context);
-        segments.getOrCreateSegment(2, context);
-        segments.getOrCreateSegment(3, context);
-        segments.getOrCreateSegment(4, context);
+        updateStreamTimeAndCreateSegment(0);
+        updateStreamTimeAndCreateSegment(1);
+        updateStreamTimeAndCreateSegment(2);
+        updateStreamTimeAndCreateSegment(3);
+        updateStreamTimeAndCreateSegment(4);
+        segments.getOrCreateSegmentIfLive(0, context);
+        segments.getOrCreateSegmentIfLive(1, context);
+        segments.getOrCreateSegmentIfLive(2, context);
+        segments.getOrCreateSegmentIfLive(3, context);
+        segments.getOrCreateSegmentIfLive(4, context);
 
         final List<Segment> segments = this.segments.segments(0, 2 * 60 * 1000);
         assertEquals(3, segments.size());
@@ -186,11 +191,11 @@ public class SegmentsTest {
 
     @Test
     public void shouldGetSegmentsWithinTimeRangeOutOfOrder() throws Exception {
-        segments.getOrCreateSegment(4, context);
-        segments.getOrCreateSegment(2, context);
-        segments.getOrCreateSegment(0, context);
-        segments.getOrCreateSegment(1, context);
-        segments.getOrCreateSegment(3, context);
+        updateStreamTimeAndCreateSegment(4);
+        updateStreamTimeAndCreateSegment(2);
+        updateStreamTimeAndCreateSegment(0);
+        updateStreamTimeAndCreateSegment(1);
+        updateStreamTimeAndCreateSegment(3);
 
         final List<Segment> segments = this.segments.segments(0, 2 * 60 * 1000);
         assertEquals(3, segments.size());
@@ -201,23 +206,46 @@ public class SegmentsTest {
 
     @Test
     public void shouldRollSegments() {
-        segments.getOrCreateSegment(0, context);
+        updateStreamTimeAndCreateSegment(0);
         verifyCorrectSegments(0, 1);
-        segments.getOrCreateSegment(1, context);
+        updateStreamTimeAndCreateSegment(1);
         verifyCorrectSegments(0, 2);
-        segments.getOrCreateSegment(2, context);
+        updateStreamTimeAndCreateSegment(2);
         verifyCorrectSegments(0, 3);
-        segments.getOrCreateSegment(3, context);
+        updateStreamTimeAndCreateSegment(3);
         verifyCorrectSegments(0, 4);
-        segments.getOrCreateSegment(4, context);
+        updateStreamTimeAndCreateSegment(4);
         verifyCorrectSegments(0, 5);
-        segments.getOrCreateSegment(5, context);
+        updateStreamTimeAndCreateSegment(5);
         verifyCorrectSegments(1, 5);
-        segments.getOrCreateSegment(6, context);
+        updateStreamTimeAndCreateSegment(6);
         verifyCorrectSegments(2, 5);
     }
 
     @Test
+    public void futureEventsShouldNotCauseSegmentRoll() {
+        updateStreamTimeAndCreateSegment(0);
+        verifyCorrectSegments(0, 1);
+        updateStreamTimeAndCreateSegment(1);
+        verifyCorrectSegments(0, 2);
+        updateStreamTimeAndCreateSegment(2);
+        verifyCorrectSegments(0, 3);
+        updateStreamTimeAndCreateSegment(3);
+        verifyCorrectSegments(0, 4);
+        updateStreamTimeAndCreateSegment(4);
+        verifyCorrectSegments(0, 5);
+        segments.getOrCreateSegmentIfLive(5, context);
+        verifyCorrectSegments(0, 6);
+        segments.getOrCreateSegmentIfLive(6, context);
+        verifyCorrectSegments(0, 7);
+    }
+
+    private void updateStreamTimeAndCreateSegment(final int segment) {
+        context.setStreamTime(segmentInterval * segment);
+        segments.getOrCreateSegmentIfLive(segment, context);
+    }
+
+    @Test
     public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exception {
         final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
         final File storeDirectory = new File(storeDirectoryPath);
@@ -260,7 +288,7 @@ public class SegmentsTest {
 
     @Test
     public void shouldClearSegmentsOnClose() {
-        segments.getOrCreateSegment(0, context);
+        segments.getOrCreateSegmentIfLive(0, context);
         segments.close();
         assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index bb42d1c..3251489 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -61,6 +61,7 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
     private Serde<?> keySerde;
     private Serde<?> valSerde;
     private long timestamp = -1L;
+    private long streamTime = -1;
 
     public InternalMockProcessorContext() {
         this(null,
@@ -179,6 +180,15 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
     @Override
     public void initialized() {}
 
+    public void setStreamTime(final long time) {
+        streamTime = time;
+    }
+
+    @Override
+    public Long streamTime() {
+        return streamTime;
+    }
+
     @Override
     public File stateDir() {
         if (stateDir == null) {
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 92f84c5..7b695c4 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -86,6 +86,11 @@ public class NoOpProcessorContext extends AbstractProcessorContext {
     }
 
     @Override
+    public Long streamTime() {
+        throw new RuntimeException("not implemented");
+    }
+
+    @Override
     public void register(final StateStore store,
                          final StateRestoreCallback stateRestoreCallback) {
         // no-op