You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/30 18:07:31 UTC
[3/3] kafka git commit: revertd API breaking changes
revertd API breaking changes
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4091a13c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4091a13c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4091a13c
Branch: refs/heads/0.10.0
Commit: 4091a13c5a3ef4805590357ab872333798a1bbe8
Parents: 6ba0aaa
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Jun 30 15:07:00 2016 +0200
Committer: Matthias J. Sax <ma...@confluent.io>
Committed: Thu Jun 30 15:35:08 2016 +0200
----------------------------------------------------------------------
.../kafka/streams/kstream/JoinWindows.java | 16 +++++--
.../kafka/streams/kstream/JoinWindowsTest.java | 48 ++++++++++----------
.../kstream/internals/KStreamImplTest.java | 4 +-
.../internals/KStreamKStreamJoinTest.java | 6 +--
.../internals/KStreamKStreamLeftJoinTest.java | 6 ++-
5 files changed, 46 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4091a13c/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 309a9e6..50c0453 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -64,13 +64,21 @@ public class JoinWindows extends Windows<TimeWindow> {
}
/**
+ * Specifies that records of the same key are joinable if their timestamps are equal.
+ *
+ * @param name The name of the window. Must not be null or empty.
+ */
+ public static JoinWindows of(String name) {
+ return new JoinWindows(name, 0L, 0L);
+ }
+
+ /**
* Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}.
- * ({@code timeDifference} must not be negative)
*
- * @param timeDifference join window interval
+ * @param timeDifference join window interval (must not be negative)
*/
- public static JoinWindows of(String name, long timeDifference) {
- return new JoinWindows(name, timeDifference, timeDifference);
+ public JoinWindows with(long timeDifference) {
+ return new JoinWindows(this.name, timeDifference, timeDifference);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/4091a13c/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index 20efd45..4e14777 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -33,8 +33,8 @@ public class JoinWindowsTest {
@Test
public void shouldHaveSaneEqualsAndHashCode() {
- JoinWindows w1 = JoinWindows.of("w1", anySize);
- JoinWindows w2 = JoinWindows.of("w2", anySize);
+ JoinWindows w1 = JoinWindows.of("w1").with(anySize);
+ JoinWindows w2 = JoinWindows.of("w2").with(anySize);
// Reflexive
assertEquals(w1, w1);
@@ -45,8 +45,8 @@ public class JoinWindowsTest {
assertEquals(w2, w1);
assertEquals(w1.hashCode(), w2.hashCode());
- JoinWindows w3 = JoinWindows.of("w3", w2.after).before(anyOtherSize);
- JoinWindows w4 = JoinWindows.of("w4", anyOtherSize).after(w2.after);
+ JoinWindows w3 = JoinWindows.of("w3").with(w2.after).before(anyOtherSize);
+ JoinWindows w4 = JoinWindows.of("w4").with(anyOtherSize).after(w2.after);
assertEquals(w3, w4);
assertEquals(w4, w3);
assertEquals(w3.hashCode(), w4.hashCode());
@@ -56,54 +56,56 @@ public class JoinWindowsTest {
assertNotEquals("must be false for different window types", UnlimitedWindows.of("irrelevant"), w1);
assertNotEquals("must be false for different types", new Object(), w1);
- JoinWindows differentWindowSize = JoinWindows.of("differentWindowSize", w1.after + 1);
+ JoinWindows differentWindowSize = JoinWindows.of("differentWindowSize").with(w1.after + 1);
assertNotEquals("must be false when window sizes are different", differentWindowSize, w1);
- JoinWindows differentWindowSize2 = JoinWindows.of("differentWindowSize", w1.after).after(w1.after + 1);
+ JoinWindows differentWindowSize2 = JoinWindows.of("differentWindowSize").with(w1.after).after(w1.after + 1);
assertNotEquals("must be false when window sizes are different", differentWindowSize2, w1);
- JoinWindows differentWindowSize3 = JoinWindows.of("differentWindowSize", w1.after).before(w1.before + 1);
+ JoinWindows differentWindowSize3 = JoinWindows.of("differentWindowSize").with(w1.after).before(w1.before + 1);
assertNotEquals("must be false when window sizes are different", differentWindowSize3, w1);
}
@Test
public void validWindows() {
- JoinWindows.of(anyName, anyOtherSize) // [ -anyOtherSize ; anyOtherSize ]
- .before(anySize) // [ -anySize ; anyOtherSize ]
- .before(0) // [ 0 ; anyOtherSize ]
- .before(-anySize) // [ anySize ; anyOtherSize ]
- .before(-anyOtherSize); // [ anyOtherSize ; anyOtherSize ]
-
- JoinWindows.of(anyName, anyOtherSize) // [ -anyOtherSize ; anyOtherSize ]
- .after(anySize) // [ -anyOtherSize ; anySize ]
- .after(0) // [ -anyOtherSize ; 0 ]
- .after(-anySize) // [ -anyOtherSize ; -anySize ]
- .after(-anyOtherSize); // [ -anyOtherSize ; -anyOtherSize ]
+ JoinWindows.of(anyName)
+ .with(anyOtherSize) // [ -anyOtherSize ; anyOtherSize ]
+ .before(anySize) // [ -anySize ; anyOtherSize ]
+ .before(0) // [ 0 ; anyOtherSize ]
+ .before(-anySize) // [ anySize ; anyOtherSize ]
+ .before(-anyOtherSize); // [ anyOtherSize ; anyOtherSize ]
+
+ JoinWindows.of(anyName)
+ .with(anyOtherSize) // [ -anyOtherSize ; anyOtherSize ]
+ .after(anySize) // [ -anyOtherSize ; anySize ]
+ .after(0) // [ -anyOtherSize ; 0 ]
+ .after(-anySize) // [ -anyOtherSize ; -anySize ]
+ .after(-anyOtherSize); // [ -anyOtherSize ; -anyOtherSize ]
}
@Test(expected = IllegalArgumentException.class)
public void nameMustNotBeEmpty() {
- JoinWindows.of("", anySize);
+ JoinWindows.of("").with(anySize);
}
@Test(expected = IllegalArgumentException.class)
public void nameMustNotBeNull() {
- JoinWindows.of(null, anySize);
+ JoinWindows.of(null).with(anySize);
}
@Test(expected = IllegalArgumentException.class)
public void timeDifferenceMustNotBeNegative() {
- JoinWindows.of(anyName, -1);
+ JoinWindows.of(anyName).with(-1);
}
@Test(expected = IllegalArgumentException.class)
public void afterBelowLower() {
- JoinWindows.of(anyName, anySize).after(-anySize - 1);
+ JoinWindows.of(anyName).with(anySize).after(-anySize - 1);
}
@Test(expected = IllegalArgumentException.class)
public void beforeOverUpper() {
- JoinWindows.of(anyName, anySize).before(-anySize - 1);
+ JoinWindows.of(anyName).with(anySize).before(-anySize - 1);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/4091a13c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 6242702..a40c8fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -109,14 +109,14 @@ public class KStreamImplTest {
public Integer apply(Integer value1, Integer value2) {
return value1 + value2;
}
- }, JoinWindows.of("join-0", anyWindowSize), stringSerde, intSerde, intSerde);
+ }, JoinWindows.of("join-0").with(anyWindowSize), stringSerde, intSerde, intSerde);
KStream<String, Integer> stream5 = streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer value1, Integer value2) {
return value1 + value2;
}
- }, JoinWindows.of("join-1", anyWindowSize), stringSerde, intSerde, intSerde);
+ }, JoinWindows.of("join-1").with(anyWindowSize), stringSerde, intSerde, intSerde);
stream4.to("topic-5");
http://git-wip-us.apache.org/repos/asf/kafka/blob/4091a13c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index aa7d117..1462999 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -77,7 +77,7 @@ public class KStreamKStreamJoinTest {
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
+ joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").with(100),
intSerde, stringSerde, stringSerde);
joined.process(processor);
@@ -176,7 +176,7 @@ public class KStreamKStreamJoinTest {
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
+ joined = stream1.outerJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").with(100),
intSerde, stringSerde, stringSerde);
joined.process(processor);
@@ -277,7 +277,7 @@ public class KStreamKStreamJoinTest {
processor = new MockProcessorSupplier<>();
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100),
+ joined = stream1.join(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").with(100),
intSerde, stringSerde, stringSerde);
joined.process(processor);
http://git-wip-us.apache.org/repos/asf/kafka/blob/4091a13c/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 5b12a30..a91dcb2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -79,7 +79,8 @@ public class KStreamKStreamLeftJoinTest {
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), intSerde, stringSerde);
+ joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").with(100),
+ intSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -158,7 +159,8 @@ public class KStreamKStreamLeftJoinTest {
stream1 = builder.stream(intSerde, stringSerde, topic1);
stream2 = builder.stream(intSerde, stringSerde, topic2);
- joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test", 100), intSerde, stringSerde);
+ joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").with(100),
+ intSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();