You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/07/02 23:08:00 UTC

[jira] [Commented] (KAFKA-7080) WindowStoreBuilder incorrectly initializes CachingWindowStore

    [ https://issues.apache.org/jira/browse/KAFKA-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530555#comment-16530555 ] 

ASF GitHub Bot commented on KAFKA-7080:
---------------------------------------

guozhangwang closed pull request #5257: KAFKA-7080: replace numSegments with segmentInterval
URL: https://github.com/apache/kafka/pull/5257
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index aa3dec17239..fc1fb9f5d13 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -66,11 +66,11 @@
 public final class SessionWindows {
 
     private final long gapMs;
-    private long maintainDurationMs;
+    private final long maintainDurationMs;
 
-    private SessionWindows(final long gapMs) {
+    private SessionWindows(final long gapMs, final long maintainDurationMs) {
         this.gapMs = gapMs;
-        maintainDurationMs = Windows.DEFAULT_MAINTAIN_DURATION_MS;
+        this.maintainDurationMs = maintainDurationMs;
     }
 
     /**
@@ -85,7 +85,8 @@ public static SessionWindows with(final long inactivityGapMs) {
         if (inactivityGapMs <= 0) {
             throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
         }
-        return new SessionWindows(inactivityGapMs);
+        final long oneDayMs = 24 * 60 * 60_000L;
+        return new SessionWindows(inactivityGapMs, oneDayMs);
     }
 
     /**
@@ -99,9 +100,8 @@ public SessionWindows until(final long durationMs) throws IllegalArgumentExcepti
         if (durationMs < gapMs) {
             throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
         }
-        maintainDurationMs = durationMs;
 
-        return this;
+        return new SessionWindows(gapMs, durationMs);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 09fdfce948d..53ead1e9b73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -36,18 +36,10 @@
  */
 public abstract class Windows<W extends Window> {
 
-    private static final int DEFAULT_NUM_SEGMENTS = 3;
+    private long maintainDurationMs = 24 * 60 * 60 * 1000L; // default: one day
+    @Deprecated public int segments = 3;
 
-    static final long DEFAULT_MAINTAIN_DURATION_MS = 24 * 60 * 60 * 1000L; // one day
-
-    private long maintainDurationMs;
-
-    public int segments;
-
-    protected Windows() {
-        segments = DEFAULT_NUM_SEGMENTS;
-        maintainDurationMs = DEFAULT_MAINTAIN_DURATION_MS;
-    }
+    protected Windows() {}
 
     /**
      * Set the window maintain duration (retention time) in milliseconds.
@@ -76,6 +68,19 @@ public long maintainMs() {
         return maintainDurationMs;
     }
 
+    /**
+     * Return the segment interval in milliseconds.
+     *
+     * @return the segment interval
+     */
+    @SuppressWarnings("deprecation") // The deprecation is on the public visibility of segments. We intend to make the field private later.
+    public long segmentInterval() {
+        // Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient.
+        final long minimumSegmentInterval = 60_000L;
+        // Scaled to the (possibly overridden) retention period
+        return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
+    }
+
     /**
      * Set the number of segments to be used for rolling the window store.
      * This function is not exposed to users but can be called by developers that extend this class.
@@ -83,7 +88,9 @@ public long maintainMs() {
      * @param segments the number of segments to be used
      * @return itself
      * @throws IllegalArgumentException if specified segments is small than 2
+     * @deprecated since 2.1 Override segmentInterval() instead.
      */
+    @Deprecated
     protected Windows<W> segments(final int segments) throws IllegalArgumentException {
         if (segments < 2) {
             throw new IllegalArgumentException("Number of segments must be at least 2.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bc56a3d0350..acfdf35b96a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -844,12 +844,17 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl
                                                                                    final Serde<K> keySerde,
                                                                                    final Serde<V> valueSerde,
                                                                                    final String storeName) {
-        return Stores.windowStoreBuilder(Stores.persistentWindowStore(storeName,
-                                                                      windows.maintainMs(),
-                                                                      windows.segments,
-                                                                      windows.size(),
-                                                                      true), keySerde, valueSerde);
-
+        return Stores.windowStoreBuilder(
+            Stores.persistentWindowStore(
+                storeName,
+                windows.maintainMs(),
+                windows.size(),
+                true,
+                windows.segmentInterval()
+            ),
+            keySerde,
+            valueSerde
+        );
     }
 
     private class KStreamImplJoin {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index e545f48e16a..7d6d174be5a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -155,11 +155,13 @@
     private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
         WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
-            supplier = Stores.persistentWindowStore(materialized.storeName(),
-                                                    windows.maintainMs(),
-                                                    windows.segments,
-                                                    windows.size(),
-                                                    false);
+            supplier = Stores.persistentWindowStore(
+                materialized.storeName(),
+                windows.maintainMs(),
+                windows.size(),
+                false,
+                windows.segmentInterval()
+            );
         }
         final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(supplier,
                                                                                    materialized.keySerde(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index eebd59fb012..c1b81c66f37 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -79,7 +79,7 @@
     /**
      * Create a persistent {@link KeyValueBytesStoreSupplier}.
      * @param name  name of the store (cannot be {@code null})
-     * @return  an instance of a {@link KeyValueBytesStoreSupplier} that can be used
+     * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
      * to build a persistent store
      */
     public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
@@ -90,7 +90,7 @@ public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String na
     /**
      * Create an in-memory {@link KeyValueBytesStoreSupplier}.
      * @param name  name of the store (cannot be {@code null})
-     * @return  an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
+     * @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
      * build an in-memory store
      */
     public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {
@@ -151,25 +151,72 @@ public String metricsScope() {
      * @param windowSize            size of the windows (cannot be negative)
      * @param retainDuplicates      whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
+     * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead
      */
+    @Deprecated
     public static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                                  final long retentionPeriod,
                                                                  final int numSegments,
                                                                  final long windowSize,
                                                                  final boolean retainDuplicates) {
+        if (numSegments < 2) {
+            throw new IllegalArgumentException("numSegments cannot must smaller than 2");
+        }
+
+        final long legacySegmentInterval = Math.max(retentionPeriod / (numSegments - 1), 60_000L);
+
+        return persistentWindowStore(
+            name,
+            retentionPeriod,
+            windowSize,
+            retainDuplicates,
+            legacySegmentInterval
+        );
+    }
+
+    /**
+     * Create a persistent {@link WindowBytesStoreSupplier}.
+     * @param name                  name of the store (cannot be {@code null})
+     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     * @param windowSize            size of the windows (cannot be negative)
+     * @param retainDuplicates      whether or not to retain duplicates.
+     * @return an instance of {@link WindowBytesStoreSupplier}
+     */
+    public static WindowBytesStoreSupplier persistentWindowStore(final String name,
+                                                                 final long retentionPeriod,
+                                                                 final long windowSize,
+                                                                 final boolean retainDuplicates) {
+        // we're arbitrarily defaulting to segments no smaller than one minute.
+        final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L);
+        return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, defaultSegmentInterval);
+    }
+
+    /**
+     * Create a persistent {@link WindowBytesStoreSupplier}.
+     * @param name                  name of the store (cannot be {@code null})
+     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
+     * @param segmentInterval       size of segments in ms (must be at least one minute)
+     * @param windowSize            size of the windows (cannot be negative)
+     * @param retainDuplicates      whether or not to retain duplicates.
+     * @return an instance of {@link WindowBytesStoreSupplier}
+     */
+    public static WindowBytesStoreSupplier persistentWindowStore(final String name,
+                                                                 final long retentionPeriod,
+                                                                 final long windowSize,
+                                                                 final boolean retainDuplicates,
+                                                                 final long segmentInterval) {
         Objects.requireNonNull(name, "name cannot be null");
         if (retentionPeriod < 0) {
             throw new IllegalArgumentException("retentionPeriod cannot be negative");
         }
-        if (numSegments < 2) {
-            throw new IllegalArgumentException("numSegments cannot must smaller than 2");
-        }
         if (windowSize < 0) {
             throw new IllegalArgumentException("windowSize cannot be negative");
         }
-        final long segmentIntervalMs = Math.max(retentionPeriod / (numSegments - 1), 60_000L);
+        if (segmentInterval < 60_000) {
+            throw new IllegalArgumentException("segmentInterval must be at least one minute");
+        }
 
-        return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentIntervalMs, windowSize, retainDuplicates);
+        return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates);
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
index 9cf70c2f89c..c071b344ba3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
@@ -33,11 +33,22 @@
      * It is also used to reduce the amount of data that is scanned when caching is enabled.
      *
      * @return number of segments
+     * @deprecated since 2.1. Use {@link WindowBytesStoreSupplier#segmentIntervalMs()} instead.
      */
+    @Deprecated
     int segments();
 
     /**
-     * The size of the windows any store created from this supplier is creating.
+     * The size of the segments (in milliseconds) the store has.
+     * If your store is segmented then this should be the size of segments in the underlying store.
+     * It is also used to reduce the amount of data that is scanned when caching is enabled.
+     *
+     * @return size of the segments (in milliseconds)
+     */
+    long segmentIntervalMs();
+
+    /**
+     * The size of the windows (in milliseconds) any store created from this supplier is creating.
      *
      * @return window size
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index c83ab599f7c..55e6877923d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -53,6 +53,7 @@ public String metricsScope() {
 
     @Override
     public long segmentIntervalMs() {
+        // Selected somewhat arbitrarily. Profiling may reveal a different value is preferable.
         return Math.max(retentionPeriod / 2, 60_000L);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index 4a7bffc850e..0684ba677bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -66,11 +66,17 @@ public String metricsScope() {
         return "rocksdb-window";
     }
 
+    @Deprecated
     @Override
     public int segments() {
         return (int) (retentionPeriod / segmentInterval) + 1;
     }
 
+    @Override
+    public long segmentIntervalMs() {
+        return segmentInterval;
+    }
+
     @Override
     public long windowSize() {
         return windowSize;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
index 97b4883084c..31d063a1b28 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
@@ -52,7 +52,7 @@ public WindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier,
                                         keySerde,
                                         valueSerde,
                                         storeSupplier.windowSize(),
-                                        storeSupplier.segments());
+                                        storeSupplier.segmentIntervalMs());
     }
 
     private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index 0611704ee4e..7b22df10c6f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -112,16 +112,6 @@ public void untilShouldSetMaintainDuration() {
         assertEquals(windowSize, windowSpec.until(windowSize).maintainMs());
     }
 
-    @Test
-    public void shouldUseWindowSizeForMaintainDurationWhenSizeLargerThanDefaultMaintainMs() {
-        final long size = Windows.DEFAULT_MAINTAIN_DURATION_MS;
-
-        final JoinWindows windowSpec = JoinWindows.of(size);
-        final long windowSize = windowSpec.size();
-
-        assertEquals(windowSize, windowSpec.maintainMs());
-    }
-
     @Test
     public void retentionTimeMustNoBeSmallerThanWindowSize() {
         final JoinWindows windowSpec = JoinWindows.of(anySize);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index 8c0a0b99684..d0e5996a481 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -50,7 +50,7 @@ public void windowSizeMustNotBeZero() {
 
     @Test
     public void retentionTimeShouldBeGapIfGapIsLargerThanDefaultRetentionTime() {
-        final long windowGap = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS;
+        final long windowGap = 2 * SessionWindows.with(1).maintainMs();
         assertEquals(windowGap, SessionWindows.with(windowGap).maintainMs());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 7e6bb3e5580..390678f33b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -47,7 +47,7 @@ public void shouldSetWindowRetentionTime() {
 
     @Test
     public void shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() {
-        final long windowSize = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS;
+        final long windowSize = 2 * TimeWindows.of(1).maintainMs();
         assertEquals(windowSize, TimeWindows.of(windowSize).maintainMs());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
index 77faf1a1fe8..2e9246e046c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
@@ -40,7 +40,13 @@ public long size() {
     @Test
     public void shouldSetNumberOfSegments() {
         final int anySegmentSizeLargerThanOne = 5;
-        assertEquals(anySegmentSizeLargerThanOne, new TestWindows().segments(anySegmentSizeLargerThanOne).segments);
+        final TestWindows testWindow = new TestWindows();
+        final long maintainMs = testWindow.maintainMs();
+
+        assertEquals(
+            maintainMs / (anySegmentSizeLargerThanOne - 1),
+            testWindow.segments(anySegmentSizeLargerThanOne).segmentInterval()
+        );
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 7179293200e..654fd03f10a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -32,10 +32,10 @@
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
@@ -469,13 +469,18 @@ private void processStreamWithWindowStore(final String topic) {
         setStreamProperties("simple-benchmark-streams-with-store");
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder
-                = Stores.windowStoreBuilder(Stores.persistentWindowStore("store",
+
+        final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder = Stores.windowStoreBuilder(
+            Stores.persistentWindowStore(
+                "store",
                 AGGREGATE_WINDOW_SIZE * 3,
-                3,
                 AGGREGATE_WINDOW_SIZE,
-                false),
-                INTEGER_SERDE, BYTE_SERDE);
+                false,
+                60_000L
+            ),
+            INTEGER_SERDE,
+            BYTE_SERDE
+        );
         builder.addStateStore(storeBuilder.withCachingEnabled());
 
         final KStream<Integer, byte[]> source = builder.stream(topic);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index b0674ea338a..fb641302560 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -532,7 +532,15 @@ public void shouldAddInternalTopicConfigForWindowStores() {
         builder.setApplicationId("appId");
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-        builder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 30000, 3, 10000, false), Serdes.String(), Serdes.String()), "processor");
+
+        builder.addStateStore(
+            Stores.windowStoreBuilder(
+                Stores.persistentWindowStore("store", 30_000L, 10_000L, false),
+                Serdes.String(),
+                Serdes.String()
+            ),
+            "processor"
+        );
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 5383c27ac43..23f246d001e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -28,7 +28,6 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.fail;
 
 public class StoresTest {
 
@@ -54,22 +53,28 @@ public void shouldThrowIfILruMapStoreCapacityIsNegative() {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
-        Stores.persistentWindowStore(null, 0, 1, 0, false);
+        Stores.persistentWindowStore(null, 0L, 0L, false, 60_000L);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
-        Stores.persistentWindowStore("anyName", -1, 1, 0, false);
+        Stores.persistentWindowStore("anyName", -1L, 0L, false, 60_000L);
     }
 
+    @Deprecated
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIfIPersistentWindowStoreIfNumberOfSegmentsSmallerThanOne() {
-        Stores.persistentWindowStore("anyName", 0, 0, 0, false);
+        Stores.persistentWindowStore("anyName", 0L, 1, 0L, false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() {
-        Stores.persistentWindowStore("anyName", 0, 1, -1, false);
+        Stores.persistentWindowStore("anyName", 0L, -1L, false);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void shouldThrowIfIPersistentWindowStoreIfSegmentIntervalIsTooSmall() {
+        Stores.persistentWindowStore("anyName", 1L, 1L, false, 59_999L);
     }
 
     @Test(expected = NullPointerException.class)
@@ -97,16 +102,6 @@ public void shouldThrowIfSupplierIsNullForSessionStoreBuilder() {
         Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
     }
 
-    @Test
-    public void shouldThrowIllegalArgumentExceptionWhenTryingToConstructWindowStoreWithLessThanTwoSegments() {
-        try {
-            Stores.persistentWindowStore("store", 1, 1, 1, false);
-            fail("Should have thrown illegal argument exception as number of segments is less than 2");
-        } catch (final IllegalArgumentException e) {
-         // ok
-        }
-    }
-
     @Test
     public void shouldCreateInMemoryKeyValueStore() {
         assertThat(Stores.inMemoryKeyValueStore("memory").get(), instanceOf(InMemoryKeyValueStore.class));
@@ -124,7 +119,7 @@ public void shouldCreateRocksDbStore() {
 
     @Test
     public void shouldCreateRocksDbWindowStore() {
-        assertThat(Stores.persistentWindowStore("store", 1, 3, 1, false).get(), instanceOf(RocksDBWindowStore.class));
+        assertThat(Stores.persistentWindowStore("store", 1L, 1L, false).get(), instanceOf(RocksDBWindowStore.class));
     }
 
     @Test
@@ -134,7 +129,7 @@ public void shouldCreateRocksDbSessionStore() {
 
     @Test
     public void shouldBuildWindowStore() {
-        final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3, 2, 3, true),
+        final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3L, 3L, true),
                                                                       Serdes.String(),
                                                                       Serdes.String()).build();
         assertThat(store, not(nullValue()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index d7a72833765..6b9e7a808ae 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -244,7 +244,7 @@ public void shouldLoadSegementsWithOldStyleDateFormattedName() {
 
         bytesStore = new RocksDBSegmentedBytesStore(storeName,
                 retention,
-                numSegments,
+                segmentInterval,
                 schema);
 
         bytesStore.init(context, bytesStore);
@@ -271,7 +271,7 @@ public void shouldLoadSegementsWithOldStyleColonFormattedName() {
 
         bytesStore = new RocksDBSegmentedBytesStore(storeName,
                 retention,
-                numSegments,
+                segmentInterval,
                 schema);
 
         bytesStore.init(context, bytesStore);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index c95cbbada98..b44d3691b99 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -53,7 +53,7 @@ public void before() {
         schema.init("topic");
 
         final RocksDBSegmentedBytesStore bytesStore =
-                new RocksDBSegmentedBytesStore("session-store", 10000L, 3, schema);
+                new RocksDBSegmentedBytesStore("session-store", 10_000L, 60_000L, schema);
 
         sessionStore = new RocksDBSessionStore<>(bytesStore,
                                                  Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 2a84a7b12e9..ac481a747ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -69,9 +69,9 @@
     private final int numSegments = 3;
     private final long windowSize = 3L;
     private final String windowName = "window";
-    private final long segmentSize = 60_000;
-    private final long retentionPeriod = segmentSize * (numSegments - 1);
-    private final Segments segments = new Segments(windowName, retentionPeriod, segmentSize);
+    private final long segmentInterval = 60_000;
+    private final long retentionPeriod = segmentInterval * (numSegments - 1);
+    private final Segments segments = new Segments(windowName, retentionPeriod, segmentInterval);
     private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());
 
     private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
@@ -107,12 +107,7 @@
 
     private WindowStore<Integer, String> createWindowStore(final ProcessorContext context, final boolean retainDuplicates) {
         final WindowStore<Integer, String> store = Stores.windowStoreBuilder(
-            Stores.persistentWindowStore(
-                windowName,
-                retentionPeriod,
-                numSegments,
-                windowSize,
-                retainDuplicates),
+            Stores.persistentWindowStore(windowName, retentionPeriod, windowSize, retainDuplicates, segmentInterval),
             Serdes.Integer(),
             Serdes.String()).build();
 
@@ -134,10 +129,10 @@ public void shouldOnlyIterateOpenSegments() {
         setCurrentTime(currentTime);
         windowStore.put(1, "one");
 
-        currentTime = currentTime + segmentSize;
+        currentTime = currentTime + segmentInterval;
         setCurrentTime(currentTime);
         windowStore.put(1, "two");
-        currentTime = currentTime + segmentSize;
+        currentTime = currentTime + segmentInterval;
 
         setCurrentTime(currentTime);
         windowStore.put(1, "three");
@@ -145,7 +140,7 @@ public void shouldOnlyIterateOpenSegments() {
         final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, currentTime);
 
         // roll to the next segment that will close the first
-        currentTime = currentTime + segmentSize;
+        currentTime = currentTime + segmentInterval;
         setCurrentTime(currentTime);
         windowStore.put(1, "four");
 
@@ -167,7 +162,7 @@ private ProcessorRecordContext createRecordContext(final long time) {
     @Test
     public void testRangeAndSinglePointFetch() {
         windowStore = createWindowStore(context, false);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
@@ -226,7 +221,7 @@ public void testRangeAndSinglePointFetch() {
     @Test
     public void shouldGetAll() {
         windowStore = createWindowStore(context, false);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
@@ -245,7 +240,7 @@ public void shouldGetAll() {
     @Test
     public void shouldFetchAllInTimeRange() {
         windowStore = createWindowStore(context, false);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
@@ -274,7 +269,7 @@ public void shouldFetchAllInTimeRange() {
     @Test
     public void testFetchRange() {
         windowStore = createWindowStore(context, false);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
@@ -322,7 +317,7 @@ public void testFetchRange() {
     @Test
     public void testPutAndFetchBefore() {
         windowStore = createWindowStore(context, false);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
@@ -368,7 +363,7 @@ public void testPutAndFetchBefore() {
     @Test
     public void testPutAndFetchAfter() {
         windowStore = createWindowStore(context, false);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         putFirstBatch(windowStore, startTime, context);
 
@@ -414,7 +409,7 @@ public void testPutAndFetchAfter() {
     @Test
     public void testPutSameKeyTimestamp() {
         windowStore = createWindowStore(context, true);
-        final long startTime = segmentSize - 4L;
+        final long startTime = segmentInterval - 4L;
 
         setCurrentTime(startTime);
         windowStore.put(0, "zero");
@@ -444,8 +439,8 @@ public void testRolling() {
         windowStore = createWindowStore(context, false);
 
         // to validate segments
-        final long startTime = segmentSize * 2;
-        final long increment = segmentSize / 2;
+        final long startTime = segmentInterval * 2;
+        final long increment = segmentInterval / 2;
         setCurrentTime(startTime);
         windowStore.put(0, "zero");
         assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
@@ -573,8 +568,8 @@ public void testRolling() {
 
     @Test
     public void testRestore() throws IOException {
-        final long startTime = segmentSize * 2;
-        final long increment = segmentSize / 2;
+        final long startTime = segmentInterval * 2;
+        final long increment = segmentInterval / 2;
 
         windowStore = createWindowStore(context, false);
         setCurrentTime(startTime);
@@ -725,7 +720,7 @@ public void testInitialLoading() {
         new File(storeDir, segments.segmentName(6L)).mkdir();
         windowStore.close();
 
-        context.setStreamTime(segmentSize * 6L);
+        context.setStreamTime(segmentInterval * 6L);
         windowStore = createWindowStore(context, false);
 
         final List<String> expected = Utils.mkList(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L));
@@ -768,13 +763,9 @@ public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStor
     public void shouldFetchAndIterateOverExactKeys() {
         final long windowSize = 0x7a00000000000000L;
         final long retentionPeriod = 0x7a00000000000000L;
+
         final WindowStore<String, String> windowStore = Stores.windowStoreBuilder(
-            Stores.persistentWindowStore(
-                windowName,
-                retentionPeriod,
-                2,
-                windowSize,
-                true),
+            Stores.persistentWindowStore(windowName, retentionPeriod, windowSize, true),
             Serdes.String(),
             Serdes.String()).build();
 
@@ -837,7 +828,7 @@ public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
     @Test
     public void shouldNoNullPointerWhenSerdeDoesNotHandleNull() {
         windowStore = new RocksDBWindowStore<>(
-            new RocksDBSegmentedBytesStore(windowName, retentionPeriod, numSegments, new WindowKeySchema()),
+            new RocksDBSegmentedBytesStore(windowName, retentionPeriod, segmentInterval, new WindowKeySchema()),
             Serdes.Integer(),
             new SerdeThatDoesntHandleNull(),
             false,
@@ -850,12 +841,7 @@ public void shouldNoNullPointerWhenSerdeDoesNotHandleNull() {
     @Test
     public void shouldFetchAndIterateOverExactBinaryKeys() {
         final WindowStore<Bytes, String> windowStore = Stores.windowStoreBuilder(
-            Stores.persistentWindowStore(
-                windowName,
-                60000,
-                2,
-                60000,
-                true),
+            Stores.persistentWindowStore(windowName, 60_000L, 60_000L, true),
             Serdes.Bytes(),
             Serdes.String()).build();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 66ea3c42779..4916cb09991 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -76,7 +76,13 @@ public void before() {
         topology.addSource("the-source", topicName);
         topology.addProcessor("the-processor", new MockProcessorSupplier(), "the-source");
         topology.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv-store"), Serdes.String(), Serdes.String()), "the-processor");
-        topology.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("window-store", 10, 2, 2, false), Serdes.String(), Serdes.String()), "the-processor");
+        topology.addStateStore(
+            Stores.windowStoreBuilder(
+                Stores.persistentWindowStore("window-store", 10L, 2L, false),
+                Serdes.String(),
+                Serdes.String()),
+            "the-processor"
+        );
 
         final Properties properties = new Properties();
         final String applicationId = "applicationId";


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> WindowStoreBuilder incorrectly initializes CachingWindowStore
> -------------------------------------------------------------
>
>                 Key: KAFKA-7080
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7080
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0, 1.1.0, 2.0.0, 1.0.1
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.1.0
>
>
> When caching is enabled on the WindowStoreBuilder, it creates a CachingWindowStore. However, it incorrectly passes storeSupplier.segments() (the number of segments) to the segmentInterval argument.
>  
> The impact is low, since any valid number of segments is also a valid segment size, but it likely results in much smaller segments than intended. For example, the segments may be sized 3ms instead of 60,000ms.
>  
> Ideally the WindowBytesStoreSupplier interface would allow suppliers to advertise their segment size instead of segment count. I plan to create a KIP to propose this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)