You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/23 21:16:45 UTC
kafka git commit: Kafka-3880: Disallow Join Window with size zero
Repository: kafka
Updated Branches:
refs/heads/trunk 36cab7dbd -> 8ec4e4b7a
Kafka-3880: Disallow Join Window with size zero
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Michael G. Noll, Damian Guy, Eno Thereska, Guozhang Wang
Closes #1529 from mjsax/kafka-3880-join-windows
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8ec4e4b7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8ec4e4b7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8ec4e4b7
Branch: refs/heads/trunk
Commit: 8ec4e4b7a37164a1189a74815decb1d6f6410963
Parents: 36cab7d
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Jun 23 14:16:42 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Jun 23 14:16:42 2016 -0700
----------------------------------------------------------------------
.../kafka/streams/kstream/JoinWindows.java | 20 ++--
.../integration/KStreamRepartitionJoinTest.java | 11 +--
.../kafka/streams/kstream/JoinWindowsTest.java | 99 ++++++++++++++++++++
.../kafka/streams/kstream/TimeWindowsTest.java | 30 +++---
.../kstream/internals/KStreamImplTest.java | 5 +-
.../internals/KStreamKStreamJoinTest.java | 9 +-
.../internals/KStreamKStreamLeftJoinTest.java | 8 +-
7 files changed, 145 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ec4e4b7/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index f45c064..53ddf3e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -41,6 +41,8 @@ import java.util.Map;
* </ul>
* A join is symmetric in the sense, that a join specification on the first stream returns the same result record as
* a join specification on the second stream with flipped before and after values.
+ * <p>
+ * Neither value (before or after) may be negative, and the two must not both be zero.
*/
public class JoinWindows extends Windows<TimeWindow> {
@@ -52,21 +54,27 @@ public class JoinWindows extends Windows<TimeWindow> {
private JoinWindows(String name, long before, long after) {
super(name);
+ if (before < 0) {
+ throw new IllegalArgumentException("window size must be > 0 (you provided before as " + before + ")");
+ }
+ if (after < 0) {
+ throw new IllegalArgumentException("window size must be > 0 (you provided after as " + after + ")");
+ }
+ if (before == 0 && after == 0) {
+ throw new IllegalArgumentException("window size must be > 0 (you provided 0)");
+ }
+
this.after = after;
this.before = before;
}
- public static JoinWindows of(String name) {
- return new JoinWindows(name, 0L, 0L);
- }
-
/**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}.
*
* @param timeDifference join window interval
*/
- public JoinWindows within(long timeDifference) {
- return new JoinWindows(this.name, timeDifference, timeDifference);
+ public static JoinWindows of(String name, long timeDifference) {
+ return new JoinWindows(name, timeDifference, timeDifference);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ec4e4b7/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index caf326d..c852513 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -226,7 +226,7 @@ public class KStreamRepartitionJoinTest {
streamTwo
.join(streamOne.map(keyMapper),
joiner,
- JoinWindows.of(output).within(60 * 1000),
+ JoinWindows.of(output, 60 * 1000),
Serdes.Integer(),
Serdes.String(),
Serdes.Integer())
@@ -253,7 +253,7 @@ public class KStreamRepartitionJoinTest {
String outputTopic = "left-join";
map1.leftJoin(map2,
valueJoiner,
- JoinWindows.of("the-left-join").within(60 * 1000),
+ JoinWindows.of("the-left-join", 60 * 1000),
Serdes.Integer(),
Serdes.Integer(),
Serdes.String())
@@ -282,8 +282,7 @@ public class KStreamRepartitionJoinTest {
final KStream<Integer, String> join = map1.join(map2,
valueJoiner,
- JoinWindows.of("join-one")
- .within(60 * 1000),
+ JoinWindows.of("join-one", 60 * 1000),
Serdes.Integer(),
Serdes.Integer(),
Serdes.String());
@@ -298,7 +297,7 @@ public class KStreamRepartitionJoinTest {
join.map(kvMapper)
.join(streamFour.map(kvMapper),
joiner,
- JoinWindows.of("the-other-join").within(60 * 1000),
+ JoinWindows.of("the-other-join", 60 * 1000),
Serdes.Integer(),
Serdes.String(),
Serdes.String())
@@ -433,7 +432,7 @@ public class KStreamRepartitionJoinTest {
CLUSTER.createTopic(outputTopic);
lhs.join(rhs,
valueJoiner,
- JoinWindows.of(joinName).within(60 * 1000),
+ JoinWindows.of(joinName, 60 * 1000),
Serdes.Integer(),
Serdes.Integer(),
Serdes.String())
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ec4e4b7/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
new file mode 100644
index 0000000..d8fa7b4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+
+public class JoinWindowsTest {
+
+ private static String anyName = "window";
+ private static long anySize = 123L;
+ private static long anyOtherSize = 456L;
+
+ @Test
+ public void shouldHaveSaneEqualsAndHashCode() {
+ JoinWindows w1 = JoinWindows.of("w1", anySize);
+ JoinWindows w2 = JoinWindows.of("w2", anySize);
+
+ // Reflexive
+ assertEquals(w1, w1);
+ assertEquals(w1.hashCode(), w1.hashCode());
+
+ // Symmetric
+ assertEquals(w1, w2);
+ assertEquals(w2, w1);
+ assertEquals(w1.hashCode(), w2.hashCode());
+
+ JoinWindows w3 = JoinWindows.of("w3", w2.after).before(anyOtherSize);
+ JoinWindows w4 = JoinWindows.of("w4", anyOtherSize).after(w2.after);
+ assertEquals(w3, w4);
+ assertEquals(w4, w3);
+ assertEquals(w3.hashCode(), w4.hashCode());
+
+ // Inequality scenarios
+ assertNotEquals("must be false for null", null, w1);
+ assertNotEquals("must be false for different window types", UnlimitedWindows.of("irrelevant"), w1);
+ assertNotEquals("must be false for different types", new Object(), w1);
+
+ JoinWindows differentWindowSize = JoinWindows.of("differentWindowSize", w1.after + 1);
+ assertNotEquals("must be false when window sizes are different", differentWindowSize, w1);
+
+ JoinWindows differentWindowSize2 = JoinWindows.of("differentWindowSize", w1.after).after(w1.after + 1);
+ assertNotEquals("must be false when window sizes are different", differentWindowSize2, w1);
+
+ JoinWindows differentWindowSize3 = JoinWindows.of("differentWindowSize", w1.after).before(w1.before + 1);
+ assertNotEquals("must be false when window sizes are different", differentWindowSize3, w1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void nameMustNotBeEmpty() {
+ JoinWindows.of("", anySize);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void nameMustNotBeNull() {
+ JoinWindows.of(null, anySize);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void windowSizeMustNotBeNegative() {
+ JoinWindows.of(anyName, -1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void beforeMustNotBeNegative() {
+ JoinWindows.of(anyName, anySize).before(-1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void afterSizeMustNotBeNegative() {
+ JoinWindows.of(anyName, anySize).after(-1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void windowSizeMustNotBeZero() {
+ JoinWindows.of(anyName, 0);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ec4e4b7/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 62b12a9..5acd6e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -25,8 +25,7 @@ import org.junit.Test;
import java.util.Map;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
public class TimeWindowsTest {
@@ -39,31 +38,30 @@ public class TimeWindowsTest {
TimeWindows w2 = TimeWindows.of("w2", w1.size);
// Reflexive
- assertTrue(w1.equals(w1));
- assertTrue(w1.hashCode() == w1.hashCode());
+ assertEquals(w1, w1);
+ assertEquals(w1.hashCode(), w1.hashCode());
// Symmetric
- assertTrue(w1.equals(w2));
- assertTrue(w1.hashCode() == w2.hashCode());
- assertTrue(w2.hashCode() == w1.hashCode());
+ assertEquals(w1, w2);
+ assertEquals(w2, w1);
+ assertEquals(w1.hashCode(), w2.hashCode());
// Transitive
TimeWindows w3 = TimeWindows.of("w3", w2.size);
- assertTrue(w2.equals(w3));
- assertTrue(w2.hashCode() == w3.hashCode());
- assertTrue(w1.equals(w3));
- assertTrue(w1.hashCode() == w3.hashCode());
+ assertEquals(w2, w3);
+ assertEquals(w1, w3);
+ assertEquals(w1.hashCode(), w3.hashCode());
// Inequality scenarios
- assertFalse("must be false for null", w1.equals(null));
- assertFalse("must be false for different window types", w1.equals(UnlimitedWindows.of("irrelevant")));
- assertFalse("must be false for different types", w1.equals(new Object()));
+ assertNotEquals("must be false for null", null, w1);
+ assertNotEquals("must be false for different window types", UnlimitedWindows.of("irrelevant"), w1);
+ assertNotEquals("must be false for different types", new Object(), w1);
TimeWindows differentWindowSize = TimeWindows.of("differentWindowSize", w1.size + 1);
- assertFalse("must be false when window sizes are different", w1.equals(differentWindowSize));
+ assertNotEquals("must be false when window sizes are different", differentWindowSize, w1);
TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advance - 1);
- assertFalse("must be false when advance intervals are different", w1.equals(differentAdvanceInterval));
+ assertNotEquals("must be false when advance intervals are different", differentAdvanceInterval, w1);
}
@Test(expected = IllegalArgumentException.class)
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ec4e4b7/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 3d45d1d..6242702 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -103,19 +103,20 @@ public class KStreamImplTest {
}
);
+ final int anyWindowSize = 1;
KStream<String, Integer> stream4 = streams2[0].join(streams3[0], new ValueJoiner<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer value1, Integer value2) {
return value1 + value2;
}
- }, JoinWindows.of("join-0"), stringSerde, intSerde, intSerde);
+ }, JoinWindows.of("join-0", anyWindowSize), stringSerde, intSerde, intSerde);
KStream<String, Integer> stream5 = streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer value1, Integer value2) {
return value1 + value2;
}
- }, JoinWindows.of("join-1"), stringSerde, intSerde, intSerde);
+ }, JoinWindows.of("join-1", anyWindowSize), stringSerde, intSerde, intSerde);
stream4.to("topic-5");
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ec4e4b7/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 6b0828a..aa7d117 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -77,7 +77,8 @@ public class KStreamKStreamJoinTest {
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
+ joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
+ intSerde, stringSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -175,7 +176,8 @@ public class KStreamKStreamJoinTest {
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
+ joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
+ intSerde, stringSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -275,7 +277,8 @@ public class KStreamKStreamJoinTest {
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
+ joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
+ intSerde, stringSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
http://git-wip-us.apache.org/repos/asf/kafka/blob/8ec4e4b7/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 1a608a7..8e05da9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -78,8 +78,8 @@ public class KStreamKStreamLeftJoinTest {
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test")
- .within(100), intSerde, stringSerde, stringSerde);
+ joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
+ intSerde, stringSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -157,8 +157,8 @@ public class KStreamKStreamLeftJoinTest {
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test")
- .within(100), intSerde, stringSerde, stringSerde);
+ joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
+ intSerde, stringSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();