You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/07/02 23:07:43 UTC
[kafka] branch trunk updated: KAFKA-7080: replace numSegments with segmentInterval (#5257)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 64fff8b KAFKA-7080: replace numSegments with segmentInterval (#5257)
64fff8b is described below
commit 64fff8bfcc9b92769640bfaa692e19d0db8861a6
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Mon Jul 2 18:07:38 2018 -0500
KAFKA-7080: replace numSegments with segmentInterval (#5257)
See also KIP-319.
Replace number-of-segments parameters with segment-interval-ms parameters in various places. The latter was always the parameter that several components needed, and we accidentally supplied the former because it was the one available.
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../kafka/streams/kstream/SessionWindows.java | 12 ++---
.../org/apache/kafka/streams/kstream/Windows.java | 29 ++++++----
.../streams/kstream/internals/KStreamImpl.java | 17 +++---
.../kstream/internals/TimeWindowedKStreamImpl.java | 12 +++--
.../org/apache/kafka/streams/state/Stores.java | 61 +++++++++++++++++++---
.../streams/state/WindowBytesStoreSupplier.java | 13 ++++-
.../RocksDbSessionBytesStoreSupplier.java | 1 +
.../internals/RocksDbWindowBytesStoreSupplier.java | 6 +++
.../state/internals/WindowStoreBuilder.java | 2 +-
.../kafka/streams/kstream/JoinWindowsTest.java | 10 ----
.../kafka/streams/kstream/SessionWindowsTest.java | 2 +-
.../kafka/streams/kstream/TimeWindowsTest.java | 2 +-
.../apache/kafka/streams/kstream/WindowsTest.java | 8 ++-
.../apache/kafka/streams/perf/SimpleBenchmark.java | 17 +++---
.../internals/InternalTopologyBuilderTest.java | 10 +++-
.../org/apache/kafka/streams/state/StoresTest.java | 29 +++++-----
.../internals/RocksDBSegmentedBytesStoreTest.java | 4 +-
.../state/internals/RocksDBSessionStoreTest.java | 2 +-
.../state/internals/RocksDBWindowStoreTest.java | 60 ++++++++-------------
.../StreamThreadStateStoreProviderTest.java | 8 ++-
20 files changed, 190 insertions(+), 115 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index aa3dec1..fc1fb9f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -66,11 +66,11 @@ import java.util.Objects;
public final class SessionWindows {
private final long gapMs;
- private long maintainDurationMs;
+ private final long maintainDurationMs;
- private SessionWindows(final long gapMs) {
+ private SessionWindows(final long gapMs, final long maintainDurationMs) {
this.gapMs = gapMs;
- maintainDurationMs = Windows.DEFAULT_MAINTAIN_DURATION_MS;
+ this.maintainDurationMs = maintainDurationMs;
}
/**
@@ -85,7 +85,8 @@ public final class SessionWindows {
if (inactivityGapMs <= 0) {
throw new IllegalArgumentException("Gap time (inactivityGapMs) cannot be zero or negative.");
}
- return new SessionWindows(inactivityGapMs);
+ final long oneDayMs = 24 * 60 * 60_000L;
+ return new SessionWindows(inactivityGapMs, oneDayMs);
}
/**
@@ -99,9 +100,8 @@ public final class SessionWindows {
if (durationMs < gapMs) {
throw new IllegalArgumentException("Window retention time (durationMs) cannot be smaller than window gap.");
}
- maintainDurationMs = durationMs;
- return this;
+ return new SessionWindows(gapMs, durationMs);
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 09fdfce..53ead1e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -36,18 +36,10 @@ import java.util.Map;
*/
public abstract class Windows<W extends Window> {
- private static final int DEFAULT_NUM_SEGMENTS = 3;
+ private long maintainDurationMs = 24 * 60 * 60 * 1000L; // default: one day
+ @Deprecated public int segments = 3;
- static final long DEFAULT_MAINTAIN_DURATION_MS = 24 * 60 * 60 * 1000L; // one day
-
- private long maintainDurationMs;
-
- public int segments;
-
- protected Windows() {
- segments = DEFAULT_NUM_SEGMENTS;
- maintainDurationMs = DEFAULT_MAINTAIN_DURATION_MS;
- }
+ protected Windows() {}
/**
* Set the window maintain duration (retention time) in milliseconds.
@@ -77,13 +69,28 @@ public abstract class Windows<W extends Window> {
}
/**
+ * Return the segment interval in milliseconds.
+ *
+ * @return the segment interval
+ */
+ @SuppressWarnings("deprecation") // The deprecation is on the public visibility of segments. We intend to make the field private later.
+ public long segmentInterval() {
+ // Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient.
+ final long minimumSegmentInterval = 60_000L;
+ // Scaled to the (possibly overridden) retention period
+ return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
+ }
+
+ /**
* Set the number of segments to be used for rolling the window store.
* This function is not exposed to users but can be called by developers that extend this class.
*
* @param segments the number of segments to be used
* @return itself
* @throws IllegalArgumentException if specified segments is small than 2
+ * @deprecated since 2.1 Override segmentInterval() instead.
*/
+ @Deprecated
protected Windows<W> segments(final int segments) throws IllegalArgumentException {
if (segments < 2) {
throw new IllegalArgumentException("Number of segments must be at least 2.");
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index bc56a3d..acfdf35 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -844,12 +844,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
final Serde<K> keySerde,
final Serde<V> valueSerde,
final String storeName) {
- return Stores.windowStoreBuilder(Stores.persistentWindowStore(storeName,
- windows.maintainMs(),
- windows.segments,
- windows.size(),
- true), keySerde, valueSerde);
-
+ return Stores.windowStoreBuilder(
+ Stores.persistentWindowStore(
+ storeName,
+ windows.maintainMs(),
+ windows.size(),
+ true,
+ windows.segmentInterval()
+ ),
+ keySerde,
+ valueSerde
+ );
}
private class KStreamImplJoin {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index e545f48..7d6d174 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -155,11 +155,13 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
private <VR> StoreBuilder<WindowStore<K, VR>> materialize(final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) materialized.storeSupplier();
if (supplier == null) {
- supplier = Stores.persistentWindowStore(materialized.storeName(),
- windows.maintainMs(),
- windows.segments,
- windows.size(),
- false);
+ supplier = Stores.persistentWindowStore(
+ materialized.storeName(),
+ windows.maintainMs(),
+ windows.size(),
+ false,
+ windows.segmentInterval()
+ );
}
final StoreBuilder<WindowStore<K, VR>> builder = Stores.windowStoreBuilder(supplier,
materialized.keySerde(),
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index eebd59f..c1b81c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -79,7 +79,7 @@ public class Stores {
/**
* Create a persistent {@link KeyValueBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
- * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
+ * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
* to build a persistent store
*/
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
@@ -90,7 +90,7 @@ public class Stores {
/**
* Create an in-memory {@link KeyValueBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
- * @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
+ * @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
* build an in-memory store
*/
public static KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) {
@@ -151,25 +151,72 @@ public class Stores {
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
+ * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead
*/
+ @Deprecated
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long retentionPeriod,
final int numSegments,
final long windowSize,
final boolean retainDuplicates) {
+ if (numSegments < 2) {
+ throw new IllegalArgumentException("numSegments cannot must smaller than 2");
+ }
+
+ final long legacySegmentInterval = Math.max(retentionPeriod / (numSegments - 1), 60_000L);
+
+ return persistentWindowStore(
+ name,
+ retentionPeriod,
+ windowSize,
+ retainDuplicates,
+ legacySegmentInterval
+ );
+ }
+
+ /**
+ * Create a persistent {@link WindowBytesStoreSupplier}.
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod length of time to retain data in the store (cannot be negative)
+ * @param windowSize size of the windows (cannot be negative)
+ * @param retainDuplicates whether or not to retain duplicates.
+ * @return an instance of {@link WindowBytesStoreSupplier}
+ */
+ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
+ final long retentionPeriod,
+ final long windowSize,
+ final boolean retainDuplicates) {
+ // we're arbitrarily defaulting to segments no smaller than one minute.
+ final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L);
+ return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, defaultSegmentInterval);
+ }
+
+ /**
+ * Create a persistent {@link WindowBytesStoreSupplier}.
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod length of time to retain data in the store (cannot be negative)
+ * @param segmentInterval size of segments in ms (must be at least one minute)
+ * @param windowSize size of the windows (cannot be negative)
+ * @param retainDuplicates whether or not to retain duplicates.
+ * @return an instance of {@link WindowBytesStoreSupplier}
+ */
+ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
+ final long retentionPeriod,
+ final long windowSize,
+ final boolean retainDuplicates,
+ final long segmentInterval) {
Objects.requireNonNull(name, "name cannot be null");
if (retentionPeriod < 0) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
}
- if (numSegments < 2) {
- throw new IllegalArgumentException("numSegments cannot must smaller than 2");
- }
if (windowSize < 0) {
throw new IllegalArgumentException("windowSize cannot be negative");
}
- final long segmentIntervalMs = Math.max(retentionPeriod / (numSegments - 1), 60_000L);
+ if (segmentInterval < 60_000) {
+ throw new IllegalArgumentException("segmentInterval must be at least one minute");
+ }
- return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentIntervalMs, windowSize, retainDuplicates);
+ return new RocksDbWindowBytesStoreSupplier(name, retentionPeriod, segmentInterval, windowSize, retainDuplicates);
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
index 9cf70c2..c071b34 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowBytesStoreSupplier.java
@@ -33,11 +33,22 @@ public interface WindowBytesStoreSupplier extends StoreSupplier<WindowStore<Byte
* It is also used to reduce the amount of data that is scanned when caching is enabled.
*
* @return number of segments
+ * @deprecated since 2.1. Use {@link WindowBytesStoreSupplier#segmentIntervalMs()} instead.
*/
+ @Deprecated
int segments();
/**
- * The size of the windows any store created from this supplier is creating.
+ * The size of the segments (in milliseconds) the store has.
+ * If your store is segmented then this should be the size of segments in the underlying store.
+ * It is also used to reduce the amount of data that is scanned when caching is enabled.
+ *
+ * @return size of the segments (in milliseconds)
+ */
+ long segmentIntervalMs();
+
+ /**
+ * The size of the windows (in milliseconds) any store created from this supplier is creating.
*
* @return window size
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index 37968ce..45df39c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -53,6 +53,7 @@ public class RocksDbSessionBytesStoreSupplier implements SessionBytesStoreSuppli
@Override
public long segmentIntervalMs() {
+ // Selected somewhat arbitrarily. Profiling may reveal a different value is preferable.
return Math.max(retentionPeriod / 2, 60_000L);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index 9421bcb..5c7b099 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -66,12 +66,18 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
return "rocksdb-window-state";
}
+ @Deprecated
@Override
public int segments() {
return (int) (retentionPeriod / segmentInterval) + 1;
}
@Override
+ public long segmentIntervalMs() {
+ return segmentInterval;
+ }
+
+ @Override
public long windowSize() {
return windowSize;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
index 97b4883..31d063a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowStoreBuilder.java
@@ -52,7 +52,7 @@ public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowS
keySerde,
valueSerde,
storeSupplier.windowSize(),
- storeSupplier.segments());
+ storeSupplier.segmentIntervalMs());
}
private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index 0611704..7b22df1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -113,16 +113,6 @@ public class JoinWindowsTest {
}
@Test
- public void shouldUseWindowSizeForMaintainDurationWhenSizeLargerThanDefaultMaintainMs() {
- final long size = Windows.DEFAULT_MAINTAIN_DURATION_MS;
-
- final JoinWindows windowSpec = JoinWindows.of(size);
- final long windowSize = windowSpec.size();
-
- assertEquals(windowSize, windowSpec.maintainMs());
- }
-
- @Test
public void retentionTimeMustNoBeSmallerThanWindowSize() {
final JoinWindows windowSpec = JoinWindows.of(anySize);
final long windowSize = windowSpec.size();
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index 8c0a0b9..d0e5996 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -50,7 +50,7 @@ public class SessionWindowsTest {
@Test
public void retentionTimeShouldBeGapIfGapIsLargerThanDefaultRetentionTime() {
- final long windowGap = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS;
+ final long windowGap = 2 * SessionWindows.with(1).maintainMs();
assertEquals(windowGap, SessionWindows.with(windowGap).maintainMs());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 7e6bb3e..390678f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -47,7 +47,7 @@ public class TimeWindowsTest {
@Test
public void shouldUseWindowSizeAsRentitionTimeIfWindowSizeIsLargerThanDefaultRetentionTime() {
- final long windowSize = 2 * Windows.DEFAULT_MAINTAIN_DURATION_MS;
+ final long windowSize = 2 * TimeWindows.of(1).maintainMs();
assertEquals(windowSize, TimeWindows.of(windowSize).maintainMs());
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
index 77faf1a..2e9246e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
@@ -40,7 +40,13 @@ public class WindowsTest {
@Test
public void shouldSetNumberOfSegments() {
final int anySegmentSizeLargerThanOne = 5;
- assertEquals(anySegmentSizeLargerThanOne, new TestWindows().segments(anySegmentSizeLargerThanOne).segments);
+ final TestWindows testWindow = new TestWindows();
+ final long maintainMs = testWindow.maintainMs();
+
+ assertEquals(
+ maintainMs / (anySegmentSizeLargerThanOne - 1),
+ testWindow.segments(anySegmentSizeLargerThanOne).segmentInterval()
+ );
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 7179293..654fd03 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -32,10 +32,10 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
@@ -469,13 +469,18 @@ public class SimpleBenchmark {
setStreamProperties("simple-benchmark-streams-with-store");
final StreamsBuilder builder = new StreamsBuilder();
- final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder
- = Stores.windowStoreBuilder(Stores.persistentWindowStore("store",
+
+ final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder = Stores.windowStoreBuilder(
+ Stores.persistentWindowStore(
+ "store",
AGGREGATE_WINDOW_SIZE * 3,
- 3,
AGGREGATE_WINDOW_SIZE,
- false),
- INTEGER_SERDE, BYTE_SERDE);
+ false,
+ 60_000L
+ ),
+ INTEGER_SERDE,
+ BYTE_SERDE
+ );
builder.addStateStore(storeBuilder.withCachingEnabled());
final KStream<Integer, byte[]> source = builder.stream(topic);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index b0674ea..fb64130 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -532,7 +532,15 @@ public class InternalTopologyBuilderTest {
builder.setApplicationId("appId");
builder.addSource(null, "source", null, null, null, "topic");
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
- builder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 30000, 3, 10000, false), Serdes.String(), Serdes.String()), "processor");
+
+ builder.addStateStore(
+ Stores.windowStoreBuilder(
+ Stores.persistentWindowStore("store", 30_000L, 10_000L, false),
+ Serdes.String(),
+ Serdes.String()
+ ),
+ "processor"
+ );
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index 5383c27..23f246d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -28,7 +28,6 @@ import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.fail;
public class StoresTest {
@@ -54,22 +53,28 @@ public class StoresTest {
@Test(expected = NullPointerException.class)
public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
- Stores.persistentWindowStore(null, 0, 1, 0, false);
+ Stores.persistentWindowStore(null, 0L, 0L, false, 60_000L);
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
- Stores.persistentWindowStore("anyName", -1, 1, 0, false);
+ Stores.persistentWindowStore("anyName", -1L, 0L, false, 60_000L);
}
+ @Deprecated
@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfIPersistentWindowStoreIfNumberOfSegmentsSmallerThanOne() {
- Stores.persistentWindowStore("anyName", 0, 0, 0, false);
+ Stores.persistentWindowStore("anyName", 0L, 1, 0L, false);
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() {
- Stores.persistentWindowStore("anyName", 0, 1, -1, false);
+ Stores.persistentWindowStore("anyName", 0L, -1L, false);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldThrowIfIPersistentWindowStoreIfSegmentIntervalIsTooSmall() {
+ Stores.persistentWindowStore("anyName", 1L, 1L, false, 59_999L);
}
@Test(expected = NullPointerException.class)
@@ -98,16 +103,6 @@ public class StoresTest {
}
@Test
- public void shouldThrowIllegalArgumentExceptionWhenTryingToConstructWindowStoreWithLessThanTwoSegments() {
- try {
- Stores.persistentWindowStore("store", 1, 1, 1, false);
- fail("Should have thrown illegal argument exception as number of segments is less than 2");
- } catch (final IllegalArgumentException e) {
- // ok
- }
- }
-
- @Test
public void shouldCreateInMemoryKeyValueStore() {
assertThat(Stores.inMemoryKeyValueStore("memory").get(), instanceOf(InMemoryKeyValueStore.class));
}
@@ -124,7 +119,7 @@ public class StoresTest {
@Test
public void shouldCreateRocksDbWindowStore() {
- assertThat(Stores.persistentWindowStore("store", 1, 3, 1, false).get(), instanceOf(RocksDBWindowStore.class));
+ assertThat(Stores.persistentWindowStore("store", 1L, 1L, false).get(), instanceOf(RocksDBWindowStore.class));
}
@Test
@@ -134,7 +129,7 @@ public class StoresTest {
@Test
public void shouldBuildWindowStore() {
- final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3, 2, 3, true),
+ final WindowStore<String, String> store = Stores.windowStoreBuilder(Stores.persistentWindowStore("store", 3L, 3L, true),
Serdes.String(),
Serdes.String()).build();
assertThat(store, not(nullValue()));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index d7a7283..6b9e7a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -244,7 +244,7 @@ public class RocksDBSegmentedBytesStoreTest {
bytesStore = new RocksDBSegmentedBytesStore(storeName,
retention,
- numSegments,
+ segmentInterval,
schema);
bytesStore.init(context, bytesStore);
@@ -271,7 +271,7 @@ public class RocksDBSegmentedBytesStoreTest {
bytesStore = new RocksDBSegmentedBytesStore(storeName,
retention,
- numSegments,
+ segmentInterval,
schema);
bytesStore.init(context, bytesStore);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
index c95cbba..b44d369 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java
@@ -53,7 +53,7 @@ public class RocksDBSessionStoreTest {
schema.init("topic");
final RocksDBSegmentedBytesStore bytesStore =
- new RocksDBSegmentedBytesStore("session-store", 10000L, 3, schema);
+ new RocksDBSegmentedBytesStore("session-store", 10_000L, 60_000L, schema);
sessionStore = new RocksDBSessionStore<>(bytesStore,
Serdes.String(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 2a84a7b..ac481a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -69,9 +69,9 @@ public class RocksDBWindowStoreTest {
private final int numSegments = 3;
private final long windowSize = 3L;
private final String windowName = "window";
- private final long segmentSize = 60_000;
- private final long retentionPeriod = segmentSize * (numSegments - 1);
- private final Segments segments = new Segments(windowName, retentionPeriod, segmentSize);
+ private final long segmentInterval = 60_000;
+ private final long retentionPeriod = segmentInterval * (numSegments - 1);
+ private final Segments segments = new Segments(windowName, retentionPeriod, segmentInterval);
private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());
private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
@@ -107,12 +107,7 @@ public class RocksDBWindowStoreTest {
private WindowStore<Integer, String> createWindowStore(final ProcessorContext context, final boolean retainDuplicates) {
final WindowStore<Integer, String> store = Stores.windowStoreBuilder(
- Stores.persistentWindowStore(
- windowName,
- retentionPeriod,
- numSegments,
- windowSize,
- retainDuplicates),
+ Stores.persistentWindowStore(windowName, retentionPeriod, windowSize, retainDuplicates, segmentInterval),
Serdes.Integer(),
Serdes.String()).build();
@@ -134,10 +129,10 @@ public class RocksDBWindowStoreTest {
setCurrentTime(currentTime);
windowStore.put(1, "one");
- currentTime = currentTime + segmentSize;
+ currentTime = currentTime + segmentInterval;
setCurrentTime(currentTime);
windowStore.put(1, "two");
- currentTime = currentTime + segmentSize;
+ currentTime = currentTime + segmentInterval;
setCurrentTime(currentTime);
windowStore.put(1, "three");
@@ -145,7 +140,7 @@ public class RocksDBWindowStoreTest {
final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, currentTime);
// roll to the next segment that will close the first
- currentTime = currentTime + segmentSize;
+ currentTime = currentTime + segmentInterval;
setCurrentTime(currentTime);
windowStore.put(1, "four");
@@ -167,7 +162,7 @@ public class RocksDBWindowStoreTest {
@Test
public void testRangeAndSinglePointFetch() {
windowStore = createWindowStore(context, false);
- final long startTime = segmentSize - 4L;
+ final long startTime = segmentInterval - 4L;
putFirstBatch(windowStore, startTime, context);
@@ -226,7 +221,7 @@ public class RocksDBWindowStoreTest {
@Test
public void shouldGetAll() {
windowStore = createWindowStore(context, false);
- final long startTime = segmentSize - 4L;
+ final long startTime = segmentInterval - 4L;
putFirstBatch(windowStore, startTime, context);
@@ -245,7 +240,7 @@ public class RocksDBWindowStoreTest {
@Test
public void shouldFetchAllInTimeRange() {
windowStore = createWindowStore(context, false);
- final long startTime = segmentSize - 4L;
+ final long startTime = segmentInterval - 4L;
putFirstBatch(windowStore, startTime, context);
@@ -274,7 +269,7 @@ public class RocksDBWindowStoreTest {
@Test
public void testFetchRange() {
windowStore = createWindowStore(context, false);
- final long startTime = segmentSize - 4L;
+ final long startTime = segmentInterval - 4L;
putFirstBatch(windowStore, startTime, context);
@@ -322,7 +317,7 @@ public class RocksDBWindowStoreTest {
@Test
public void testPutAndFetchBefore() {
windowStore = createWindowStore(context, false);
- final long startTime = segmentSize - 4L;
+ final long startTime = segmentInterval - 4L;
putFirstBatch(windowStore, startTime, context);
@@ -368,7 +363,7 @@ public class RocksDBWindowStoreTest {
@Test
public void testPutAndFetchAfter() {
windowStore = createWindowStore(context, false);
- final long startTime = segmentSize - 4L;
+ final long startTime = segmentInterval - 4L;
putFirstBatch(windowStore, startTime, context);
@@ -414,7 +409,7 @@ public class RocksDBWindowStoreTest {
@Test
public void testPutSameKeyTimestamp() {
windowStore = createWindowStore(context, true);
- final long startTime = segmentSize - 4L;
+ final long startTime = segmentInterval - 4L;
setCurrentTime(startTime);
windowStore.put(0, "zero");
@@ -444,8 +439,8 @@ public class RocksDBWindowStoreTest {
windowStore = createWindowStore(context, false);
// to validate segments
- final long startTime = segmentSize * 2;
- final long increment = segmentSize / 2;
+ final long startTime = segmentInterval * 2;
+ final long increment = segmentInterval / 2;
setCurrentTime(startTime);
windowStore.put(0, "zero");
assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
@@ -573,8 +568,8 @@ public class RocksDBWindowStoreTest {
@Test
public void testRestore() throws IOException {
- final long startTime = segmentSize * 2;
- final long increment = segmentSize / 2;
+ final long startTime = segmentInterval * 2;
+ final long increment = segmentInterval / 2;
windowStore = createWindowStore(context, false);
setCurrentTime(startTime);
@@ -725,7 +720,7 @@ public class RocksDBWindowStoreTest {
new File(storeDir, segments.segmentName(6L)).mkdir();
windowStore.close();
- context.setStreamTime(segmentSize * 6L);
+ context.setStreamTime(segmentInterval * 6L);
windowStore = createWindowStore(context, false);
final List<String> expected = Utils.mkList(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L));
@@ -768,13 +763,9 @@ public class RocksDBWindowStoreTest {
public void shouldFetchAndIterateOverExactKeys() {
final long windowSize = 0x7a00000000000000L;
final long retentionPeriod = 0x7a00000000000000L;
+
final WindowStore<String, String> windowStore = Stores.windowStoreBuilder(
- Stores.persistentWindowStore(
- windowName,
- retentionPeriod,
- 2,
- windowSize,
- true),
+ Stores.persistentWindowStore(windowName, retentionPeriod, windowSize, true),
Serdes.String(),
Serdes.String()).build();
@@ -837,7 +828,7 @@ public class RocksDBWindowStoreTest {
@Test
public void shouldNoNullPointerWhenSerdeDoesNotHandleNull() {
windowStore = new RocksDBWindowStore<>(
- new RocksDBSegmentedBytesStore(windowName, retentionPeriod, numSegments, new WindowKeySchema()),
+ new RocksDBSegmentedBytesStore(windowName, retentionPeriod, segmentInterval, new WindowKeySchema()),
Serdes.Integer(),
new SerdeThatDoesntHandleNull(),
false,
@@ -850,12 +841,7 @@ public class RocksDBWindowStoreTest {
@Test
public void shouldFetchAndIterateOverExactBinaryKeys() {
final WindowStore<Bytes, String> windowStore = Stores.windowStoreBuilder(
- Stores.persistentWindowStore(
- windowName,
- 60000,
- 2,
- 60000,
- true),
+ Stores.persistentWindowStore(windowName, 60_000L, 60_000L, true),
Serdes.Bytes(),
Serdes.String()).build();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 66ea3c4..4916cb0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -76,7 +76,13 @@ public class StreamThreadStateStoreProviderTest {
topology.addSource("the-source", topicName);
topology.addProcessor("the-processor", new MockProcessorSupplier(), "the-source");
topology.addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv-store"), Serdes.String(), Serdes.String()), "the-processor");
- topology.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("window-store", 10, 2, 2, false), Serdes.String(), Serdes.String()), "the-processor");
+ topology.addStateStore(
+ Stores.windowStoreBuilder(
+ Stores.persistentWindowStore("window-store", 10L, 2L, false),
+ Serdes.String(),
+ Serdes.String()),
+ "the-processor"
+ );
final Properties properties = new Properties();
final String applicationId = "applicationId";