You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/10/17 20:18:50 UTC
[kafka] branch 2.1 updated: KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes Part 2
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new bb370d2 KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes Part 2
bb370d2 is described below
commit bb370d2df1d52d1c8833b07f9afaba4f1e881c13
Author: John Roesler <jo...@confluent.io>
AuthorDate: Wed Oct 17 11:03:16 2018 -0700
KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes Part 2
#5804 removed `Windows#segmentInterval`, but did not remove all references to it.
Author: John Roesler <jo...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #5806 from vvcephei/fix-missing-segment-interval
---
.../streams/kstream/internals/TimeWindowedKStreamImpl.java | 4 ++--
.../java/org/apache/kafka/streams/kstream/WindowsTest.java | 13 -------------
.../java/org/apache/kafka/streams/perf/SimpleBenchmark.java | 7 +++----
.../java/org/apache/kafka/streams/state/StoresTest.java | 10 +++-------
.../streams/state/internals/RocksDBWindowStoreTest.java | 9 +++++----
5 files changed, 13 insertions(+), 30 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 8ba02bf..ecfe155 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -193,9 +193,9 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
supplier = Stores.persistentWindowStore(
materialized.storeName(),
windows.maintainMs(),
+ windows.segments,
windows.size(),
- false,
- windows.segmentInterval()
+ false
);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
index 8709031..49bc56c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
@@ -42,19 +42,6 @@ public class WindowsTest {
@SuppressWarnings("deprecation") // specifically testing deprecated APIs
@Test
- public void shouldSetNumberOfSegments() {
- final int anySegmentSizeLargerThanOne = 5;
- final TestWindows testWindow = new TestWindows();
- final long maintainMs = testWindow.maintainMs();
-
- assertEquals(
- maintainMs / (anySegmentSizeLargerThanOne - 1),
- testWindow.segments(anySegmentSizeLargerThanOne).segmentInterval()
- );
- }
-
- @SuppressWarnings("deprecation") // specifically testing deprecated APIs
- @Test
public void shouldSetWindowRetentionTime() {
final int anyNotNegativeRetentionTime = 42;
assertEquals(anyNotNegativeRetentionTime, new TestWindows().until(anyNotNegativeRetentionTime).maintainMs());
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 4008689..4fe3c07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -473,10 +473,9 @@ public class SimpleBenchmark {
final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder = Stores.windowStoreBuilder(
Stores.persistentWindowStore(
"store",
- AGGREGATE_WINDOW_SIZE * 3,
- AGGREGATE_WINDOW_SIZE,
- false,
- 60_000L
+ ofMillis(AGGREGATE_WINDOW_SIZE * 3),
+ ofMillis(AGGREGATE_WINDOW_SIZE),
+ false
),
INTEGER_SERDE,
BYTE_SERDE
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index b62364a..1d4a849 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
import org.junit.Test;
+import static java.time.Duration.ZERO;
import static java.time.Duration.ofMillis;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -55,12 +56,12 @@ public class StoresTest {
@Test(expected = NullPointerException.class)
public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
- Stores.persistentWindowStore(null, 0L, 0L, false, 0L);
+ Stores.persistentWindowStore(null, ZERO, ZERO, false);
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
- Stores.persistentWindowStore("anyName", -1L, 0L, false, 0L);
+ Stores.persistentWindowStore("anyName", ofMillis(-1L), ZERO, false);
}
@Deprecated
@@ -74,11 +75,6 @@ public class StoresTest {
Stores.persistentWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false);
}
- @Test(expected = IllegalArgumentException.class)
- public void shouldThrowIfIPersistentWindowStoreIfSegmentIntervalIsTooSmall() {
- Stores.persistentWindowStore("anyName", 1L, 1L, false, -1L);
- }
-
@Test(expected = NullPointerException.class)
public void shouldThrowIfIPersistentSessionStoreStoreNameIsNull() {
Stores.persistentSessionStore(null, ofMillis(0));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index cd0f49a..c41b094 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static java.time.Duration.ofMillis;
import static java.time.Instant.ofEpochMilli;
import static java.util.Objects.requireNonNull;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -70,7 +71,7 @@ public class RocksDBWindowStoreTest {
private final int numSegments = 3;
private final long windowSize = 3L;
- private final long segmentInterval = 600L;
+ private final long segmentInterval = 60_000L;
private final long retentionPeriod = segmentInterval * (numSegments - 1);
private final String windowName = "window";
private final Segments segments = new Segments(windowName, retentionPeriod, segmentInterval);
@@ -108,7 +109,7 @@ public class RocksDBWindowStoreTest {
private WindowStore<Integer, String> createWindowStore(final ProcessorContext context, final boolean retainDuplicates) {
final WindowStore<Integer, String> store = Stores.windowStoreBuilder(
- Stores.persistentWindowStore(windowName, retentionPeriod, windowSize, retainDuplicates, segmentInterval),
+ Stores.persistentWindowStore(windowName, ofMillis(retentionPeriod), ofMillis(windowSize), retainDuplicates),
Serdes.Integer(),
Serdes.String()).build();
@@ -771,7 +772,7 @@ public class RocksDBWindowStoreTest {
final long retentionPeriod = 0x7a00000000000000L;
final WindowStore<String, String> windowStore = Stores.windowStoreBuilder(
- Stores.persistentWindowStore(windowName, retentionPeriod, windowSize, true),
+ Stores.persistentWindowStore(windowName, ofMillis(retentionPeriod), ofMillis(windowSize), true),
Serdes.String(),
Serdes.String()).build();
@@ -848,7 +849,7 @@ public class RocksDBWindowStoreTest {
@Test
public void shouldFetchAndIterateOverExactBinaryKeys() {
final WindowStore<Bytes, String> windowStore = Stores.windowStoreBuilder(
- Stores.persistentWindowStore(windowName, 60_000L, 60_000L, true),
+ Stores.persistentWindowStore(windowName, ofMillis(60_000L), ofMillis(60_000L), true),
Serdes.Bytes(),
Serdes.String()).build();