You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/07/02 23:07:43 UTC

[kafka] branch trunk updated: KAFKA-7080: replace numSegments with segmentInterval (#5257)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 64fff8b  KAFKA-7080: replace numSegments with segmentInterval (#5257)
64fff8b is described below

commit 64fff8bfcc9b92769640bfaa692e19d0db8861a6
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Mon Jul 2 18:07:38 2018 -0500

    KAFKA-7080: replace numSegments with segmentInterval (#5257)
    
    See also KIP-319.
    
    Replace number-of-segments parameters with segment-interval-ms parameters in various places. The latter was always the parameter that several components needed, and we accidentally supplied the former because it was the one available.
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../kafka/streams/kstream/SessionWindows.java      | 12 ++---
 .../org/apache/kafka/streams/kstream/Windows.java  | 29 ++++++----
 .../streams/kstream/internals/KStreamImpl.java     | 17 +++---
 .../kstream/internals/TimeWindowedKStreamImpl.java | 12 +++--
 .../org/apache/kafka/streams/state/Stores.java     | 61 +++++++++++++++++++---
 .../streams/state/WindowBytesStoreSupplier.java    | 13 ++++-
 .../RocksDbSessionBytesStoreSupplier.java          |  1 +
 .../internals/RocksDbWindowBytesStoreSupplier.java |  6 +++
 .../state/internals/WindowStoreBuilder.java        |  2 +-
 .../kafka/streams/kstream/JoinWindowsTest.java     | 10 ----
 .../kafka/streams/kstream/SessionWindowsTest.java  |  2 +-
 .../kafka/streams/kstream/TimeWindowsTest.java     |  2 +-
 .../apache/kafka/streams/kstream/WindowsTest.java  |  8 ++-
 .../apache/kafka/streams/perf/SimpleBenchmark.java | 17 +++---
 .../internals/InternalTopologyBuilderTest.java     | 10 +++-
 .../org/apache/kafka/streams/state/StoresTest.java | 29 +++++-----
 .../internals/RocksDBSegmentedBytesStoreTest.java  |  4 +-
 .../state/internals/RocksDBSessionStoreTest.java   |  2 +-
 .../state/internals/RocksDBWindowStoreTest.java    | 60 ++++++++-------------
 .../StreamThreadStateStoreProviderTest.java        |  8 ++-
 20 files changed, 190 insertions(+), 115 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index aa3dec1..fc1fb9f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -66,11 +66,11 @@ import java.util.Objects;
 public final class SessionWindows {
 
     private final long gapMs;
-    private long maintainDurationMs;
+    private final long maintainDurationMs;
 
-    private SessionWindows(final long gapMs) {
+    private SessionWindows(final long gapMs, final long maintainDurationMs) {
         this.gapMs = gapMs;
-        maintainDurationMs = Windows.DEFAULT_MAINTAIN_DURATION_MS;
+        this.maintainDurationMs = maintainDurationMs;
     }
 
     /**
@@ -85,7 +85,8 @@ public final class SessionWindows {
         if (inactivityGapMs <= 0) {
             throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
         }
-        return new SessionWindows(inactivityGapMs);
+        final long oneDayMs = 24 * 60 * 60_000L;
+        return new SessionWindows(inactivityGapMs, oneDayMs);
     }
 
     /**
@@ -99,9 +100,8 @@ public final class SessionWindows {
         if (durationMs < gapMs) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
         }
-        maintainDurationMs = durationMs;
 
-        return this;
+        return new SessionWindows(gapMs, durationMs);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 09fdfce..53ead1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -36,18 +36,10 @@ import java.util.Map;
  */
 public abstract class Windows<W extends Window> {
 
-    private static final int DEFAULT_NUM_SEGMENTS = 3;
+    private long maintainDurationMs = 24 * 60 * 60 * 1000L; // default: one day
+    @Deprecated public int segments = 3;
 
-    static final long DEFAULT_MAINTAIN_DURATION_MS = 24 * 60 * 60 * 1000L; // one day
-
-    private long maintainDurationMs;
-
-    public int segments;
-
-    protected Windows() {
-        segments = DEFAULT_NUM_SEGMENTS;
-        maintainDurationMs = DEFAULT_MAINTAIN_DURATION_MS;
-    }
+    protected Windows() {}
 
     /**
      * Set the window maintain duration (retention time) in milliseconds.
@@ -77,13 +69,28 @@ public abstract class Windows<W extends Window> {
     }
 
     /**
+     * Return the segment interval in milliseconds.
+     *
+     * @return the segment interval
+     */
+    @SuppressWarnings("deprecation") // The deprecation is on the public visibility of segments. We intend to make the field private later.
+    public long segmentInterval() {
+        // Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient.
+        final long minimumSegmentInterval = 60_000L;
+        // Scaled to the (possibly overridden) retention period
+        return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
+    }
+
+    /**
      * Set the number of segments to be used for rolling the window store.
      * This function is not exposed to users but can be called by developers that extend this class.
      *
      * @param segments the number of segments to be used
      * @return itself
      * @throws IllegalArgumentException if specified segments is small than 2
+     * @deprecated since 2.1. Override {@code segmentInterval()} instead.
      */
+    @Deprecated
     protected Windows<W> segments(final int segments) throws IllegalArgumentException {
         if (segments < 2) {
             throw new IllegalArgumentException("Number of segments must be at least 2.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bc56a3d..acfdf35 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -844,12 +844,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                                                    final Serde<K> keySerde,
                                                                                    final Serde<V> valueSerde,
                                                                                    final String storeName) {
-        return Stores.windowStoreBuilder(Stores.persistentWindowStore(storeName,
-                                                                      windows.maintainMs(),
-                                                                      windows.segments,
-                                                                      windows.size(),
-                                                                      true), keySerde, valueSerde);
-
+        return Stores.windowStoreBuilder(
+            Stores.persistentWindowStore(
+                storeName,
+                windows.maintainMs(),
+                windows.size(),
+                true,
+                windows.segmentInterval()
+            ),
+            keySerde,
+            valueSerde
+        );
     }
 
     private class KStreamImplJoin {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index e545f48..7d6d174 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -155,11 +155,13 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
     private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
         WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
-            supplier = Stores.persistentWindowStore(materialized.storeName(),
-                                                    windows.maintainMs(),
-                                                    windows.segments,
-                                                    windows.size(),
-                                                    false);
+            supplier = Stores.persistentWindowStore(
+                materialized.storeName(),
+                windows.maintainMs(),
+                windows.size(),
+                false,
+                windows.segmentInterval()
+            );
         }
         final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(supplier,
                                                                                    materialized.keySerde(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index eebd59f..c1b81c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -79,7 +79,7 @@ public class Stores {
     /**
      * Create a persistent {@link KeyValueBytesStoreSupplier}.
      * @param name  name of the store (cannot be {@code null})
-     * @return  an instance of a {@link KeyValueBytesStoreSupplier} that can be used
+     * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
      * to build a persistent store
      */
     public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
@@ -90,7 +90,7 @@ public class Stores {
     /**
      * Create an in-memory {@link KeyValueBytesStoreSupplier}.
      * @param name  name of the store (cannot be {@code null})
-     * @return  an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
+     * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used to
      * build an in-memory store
      */
     public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {
@@ -151,25 +151,72 @@ public class Stores {
      * @param windowSize            size of the windows (cannot be negative)
      * @param retainDuplicates      whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
+     * @deprecated since 2.1. Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead.
      */
+    @Deprecated
     public static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                                  final long retentionPeriod,
                                                                  final int numSegments,
                                                                  final long windowSize,
                                                                  final boolean retainDuplicates) {
+        if (numSegments < 2) {
+            throw new IllegalArgumentException("numSegments cannot must smaller than 2");
+        }
+
+        final long legacySegmentInterval = Math.max(retentionPeriod / (numSegments - 1), 60_000L);
+
+        return persistentWindowStore(
+            name,
+            retentionPeriod,
+            windowSize,
+            retainDuplicates,
+            legacySegmentInterval
+        );
+    }
+
+    /**
+     * Create a persistent {@link WindowBytesStoreSupplier}.
+     * @param name                  name of the store (cannot be {@code null})
+     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     * @param windowSize            size of the windows (cannot be negative)
+     * @param retainDuplicates      whether or not to retain duplicates.
+     * @return an instance of {@link WindowBytesStoreSupplier}
+     */
+    public static WindowBytesStoreSupplier persistentWindowStore(final String name,
+                                                                 final long retentionPeriod,
+                                                                 final long windowSize,
+                                                                 final boolean retainDuplicates) {
+        // we're arbitrarily defaulting to segments no smaller than one minute.
+        final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L);
+        return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, defaultSegmentInterval);
+    }
+
+    /**
+     * Create a persistent {@link WindowBytesStoreSupplier}.
+     * @param name                  name of the store (cannot be {@code null})
+     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     * @param segmentInterval       size of segments in ms (must be at least one minute)
+     * @param windowSize            size of the windows (cannot be negative)
+     * @param retainDuplicates      whether or not to retain duplicates.
+     * @return an instance of {@link WindowBytesStoreSupplier}
+     */
+    public static WindowBytesStoreSupplier persistentWindowStore(final String name,
+                                                                 final long retentionPeriod,
+                                                                 final long windowSize,
+                                                                 final boolean retainDuplicates,
+                                                                 final long segmentInterval) {
         Objects.requireNonNull(name, "name cannot be null");
         if (retentionPeriod < 0) {
             throw new IllegalArgumentException("retentionPeriod cannot be negative");
         }
-        if (numSegments < 2) {
-            throw new IllegalArgumentException("numSegments cannot must smaller than 2");
-        }
         if (windowSize < 0) {
             throw new IllegalArgumentException("windowSize cannot be negative");
         }
-        final long segmentIntervalMs = Math.max(retentionPeriod / (numSegments - 1), 60_000L);
+        if (segmentInterval < 60_000) {
+            throw new IllegalArgumentException("segmentInterval must be at least one minute");
+        }
 
-        return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentIntervalMs, windowSize, retainDuplicates);
+        return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
index 9cf70c2..c071b34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
@@ -33,11 +33,22 @@ public interface WindowBytesStoreSupplier extends StoreSupplier<WindowStore<Byte
      * It is also used to reduce the amount of data that is scanned when caching is enabled.
      *
      * @return number of segments
+     * @deprecated since 2.1. Use {@link WindowBytesStoreSupplier#segmentIntervalMs()} instead.
      */
+    @Deprecated
     int segments();
 
     /**
-     * The size of the windows any store created from this supplier is creating.
+     * The size of the segments (in milliseconds) the store has.
+     * If your store is segmented then this should be the size of segments in the underlying store.
+     * It is also used to reduce the amount of data that is scanned when caching is enabled.
+     *
+     * @return size of the segments (in milliseconds)
+     */
+    long segmentIntervalMs();
+
+    /**
+     * The size of the windows (in milliseconds) any store created from this supplier is creating.
      *
      * @return window size
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index 37968ce..45df39c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -53,6 +53,7 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
 
     @Override
     public long segmentIntervalMs() {
+        // Selected somewhat arbitrarily. Profiling may reveal a different value is preferable.
         return Math.max(retentionPeriod / 2, 60_000L);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index 9421bcb..5c7b099 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -66,12 +66,18 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
         return "rocksdb-window-state";
     }
 
+    @Deprecated
     @Override
     public int segments() {
         return (int) (retentionPeriod / segmentInterval) + 1;
     }
 
     @Override
+    public long segmentIntervalMs() {
+        return segmentInterval;
+    }
+
+    @Override
     public long windowSize() {
         return windowSize;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
index 97b4883..31d063a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
@@ -52,7 +52,7 @@ public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowS
                                         keySerde,
                                         valueSerde,
                                         storeSupplier.windowSize(),
-                                        storeSupplier.segments());
+                                        storeSupplier.segmentIntervalMs());
     }
 
     private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index 0611704..7b22df1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -113,16 +113,6 @@ public class JoinWindowsTest {
     }
 
     @Test
-    public void shouldUseWindowSizeForMaintainDurationWhenSizeLargerThanDefaultMaintainMs() {
-        final long size = Windows.DEFAULT_MAINTAIN_DURATION_MS;
-
-        final JoinWindows windowSpec = JoinWindows.of(size);
-        final long windowSize = windowSpec.size();
-
-        assertEquals(windowSize, windowSpec.maintainMs());
-    }
-
-    @Test
     public void retentionTimeMustNoBeSmallerThanWindowSize() {
         final JoinWindows windowSpec = JoinWindows.of(anySize);
         final long windowSize = windowSpec.size();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index 8c0a0b9..d0e5996 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -50,7 +50,7 @@ public class SessionWindowsTest {
 
     @Test
     public void retentionTimeShouldBeGapIfGapIsLargerThanDefaultRetentionTime() {
-        final long windowGap = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS;
+        final long windowGap = 2 * SessionWindows.with(1).maintainMs();
         assertEquals(windowGap, SessionWindows.with(windowGap).maintainMs());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 7e6bb3e..390678f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -47,7 +47,7 @@ public class TimeWindowsTest {
 
     @Test
     public void shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() {
-        final long windowSize = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS;
+        final long windowSize = 2 * TimeWindows.of(1).maintainMs();
         assertEquals(windowSize, TimeWindows.of(windowSize).maintainMs());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
index 77faf1a..2e9246e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
@@ -40,7 +40,13 @@ public class WindowsTest {
     @Test
     public void shouldSetNumberOfSegments() {
         final int anySegmentSizeLargerThanOne = 5;
-        assertEquals(anySegmentSizeLargerThanOne, new TestWindows().segments(anySegmentSizeLargerThanOne).segments);
+        final TestWindows testWindow = new TestWindows();
+        final long maintainMs = testWindow.maintainMs();
+
+        assertEquals(
+            maintainMs / (anySegmentSizeLargerThanOne - 1),
+            testWindow.segments(anySegmentSizeLargerThanOne).segmentInterval()
+        );
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 7179293..654fd03 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -32,10 +32,10 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
@@ -469,13 +469,18 @@ public class SimpleBenchmark {
         setStreamProperties("simple-benchmark-streams-with-store");
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder
-                = Stores.windowStoreBuilder(Stores.persistentWindowStore("store",
+
+        final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder = Stores.windowStoreBuilder(
+            Stores.persistentWindowStore(
+                "store",
                 AGGREGATE_WINDOW_SIZE * 3,
-                3,
                 AGGREGATE_WINDOW_SIZE,
-                false),
-                INTEGER_SERDE, BYTE_SERDE);
+                false,
+                60_000L
+            ),
+            INTEGER_SERDE,
+            BYTE_SERDE
+        );
         builder.addStateStore(storeBuilder.withCachingEnabled());
 
         final KStream<Integer, byte[]> source = builder.stream(topic);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index b0674ea..fb64130 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -532,7 +532,15 @@ public class InternalTopologyBuilderTest {
         builder.setApplicationId("appId");
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 30000, 3, 10000, false), Serdes.String(), Serdes.String()), "processor");
+
+        builder.addStateStore(
+            Stores.windowStoreBuilder(
+                Stores.persistentWindowStore("store", 30_000L, 10_000L, false),
+                Serdes.String(),
+                Serdes.String()
+            ),
+            "processor"
+        );
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 5383c27..23f246d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -28,7 +28,6 @@ import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.fail;
 
 public class StoresTest {
 
@@ -54,22 +53,28 @@ public class StoresTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
-        Stores.persistentWindowStore(null, 0, 1, 0, false);
+        Stores.persistentWindowStore(null, 0L, 0L, false, 60_000L);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
-        Stores.persistentWindowStore("anyName", -1, 1, 0, false);
+        Stores.persistentWindowStore("anyName", -1L, 0L, false, 60_000L);
     }
 
+    @Deprecated
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIfIPersistentWindowStoreIfNumberOfSegmentsSmallerThanOne() {
-        Stores.persistentWindowStore("anyName", 0, 0, 0, false);
+        Stores.persistentWindowStore("anyName", 0L, 1, 0L, false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() {
-        Stores.persistentWindowStore("anyName", 0, 1, -1, false);
+        Stores.persistentWindowStore("anyName", 0L, -1L, false);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfIPersistentWindowStoreIfSegmentIntervalIsTooSmall() {
+        Stores.persistentWindowStore("anyName", 1L, 1L, false, 59_999L);
     }
 
     @Test(expected = NullPointerException.class)
@@ -98,16 +103,6 @@ public class StoresTest {
     }
 
     @Test
-    public void shouldThrowIllegalArgumentExceptionWhenTryingToConstructWindowStoreWithLessThanTwoSegments() {
-        try {
-            Stores.persistentWindowStore("store", 1, 1, 1, false);
-            fail("Should have thrown illegal argument exception as number of segments is less than 2");
-        } catch (final IllegalArgumentException e) {
-         // ok
-        }
-    }
-
-    @Test
     public void shouldCreateInMemoryKeyValueStore() {
         assertThat(Stores.inMemoryKeyValueStore("memory").get(), instanceOf(InMemoryKeyValueStore.class));
     }
@@ -124,7 +119,7 @@ public class StoresTest {
 
     @Test
     public void shouldCreateRocksDbWindowStore() {
-        assertThat(Stores.persistentWindowStore("store", 1, 3, 1, false).get(), instanceOf(RocksDBWindowStore.class));
+        assertThat(Stores.persistentWindowStore("store", 1L, 1L, false).get(), instanceOf(RocksDBWindowStore.class));
     }
 
     @Test
@@ -134,7 +129,7 @@ public class StoresTest {
 
     @Test
     public void shouldBuildWindowStore() {
-        final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3, 2, 3, true),
+        final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3L, 3L, true),
                                                                       Serdes.String(),
                                                                       Serdes.String()).build();
         assertThat(store, not(nullValue()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index d7a7283..6b9e7a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -244,7 +244,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
         bytesStore = new RocksDBSegmentedBytesStore(storeName,
                 retention,
-                numSegments,
+                segmentInterval,
                 schema);
 
         bytesStore.init(context, bytesStore);
@@ -271,7 +271,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
         bytesStore = new RocksDBSegmentedBytesStore(storeName,
                 retention,
-                numSegments,
+                segmentInterval,
                 schema);
 
         bytesStore.init(context, bytesStore);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index c95cbba..b44d369 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -53,7 +53,7 @@ public class RocksDBSessionStoreTest {
         schema.init("topic");
 
         final RocksDBSegmentedBytesStore bytesStore =
-                new RocksDBSegmentedBytesStore("session-store", 10000L, 3, schema);
+                new RocksDBSegmentedBytesStore("session-store", 10_000L, 60_000L, schema);
 
         sessionStore = new RocksDBSessionStore<>(bytesStore,
                                                  Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 2a84a7b..ac481a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -69,9 +69,9 @@ public class RocksDBWindowStoreTest {
     private final int numSegments = 3;
     private final long windowSize = 3L;
     private final String windowName = "window";
-    private final long segmentSize = 60_000;
-    private final long retentionPeriod = segmentSize * (numSegments - 1);
-    private final Segments segments = new Segments(windowName, retentionPeriod, segmentSize);
+    private final long segmentInterval = 60_000;
+    private final long retentionPeriod = segmentInterval * (numSegments - 1);
+    private final Segments segments = new Segments(windowName, retentionPeriod, segmentInterval);
     private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());
 
     private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
@@ -107,12 +107,7 @@ public class RocksDBWindowStoreTest {
 
     private WindowStore<Integer, String> createWindowStore(final ProcessorContext context, final boolean retainDuplicates) {
         final WindowStore<Integer, String> store = Stores.windowStoreBuilder(
-            Stores.persistentWindowStore(
-                windowName,
-                retentionPeriod,
-                numSegments,
-                windowSize,
-                retainDuplicates),
+            Stores.persistentWindowStore(windowName, retentionPeriod, windowSize, retainDuplicates, segmentInterval),
             Serdes.Integer(),
             Serdes.String()).build();
 
@@ -134,10 +129,10 @@ public class RocksDBWindowStoreTest {
         setCurrentTime(currentTime);
         windowStore.put(1, "one");
 
-        currentTime = currentTime + segmentSize;
+        currentTime = currentTime + segmentInterval;
         setCurrentTime(currentTime);
         windowStore.put(1, "two");
-        currentTime = currentTime + segmentSize;
+        currentTime = currentTime + segmentInterval;
 
         setCurrentTime(currentTime);
         windowStore.put(1, "three");
@@ -145,7 +140,7 @@ public class RocksDBWindowStoreTest {
         final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, currentTime);
 
         // roll to the next segment that will close the first
-        currentTime = currentTime + segmentSize;
+        currentTime = currentTime + segmentInterval;
         setCurrentTime(currentTime);
         windowStore.put(1, "four");
 
@@ -167,7 +162,7 @@ public class RocksDBWindowStoreTest {
     @Test
     public void testRangeAndSinglePointFetch() {
         windowStore = createWindowStore(context, false);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
@@ -226,7 +221,7 @@ public class RocksDBWindowStoreTest {
     @Test
     public void shouldGetAll() {
         windowStore = createWindowStore(context, false);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
@@ -245,7 +240,7 @@ public class RocksDBWindowStoreTest {
     @Test
     public void shouldFetchAllInTimeRange() {
         windowStore = createWindowStore(context, false);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
@@ -274,7 +269,7 @@ public class RocksDBWindowStoreTest {
     @Test
     public void testFetchRange() {
         windowStore = createWindowStore(context, false);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
@@ -322,7 +317,7 @@ public class RocksDBWindowStoreTest {
     @Test
     public void testPutAndFetchBefore() {
         windowStore = createWindowStore(context, false);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
@@ -368,7 +363,7 @@ public class RocksDBWindowStoreTest {
     @Test
     public void testPutAndFetchAfter() {
         windowStore = createWindowStore(context, false);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
@@ -414,7 +409,7 @@ public class RocksDBWindowStoreTest {
     @Test
     public void testPutSameKeyTimestamp() {
         windowStore = createWindowStore(context, true);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         setCurrentTime(startTime);
         windowStore.put(0, "zero");
@@ -444,8 +439,8 @@ public class RocksDBWindowStoreTest {
         windowStore = createWindowStore(context, false);
 
         // to validate segments
-        final long startTime = segmentSize * 2;
-        final long increment = segmentSize / 2;
+        final long startTime = segmentInterval * 2;
+        final long increment = segmentInterval / 2;
         setCurrentTime(startTime);
         windowStore.put(0, "zero");
         assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
@@ -573,8 +568,8 @@ public class RocksDBWindowStoreTest {
 
     @Test
     public void testRestore() throws IOException {
-        final long startTime = segmentSize * 2;
-        final long increment = segmentSize / 2;
+        final long startTime = segmentInterval * 2;
+        final long increment = segmentInterval / 2;
 
         windowStore = createWindowStore(context, false);
         setCurrentTime(startTime);
@@ -725,7 +720,7 @@ public class RocksDBWindowStoreTest {
         new File(storeDir, segments.segmentName(6L)).mkdir();
         windowStore.close();
 
-        context.setStreamTime(segmentSize * 6L);
+        context.setStreamTime(segmentInterval * 6L);
         windowStore = createWindowStore(context, false);
 
         final List<String> expected = Utils.mkList(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L));
@@ -768,13 +763,9 @@ public class RocksDBWindowStoreTest {
     public void shouldFetchAndIterateOverExactKeys() {
         final long windowSize = 0x7a00000000000000L;
         final long retentionPeriod = 0x7a00000000000000L;
+
         final WindowStore<String, String> windowStore = Stores.windowStoreBuilder(
-            Stores.persistentWindowStore(
-                windowName,
-                retentionPeriod,
-                2,
-                windowSize,
-                true),
+            Stores.persistentWindowStore(windowName, retentionPeriod, windowSize, true),
             Serdes.String(),
             Serdes.String()).build();
 
@@ -837,7 +828,7 @@ public class RocksDBWindowStoreTest {
     @Test
     public void shouldNoNullPointerWhenSerdeDoesNotHandleNull() {
         windowStore = new RocksDBWindowStore<>(
-            new RocksDBSegmentedBytesStore(windowName, retentionPeriod, numSegments, new WindowKeySchema()),
+            new RocksDBSegmentedBytesStore(windowName, retentionPeriod, segmentInterval, new WindowKeySchema()),
             Serdes.Integer(),
             new SerdeThatDoesntHandleNull(),
             false,
@@ -850,12 +841,7 @@ public class RocksDBWindowStoreTest {
     @Test
     public void shouldFetchAndIterateOverExactBinaryKeys() {
         final WindowStore<Bytes, String> windowStore = Stores.windowStoreBuilder(
-            Stores.persistentWindowStore(
-                windowName,
-                60000,
-                2,
-                60000,
-                true),
+            Stores.persistentWindowStore(windowName, 60_000L, 60_000L, true),
             Serdes.Bytes(),
             Serdes.String()).build();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 66ea3c4..4916cb0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -76,7 +76,13 @@ public class StreamThreadStateStoreProviderTest {
         topology.addSource("the-source", topicName);
         topology.addProcessor("the-processor", new MockProcessorSupplier(), "the-source");
         topology.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv-store"), Serdes.String(), Serdes.String()), "the-processor");
-        topology.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("window-store", 10, 2, 2, false), Serdes.String(), Serdes.String()), "the-processor");
+        topology.addStateStore(
+            Stores.windowStoreBuilder(
+                Stores.persistentWindowStore("window-store", 10L, 2L, false),
+                Serdes.String(),
+                Serdes.String()),
+            "the-processor"
+        );
 
         final Properties properties = new Properties();
         final String applicationId = "applicationId";