You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/30 18:07:29 UTC

[1/3] kafka git commit: hotfix: check join window boundaries

Repository: kafka
Updated Branches:
  refs/heads/0.10.0 73e2f090c -> 4091a13c5


hotfix: check join window boundaries


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9dc5dc46
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9dc5dc46
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9dc5dc46

Branch: refs/heads/0.10.0
Commit: 9dc5dc461ea4fcaf4f429cfb6c83b4fd6284921f
Parents: 73e2f09
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Jun 30 01:53:06 2016 +0200
Committer: Matthias J. Sax <ma...@confluent.io>
Committed: Thu Jun 30 01:53:06 2016 +0200

----------------------------------------------------------------------
 .../kafka/streams/kstream/JoinWindows.java      | 21 ++++++++-----
 .../kafka/streams/kstream/JoinWindowsTest.java  | 32 +++++++++++++-------
 2 files changed, 34 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9dc5dc46/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 53ddf3e..936bcd2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -42,7 +42,8 @@ import java.util.Map;
  * A join is symmetric in the sense, that a join specification on the first stream returns the same result record as
  * a join specification on the second stream with flipped before and after values.
  * <p>
- * Both values (before and after) must not be negative and not zero at the same time.
+ * Both values (before and after) must not result in an "inverse" window,
+ * i.e., lower-interval-bound must not be larger than upper-interval.bound.
  */
 public class JoinWindows extends Windows<TimeWindow> {
 
@@ -54,14 +55,17 @@ public class JoinWindows extends Windows<TimeWindow> {
     private JoinWindows(String name, long before, long after) {
         super(name);
 
-        if (before < 0) {
-            throw new IllegalArgumentException("window size must be > 0 (you provided before as " + before + ")");
+        if (before < 0) { // shift lower bound to right
+            if (after < -before) {
+                throw new IllegalArgumentException("Upper interval bound smaller than lower interval bound."
+                                                   + " <after> must be at least " + (-before));
+            }
         }
-        if (after < 0) {
-            throw new IllegalArgumentException("window size must be > 0 (you provided after as " + after + ")");
-        }
-        if (before == 0 && after == 0) {
-            throw new IllegalArgumentException("window size must be > 0 (you provided 0)");
+        if (after < 0) { // shift upper bound to left
+            if (before < -after) {
+                throw new IllegalArgumentException("Lower interval bound greater than upper interval bound."
+                        + " <before> must be at least " + (-after));
+            }
         }
 
         this.after = after;
@@ -70,6 +74,7 @@ public class JoinWindows extends Windows<TimeWindow> {
 
     /**
      * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}.
+     * ({@code timeDifference} must not be negative)
      *
      * @param timeDifference    join window interval
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/9dc5dc46/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index d8fa7b4..d80342a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -29,7 +29,7 @@ public class JoinWindowsTest {
 
     private static String anyName = "window";
     private static long anySize = 123L;
-    private static long anyOtherSize = 456L;
+    private static long anyOtherSize = 456L; // should be larger than anySize
 
     @Test
     public void shouldHaveSaneEqualsAndHashCode() {
@@ -66,6 +66,21 @@ public class JoinWindowsTest {
         assertNotEquals("must be false when window sizes are different", differentWindowSize3, w1);
     }
 
+    @Test
+    public void validWindows() {
+        JoinWindows.of(anyName, anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
+            .before(anySize)                    // [ -anySize ; anyOtherSize ]
+            .before(0)                          // [ 0 ; anyOtherSize ]
+            .before(-anySize)                   // [ anySize ; anyOtherSize ]
+            .before(-anyOtherSize);             // [ anyOtherSize ; anyOtherSize ]
+
+        JoinWindows.of(anyName, anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
+            .after(anySize)                     // [ -anyOtherSize ; anySize ]
+            .after(0)                           // [ -anyOtherSize ; 0 ]
+            .after(-anySize)                    // [ -anyOtherSize ; -anySize ]
+            .after(-anyOtherSize);              // [ -anyOtherSize ; -anyOtherSize ]
+    }
+
     @Test(expected = IllegalArgumentException.class)
     public void nameMustNotBeEmpty() {
         JoinWindows.of("", anySize);
@@ -77,23 +92,18 @@ public class JoinWindowsTest {
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void windowSizeMustNotBeNegative() {
+    public void timeDifferenceMustNotBeNegative() {
         JoinWindows.of(anyName, -1);
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void beforeMustNotBeNegative() {
-        JoinWindows.of(anyName, anySize).before(-1);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void afterSizeMustNotBeNegative() {
-        JoinWindows.of(anyName, anySize).after(-1);
+    public void afterBelowLower() {
+        JoinWindows.of(anyName, anySize).after(-anySize-1);
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void windowSizeMustNotBeZero() {
-        JoinWindows.of(anyName, 0);
+    public void beforeOverUpper() {
+        JoinWindows.of(anyName, anySize).before(-anySize-1);
     }
 
 }
\ No newline at end of file


[2/3] kafka git commit: fixed checkstyle simplified boundary check

Posted by gu...@apache.org.
fixed checkstyle
simplified boundary check


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6ba0aaae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6ba0aaae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6ba0aaae

Branch: refs/heads/0.10.0
Commit: 6ba0aaaedfe39069ecd8826b73c5a530a529faeb
Parents: 9dc5dc4
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Jun 30 14:59:11 2016 +0200
Committer: Matthias J. Sax <ma...@confluent.io>
Committed: Thu Jun 30 15:29:53 2016 +0200

----------------------------------------------------------------------
 .../org/apache/kafka/streams/kstream/JoinWindows.java  | 13 ++-----------
 .../apache/kafka/streams/kstream/JoinWindowsTest.java  |  4 ++--
 2 files changed, 4 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6ba0aaae/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 936bcd2..309a9e6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -55,17 +55,8 @@ public class JoinWindows extends Windows<TimeWindow> {
     private JoinWindows(String name, long before, long after) {
         super(name);
 
-        if (before < 0) { // shift lower bound to right
-            if (after < -before) {
-                throw new IllegalArgumentException("Upper interval bound smaller than lower interval bound."
-                                                   + " <after> must be at least " + (-before));
-            }
-        }
-        if (after < 0) { // shift upper bound to left
-            if (before < -after) {
-                throw new IllegalArgumentException("Lower interval bound greater than upper interval bound."
-                        + " <before> must be at least " + (-after));
-            }
+        if (before + after < 0) {
+            throw new IllegalArgumentException("Window interval (ie, before+after) must not be negative");
         }
 
         this.after = after;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6ba0aaae/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index d80342a..20efd45 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -98,12 +98,12 @@ public class JoinWindowsTest {
 
     @Test(expected = IllegalArgumentException.class)
     public void afterBelowLower() {
-        JoinWindows.of(anyName, anySize).after(-anySize-1);
+        JoinWindows.of(anyName, anySize).after(-anySize - 1);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void beforeOverUpper() {
-        JoinWindows.of(anyName, anySize).before(-anySize-1);
+        JoinWindows.of(anyName, anySize).before(-anySize - 1);
     }
 
 }
\ No newline at end of file


[3/3] kafka git commit: revertd API breaking changes

Posted by gu...@apache.org.
revertd API breaking changes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4091a13c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4091a13c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4091a13c

Branch: refs/heads/0.10.0
Commit: 4091a13c5a3ef4805590357ab872333798a1bbe8
Parents: 6ba0aaa
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Jun 30 15:07:00 2016 +0200
Committer: Matthias J. Sax <ma...@confluent.io>
Committed: Thu Jun 30 15:35:08 2016 +0200

----------------------------------------------------------------------
 .../kafka/streams/kstream/JoinWindows.java      | 16 +++++--
 .../kafka/streams/kstream/JoinWindowsTest.java  | 48 ++++++++++----------
 .../kstream/internals/KStreamImplTest.java      |  4 +-
 .../internals/KStreamKStreamJoinTest.java       |  6 +--
 .../internals/KStreamKStreamLeftJoinTest.java   |  6 ++-
 5 files changed, 46 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4091a13c/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 309a9e6..50c0453 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -64,13 +64,21 @@ public class JoinWindows extends Windows<TimeWindow> {
     }
 
     /**
+     * Specifies that records of the same key are joinable if their timestamps are equal.
+     *
+     * @param name    The name of the window. Must not be null or empty.
+     */
+    public static JoinWindows of(String name) {
+        return new JoinWindows(name, 0L, 0L);
+    }
+
+    /**
      * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}.
-     * ({@code timeDifference} must not be negative)
      *
-     * @param timeDifference    join window interval
+     * @param timeDifference    join window interval (must not be negative)
      */
-    public static JoinWindows of(String name, long timeDifference) {
-        return new JoinWindows(name, timeDifference, timeDifference);
+    public JoinWindows with(long timeDifference) {
+        return new JoinWindows(this.name, timeDifference, timeDifference);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/4091a13c/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index 20efd45..4e14777 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -33,8 +33,8 @@ public class JoinWindowsTest {
 
     @Test
     public void shouldHaveSaneEqualsAndHashCode() {
-        JoinWindows w1 = JoinWindows.of("w1", anySize);
-        JoinWindows w2 = JoinWindows.of("w2", anySize);
+        JoinWindows w1 = JoinWindows.of("w1").with(anySize);
+        JoinWindows w2 = JoinWindows.of("w2").with(anySize);
 
         // Reflexive
         assertEquals(w1, w1);
@@ -45,8 +45,8 @@ public class JoinWindowsTest {
         assertEquals(w2, w1);
         assertEquals(w1.hashCode(), w2.hashCode());
 
-        JoinWindows w3 = JoinWindows.of("w3", w2.after).before(anyOtherSize);
-        JoinWindows w4 = JoinWindows.of("w4", anyOtherSize).after(w2.after);
+        JoinWindows w3 = JoinWindows.of("w3").with(w2.after).before(anyOtherSize);
+        JoinWindows w4 = JoinWindows.of("w4").with(anyOtherSize).after(w2.after);
         assertEquals(w3, w4);
         assertEquals(w4, w3);
         assertEquals(w3.hashCode(), w4.hashCode());
@@ -56,54 +56,56 @@ public class JoinWindowsTest {
         assertNotEquals("must be false for different window types", UnlimitedWindows.of("irrelevant"), w1);
         assertNotEquals("must be false for different types", new Object(), w1);
 
-        JoinWindows differentWindowSize = JoinWindows.of("differentWindowSize", w1.after + 1);
+        JoinWindows differentWindowSize = JoinWindows.of("differentWindowSize").with(w1.after + 1);
         assertNotEquals("must be false when window sizes are different", differentWindowSize, w1);
 
-        JoinWindows differentWindowSize2 = JoinWindows.of("differentWindowSize", w1.after).after(w1.after + 1);
+        JoinWindows differentWindowSize2 = JoinWindows.of("differentWindowSize").with(w1.after).after(w1.after + 1);
         assertNotEquals("must be false when window sizes are different", differentWindowSize2, w1);
 
-        JoinWindows differentWindowSize3 = JoinWindows.of("differentWindowSize", w1.after).before(w1.before + 1);
+        JoinWindows differentWindowSize3 = JoinWindows.of("differentWindowSize").with(w1.after).before(w1.before + 1);
         assertNotEquals("must be false when window sizes are different", differentWindowSize3, w1);
     }
 
     @Test
     public void validWindows() {
-        JoinWindows.of(anyName, anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
-            .before(anySize)                    // [ -anySize ; anyOtherSize ]
-            .before(0)                          // [ 0 ; anyOtherSize ]
-            .before(-anySize)                   // [ anySize ; anyOtherSize ]
-            .before(-anyOtherSize);             // [ anyOtherSize ; anyOtherSize ]
-
-        JoinWindows.of(anyName, anyOtherSize)   // [ -anyOtherSize ; anyOtherSize ]
-            .after(anySize)                     // [ -anyOtherSize ; anySize ]
-            .after(0)                           // [ -anyOtherSize ; 0 ]
-            .after(-anySize)                    // [ -anyOtherSize ; -anySize ]
-            .after(-anyOtherSize);              // [ -anyOtherSize ; -anyOtherSize ]
+        JoinWindows.of(anyName)
+            .with(anyOtherSize)     // [ -anyOtherSize ; anyOtherSize ]
+            .before(anySize)        // [ -anySize ; anyOtherSize ]
+            .before(0)              // [ 0 ; anyOtherSize ]
+            .before(-anySize)       // [ anySize ; anyOtherSize ]
+            .before(-anyOtherSize); // [ anyOtherSize ; anyOtherSize ]
+
+        JoinWindows.of(anyName)
+            .with(anyOtherSize)     // [ -anyOtherSize ; anyOtherSize ]
+            .after(anySize)         // [ -anyOtherSize ; anySize ]
+            .after(0)               // [ -anyOtherSize ; 0 ]
+            .after(-anySize)        // [ -anyOtherSize ; -anySize ]
+            .after(-anyOtherSize);  // [ -anyOtherSize ; -anyOtherSize ]
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void nameMustNotBeEmpty() {
-        JoinWindows.of("", anySize);
+        JoinWindows.of("").with(anySize);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void nameMustNotBeNull() {
-        JoinWindows.of(null, anySize);
+        JoinWindows.of(null).with(anySize);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void timeDifferenceMustNotBeNegative() {
-        JoinWindows.of(anyName, -1);
+        JoinWindows.of(anyName).with(-1);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void afterBelowLower() {
-        JoinWindows.of(anyName, anySize).after(-anySize - 1);
+        JoinWindows.of(anyName).with(anySize).after(-anySize - 1);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void beforeOverUpper() {
-        JoinWindows.of(anyName, anySize).before(-anySize - 1);
+        JoinWindows.of(anyName).with(anySize).before(-anySize - 1);
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4091a13c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 6242702..a40c8fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -109,14 +109,14 @@ public class KStreamImplTest {
             public Integer apply(Integer value1, Integer value2) {
                 return value1 + value2;
             }
-        }, JoinWindows.of("join-0", anyWindowSize), stringSerde, intSerde, intSerde);
+        }, JoinWindows.of("join-0").with(anyWindowSize), stringSerde, intSerde, intSerde);
 
         KStream<String, Integer> stream5 = streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() {
             @Override
             public Integer apply(Integer value1, Integer value2) {
                 return value1 + value2;
             }
-        }, JoinWindows.of("join-1", anyWindowSize), stringSerde, intSerde, intSerde);
+        }, JoinWindows.of("join-1").with(anyWindowSize), stringSerde, intSerde, intSerde);
 
         stream4.to("topic-5");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4091a13c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index aa7d117..1462999 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -77,7 +77,7 @@ public class KStreamKStreamJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
+        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").with(100),
                 intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
@@ -176,7 +176,7 @@ public class KStreamKStreamJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
+        joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").with(100),
                 intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
@@ -277,7 +277,7 @@ public class KStreamKStreamJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
+        joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").with(100),
                 intSerde, stringSerde, stringSerde);
         joined.process(processor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4091a13c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 5b12a30..a91dcb2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -79,7 +79,8 @@ public class KStreamKStreamLeftJoinTest {
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
 
-        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), intSerde, stringSerde);
+        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").with(100),
+                intSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -158,7 +159,8 @@ public class KStreamKStreamLeftJoinTest {
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
 
-        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), intSerde, stringSerde);
+        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").with(100),
+                intSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();