You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/10/17 20:18:50 UTC

[kafka] branch 2.1 updated: KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes Part 2

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new bb370d2  KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes Part 2
bb370d2 is described below

commit bb370d2df1d52d1c8833b07f9afaba4f1e881c13
Author: John Roesler <jo...@confluent.io>
AuthorDate: Wed Oct 17 11:03:16 2018 -0700

    KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes Part 2
    
    #5804 removed `Windows#segmentInterval`, but did not remove all references to it.
    
    Author: John Roesler <jo...@confluent.io>
    
    Reviewers: Damian Guy <da...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #5806 from vvcephei/fix-missing-segment-interval
---
 .../streams/kstream/internals/TimeWindowedKStreamImpl.java  |  4 ++--
 .../java/org/apache/kafka/streams/kstream/WindowsTest.java  | 13 -------------
 .../java/org/apache/kafka/streams/perf/SimpleBenchmark.java |  7 +++----
 .../java/org/apache/kafka/streams/state/StoresTest.java     | 10 +++-------
 .../streams/state/internals/RocksDBWindowStoreTest.java     |  9 +++++----
 5 files changed, 13 insertions(+), 30 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 8ba02bf..ecfe155 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -193,9 +193,9 @@ public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStr
                 supplier = Stores.persistentWindowStore(
                     materialized.storeName(),
                     windows.maintainMs(),
+                    windows.segments,
                     windows.size(),
-                    false,
-                    windows.segmentInterval()
+                    false
                 );
             }
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
index 8709031..49bc56c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowsTest.java
@@ -42,19 +42,6 @@ public class WindowsTest {
 
     @SuppressWarnings("deprecation") // specifically testing deprecated APIs
     @Test
-    public void shouldSetNumberOfSegments() {
-        final int anySegmentSizeLargerThanOne = 5;
-        final TestWindows testWindow = new TestWindows();
-        final long maintainMs = testWindow.maintainMs();
-
-        assertEquals(
-            maintainMs / (anySegmentSizeLargerThanOne - 1),
-            testWindow.segments(anySegmentSizeLargerThanOne).segmentInterval()
-        );
-    }
-
-    @SuppressWarnings("deprecation") // specifically testing deprecated APIs
-    @Test
     public void shouldSetWindowRetentionTime() {
         final int anyNotNegativeRetentionTime = 42;
         assertEquals(anyNotNegativeRetentionTime, new TestWindows().until(anyNotNegativeRetentionTime).maintainMs());
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 4008689..4fe3c07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -473,10 +473,9 @@ public class SimpleBenchmark {
         final StoreBuilder<WindowStore<Integer, byte[]>> storeBuilder = Stores.windowStoreBuilder(
             Stores.persistentWindowStore(
                 "store",
-                AGGREGATE_WINDOW_SIZE * 3,
-                AGGREGATE_WINDOW_SIZE,
-                false,
-                60_000L
+                ofMillis(AGGREGATE_WINDOW_SIZE * 3),
+                ofMillis(AGGREGATE_WINDOW_SIZE),
+                false
             ),
             INTEGER_SERDE,
             BYTE_SERDE
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
index b62364a..1d4a849 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.state.internals.RocksDBStore;
 import org.apache.kafka.streams.state.internals.RocksDBWindowStore;
 import org.junit.Test;
 
+import static java.time.Duration.ZERO;
 import static java.time.Duration.ofMillis;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -55,12 +56,12 @@ public class StoresTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
-        Stores.persistentWindowStore(null, 0L, 0L, false, 0L);
+        Stores.persistentWindowStore(null, ZERO, ZERO, false);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
-        Stores.persistentWindowStore("anyName", -1L, 0L, false, 0L);
+        Stores.persistentWindowStore("anyName", ofMillis(-1L), ZERO, false);
     }
 
     @Deprecated
@@ -74,11 +75,6 @@ public class StoresTest {
         Stores.persistentWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false);
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void shouldThrowIfIPersistentWindowStoreIfSegmentIntervalIsTooSmall() {
-        Stores.persistentWindowStore("anyName", 1L, 1L, false, -1L);
-    }
-
     @Test(expected = NullPointerException.class)
     public void shouldThrowIfIPersistentSessionStoreStoreNameIsNull() {
         Stores.persistentSessionStore(null, ofMillis(0));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index cd0f49a..c41b094 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static java.time.Duration.ofMillis;
 import static java.time.Instant.ofEpochMilli;
 import static java.util.Objects.requireNonNull;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -70,7 +71,7 @@ public class RocksDBWindowStoreTest {
 
     private final int numSegments = 3;
     private final long windowSize = 3L;
-    private final long segmentInterval = 600L;
+    private final long segmentInterval = 60_000L;
     private final long retentionPeriod = segmentInterval * (numSegments - 1);
     private final String windowName = "window";
     private final Segments segments = new Segments(windowName, retentionPeriod, segmentInterval);
@@ -108,7 +109,7 @@ public class RocksDBWindowStoreTest {
 
     private WindowStore<Integer, String> createWindowStore(final ProcessorContext context, final boolean retainDuplicates) {
         final WindowStore<Integer, String> store = Stores.windowStoreBuilder(
-            Stores.persistentWindowStore(windowName, retentionPeriod, windowSize, retainDuplicates, segmentInterval),
+            Stores.persistentWindowStore(windowName, ofMillis(retentionPeriod), ofMillis(windowSize), retainDuplicates),
             Serdes.Integer(),
             Serdes.String()).build();
 
@@ -771,7 +772,7 @@ public class RocksDBWindowStoreTest {
         final long retentionPeriod = 0x7a00000000000000L;
 
         final WindowStore<String, String> windowStore = Stores.windowStoreBuilder(
-            Stores.persistentWindowStore(windowName, retentionPeriod, windowSize, true),
+            Stores.persistentWindowStore(windowName, ofMillis(retentionPeriod), ofMillis(windowSize), true),
             Serdes.String(),
             Serdes.String()).build();
 
@@ -848,7 +849,7 @@ public class RocksDBWindowStoreTest {
     @Test
     public void shouldFetchAndIterateOverExactBinaryKeys() {
         final WindowStore<Bytes, String> windowStore = Stores.windowStoreBuilder(
-            Stores.persistentWindowStore(windowName, 60_000L, 60_000L, true),
+            Stores.persistentWindowStore(windowName, ofMillis(60_000L), ofMillis(60_000L), true),
             Serdes.Bytes(),
             Serdes.String()).build();