You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/29 16:44:10 UTC
kafka git commit: KAFKA-3613: Consolidate TumblingWindows and
HoppingWindows into TimeWindows
Repository: kafka
Updated Branches:
refs/heads/trunk 0ada3b1fc -> 68433dcfd
KAFKA-3613: Consolidate TumblingWindows and HoppingWindows into TimeWindows
This PR includes the same code as https://github.com/apache/kafka/pull/1261 but is rebased on latest trunk.
Author: Michael G. Noll <mi...@confluent.io>
Reviewers: Matthias J. Sax, Guozhang Wang
Closes #1277 from miguno/KAFKA-3613-v2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68433dcf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68433dcf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68433dcf
Branch: refs/heads/trunk
Commit: 68433dcfdc0ae078ee4e7d278c286a9b7c1b3e76
Parents: 0ada3b1
Author: Michael G. Noll <mi...@confluent.io>
Authored: Fri Apr 29 07:44:03 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Apr 29 07:44:03 2016 -0700
----------------------------------------------------------------------
.../examples/pageview/PageViewTypedDemo.java | 4 +-
.../examples/pageview/PageViewUntypedDemo.java | 4 +-
.../examples/wordcount/WordCountDemo.java | 2 +-
.../kafka/streams/kstream/HoppingWindows.java | 95 ----
.../kafka/streams/kstream/JoinWindows.java | 27 +-
.../kafka/streams/kstream/TimeWindows.java | 125 ++++++
.../kafka/streams/kstream/TumblingWindows.java | 74 ----
.../kafka/streams/kstream/UnlimitedWindows.java | 31 +-
.../apache/kafka/streams/kstream/Window.java | 14 +-
.../apache/kafka/streams/kstream/Windows.java | 6 +-
.../kstream/internals/HoppingWindow.java | 37 --
.../streams/kstream/internals/TimeWindow.java | 33 ++
.../kstream/internals/TumblingWindow.java | 38 --
.../kstream/internals/UnlimitedWindow.java | 8 +-
.../kafka/streams/kstream/TimeWindowsTest.java | 123 ++++++
.../streams/kstream/UnlimitedWindowsTest.java | 80 ++++
.../internals/KStreamWindowAggregateTest.java | 429 ++++++++++---------
.../WindowedStreamPartitionerTest.java | 4 +-
.../streams/kstream/internals/WindowsTest.java | 70 ---
.../streams/smoketest/SmokeTestClient.java | 4 +-
20 files changed, 644 insertions(+), 564 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 4124b32..39ec41f 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -24,11 +24,11 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.StreamsConfig;
@@ -160,7 +160,7 @@ public class PageViewTypedDemo {
return new KeyValue<>(viewRegion.region, viewRegion);
}
})
- .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String())
+ .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000), Serdes.String())
// TODO: we can merge ths toStream().map(...) with a single toStream(...)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index e61842f..9a41b9e 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -30,11 +30,11 @@ import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
@@ -99,7 +99,7 @@ public class PageViewUntypedDemo {
return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
}
})
- .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String())
+ .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000), Serdes.String())
// TODO: we can merge ths toStream().map(...) with a single toStream(...)
.toStream()
.map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 5b52803..12395f9 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -69,7 +69,7 @@ public class WordCountDemo {
}).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@Override
public KeyValue<String, String> apply(String key, String value) {
- return new KeyValue<String, String>(value, value);
+ return new KeyValue<>(value, value);
}
})
.countByKey("Counts");
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
deleted file mode 100644
index aa866e4..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.kstream.internals.HoppingWindow;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The hopping window specifications used for aggregations.
- */
-public class HoppingWindows extends Windows<HoppingWindow> {
-
- private static final long DEFAULT_SIZE_MS = 1000L;
-
- public final long size;
-
- public final long period;
-
- private HoppingWindows(String name, long size, long period) {
- super(name);
-
- this.size = size;
- this.period = period;
- }
-
- /**
- * Returns a half-interval hopping window definition with the window size in milliseconds
- * of the form [ N * default_size, N * default_size + default_size )
- */
- public static HoppingWindows of(String name) {
- return new HoppingWindows(name, DEFAULT_SIZE_MS, DEFAULT_SIZE_MS);
- }
-
- /**
- * Returns a new hopping window definition with the original size but reassign the window
- * period in milliseconds of the form [ N * period, N * period + size )
- */
- public HoppingWindows with(long size) {
- return new HoppingWindows(this.name, size, this.period);
- }
-
- /**
- * Returns a new hopping window definition with the original size but reassign the window
- * period in milliseconds of the form [ N * period, N * period + size )
- */
- public HoppingWindows every(long period) {
- return new HoppingWindows(this.name, this.size, period);
- }
-
- @Override
- public Map<Long, HoppingWindow> windowsFor(long timestamp) {
- long enclosed = (size - 1) / period;
-
- long windowStart = Math.max(0, timestamp - timestamp % period - enclosed * period);
-
- Map<Long, HoppingWindow> windows = new HashMap<>();
- while (windowStart <= timestamp) {
- // add the window
- HoppingWindow window = new HoppingWindow(windowStart, windowStart + this.size);
- windows.put(windowStart, window);
-
- // advance the step period
- windowStart += this.period;
- }
-
- return windows;
- }
-
- @Override
- public boolean equalTo(Windows other) {
- if (!other.getClass().equals(HoppingWindows.class))
- return false;
-
- HoppingWindows otherWindows = (HoppingWindows) other;
-
- return this.size == otherWindows.size && this.period == otherWindows.period;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 24dbdd3..a74984a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -17,15 +17,14 @@
package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.kstream.internals.TumblingWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
import java.util.Map;
/**
* The window specifications used for joins.
*/
-public class JoinWindows extends Windows<TumblingWindow> {
+public class JoinWindows extends Windows<TimeWindow> {
public final long before;
public final long after;
@@ -74,19 +73,29 @@ public class JoinWindows extends Windows<TumblingWindow> {
}
@Override
- public Map<Long, TumblingWindow> windowsFor(long timestamp) {
+ public Map<Long, TimeWindow> windowsFor(long timestamp) {
// this function should never be called
throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
}
@Override
- public boolean equalTo(Windows other) {
- if (!other.getClass().equals(JoinWindows.class))
+ public final boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof JoinWindows)) {
return false;
+ }
- JoinWindows otherWindows = (JoinWindows) other;
+ JoinWindows other = (JoinWindows) o;
+ return this.before == other.before && this.after == other.after;
+ }
- return this.before == otherWindows.before && this.after == otherWindows.after;
+ @Override
+ public int hashCode() {
+ int result = (int) (before ^ (before >>> 32));
+ result = 31 * result + (int) (after ^ (after >>> 32));
+ return result;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
new file mode 100644
index 0000000..fa3a9d8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The time-based window specifications used for aggregations.
+ */
+public class TimeWindows extends Windows<TimeWindow> {
+
+ /**
+ * The size of the window, i.e. how long a window lasts.
+ * The window size's effective time unit is determined by the semantics of the topology's
+ * configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
+ */
+ public final long size;
+
+ /**
+ * The size of the window's advance interval, i.e. by how much a window moves forward relative
+ * to the previous one. The interval's effective time unit is determined by the semantics of
+ * the topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
+ */
+ public final long advance;
+
+ private TimeWindows(String name, long size, long advance) {
+ super(name);
+ if (size <= 0) {
+ throw new IllegalArgumentException("window size must be > 0 (you provided " + size + ")");
+ }
+ this.size = size;
+ if (!(0 < advance && advance <= size)) {
+ throw new IllegalArgumentException(
+ String.format("advance interval (%d) must lie within interval (0, %d]", advance, size));
+ }
+ this.advance = advance;
+ }
+
+ /**
+ * Returns a window definition with the given window size, and with the advance interval being
+ * equal to the window size. Think: [N * size, N * size + size), with N denoting the N-th
+ * window.
+ *
+ * This provides the semantics of tumbling windows, which are fixed-sized, gap-less,
+ * non-overlapping windows. Tumbling windows are a specialization of hopping windows.
+ *
+ * @param name The name of the window. Must not be null or empty.
+ * @param size The size of the window, with the requirement that size > 0.
+ * The window size's effective time unit is determined by the semantics of the
+ * topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
+ * @return a new window definition
+ */
+ public static TimeWindows of(String name, long size) {
+ return new TimeWindows(name, size, size);
+ }
+
+ /**
+ * Returns a window definition with the original size, but advance ("hop") the window by the given
+ * interval, which specifies by how much a window moves forward relative to the previous one.
+ * Think: [N * advanceInterval, N * advanceInterval + size), with N denoting the N-th window.
+ *
+ * This provides the semantics of hopping windows, which are fixed-sized, overlapping windows.
+ *
+ * @param interval The advance interval ("hop") of the window, with the requirement that
+ * 0 < interval ≤ size. The interval's effective time unit is
+ * determined by the semantics of the topology's configured
+ * {@link org.apache.kafka.streams.processor.TimestampExtractor}.
+ * @return a new window definition
+ */
+ public TimeWindows advanceBy(long interval) {
+ return new TimeWindows(this.name, this.size, interval);
+ }
+
+ @Override
+ public Map<Long, TimeWindow> windowsFor(long timestamp) {
+ long enclosed = (size - 1) / advance;
+ long windowStart = Math.max(0, timestamp - timestamp % advance - enclosed * advance);
+
+ Map<Long, TimeWindow> windows = new HashMap<>();
+ while (windowStart <= timestamp) {
+ TimeWindow window = new TimeWindow(windowStart, windowStart + this.size);
+ windows.put(windowStart, window);
+ windowStart += this.advance;
+ }
+ return windows;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof TimeWindows)) {
+ return false;
+ }
+ TimeWindows other = (TimeWindows) o;
+ return this.size == other.size && this.advance == other.advance;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (size ^ (size >>> 32));
+ result = 31 * result + (int) (advance ^ (advance >>> 32));
+ return result;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
deleted file mode 100644
index cadedba..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.kstream.internals.TumblingWindow;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The tumbling window specifications used for aggregations.
- */
-public class TumblingWindows extends Windows<TumblingWindow> {
-
- private static final long DEFAULT_SIZE_MS = 1000L;
-
- public final long size;
-
- private TumblingWindows(String name, long size) {
- super(name);
-
- this.size = size;
- }
-
- /**
- * Returns a half-interval sliding window definition with the default window size
- */
- public static TumblingWindows of(String name) {
- return new TumblingWindows(name, DEFAULT_SIZE_MS);
- }
-
- /**
- * Returns a half-interval sliding window definition with the window size in milliseconds
- */
- public TumblingWindows with(long size) {
- return new TumblingWindows(this.name, size);
- }
-
- @Override
- public Map<Long, TumblingWindow> windowsFor(long timestamp) {
- long windowStart = timestamp - timestamp % size;
-
- // we cannot use Collections.singleMap since it does not support remove() call
- Map<Long, TumblingWindow> windows = new HashMap<>();
- windows.put(windowStart, new TumblingWindow(windowStart, windowStart + size));
-
- return windows;
- }
-
- @Override
- public boolean equalTo(Windows other) {
- if (!other.getClass().equals(TumblingWindows.class))
- return false;
-
- TumblingWindows otherWindows = (TumblingWindows) other;
-
- return this.size == otherWindows.size;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 7cadfb4..bea3b57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -34,6 +34,9 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
private UnlimitedWindows(String name, long start) {
super(name);
+ if (start < 0) {
+ throw new IllegalArgumentException("start must be > 0 (you provided " + start + ")");
+ }
this.start = start;
}
@@ -52,21 +55,31 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
public Map<Long, UnlimitedWindow> windowsFor(long timestamp) {
// always return the single unlimited window
- // we cannot use Collections.singleMap since it does not support remove() call
+ // we cannot use Collections.singleMap since it does not support remove()
Map<Long, UnlimitedWindow> windows = new HashMap<>();
- windows.put(start, new UnlimitedWindow(start));
-
-
+ if (timestamp >= start) {
+ windows.put(start, new UnlimitedWindow(start));
+ }
return windows;
}
@Override
- public boolean equalTo(Windows other) {
- if (!other.getClass().equals(UnlimitedWindows.class))
+ public final boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+
+ if (!(o instanceof UnlimitedWindows)) {
return false;
+ }
- UnlimitedWindows otherWindows = (UnlimitedWindows) other;
+ UnlimitedWindows other = (UnlimitedWindows) o;
+ return this.start == other.start;
+ }
- return this.start == otherWindows.start;
+ @Override
+ public int hashCode() {
+ return (int) (start ^ (start >>> 32));
}
-}
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index f2965dc..784d5c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -48,21 +48,18 @@ public abstract class Window {
return this.start() < other.end() || other.start() < this.end();
}
- public boolean equalsTo(Window other) {
- return this.start() == other.start() && this.end() == other.end();
- }
-
@Override
public boolean equals(Object obj) {
- if (obj == this)
+ if (obj == this) {
return true;
+ }
- if (!(obj instanceof Window))
+ if (getClass() != obj.getClass()) {
return false;
+ }
Window other = (Window) obj;
-
- return this.equalsTo(other) && this.start == other.start && this.end == other.end;
+ return this.start == other.start && this.end == other.end;
}
@Override
@@ -70,4 +67,5 @@ public abstract class Window {
long n = (this.start << 32) | this.end;
return (int) (n % 0xFFFFFFFFL);
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index e7dc23e..1406de6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -45,6 +45,9 @@ public abstract class Windows<W extends Window> {
public int segments;
protected Windows(String name) {
+ if (name == null || name.isEmpty()) {
+ throw new IllegalArgumentException("name must not be null or empty");
+ }
this.name = name;
this.segments = DEFAULT_NUM_SEGMENTS;
this.emitDurationMs = DEFAULT_EMIT_DURATION;
@@ -95,7 +98,6 @@ public abstract class Windows<W extends Window> {
return prefix + String.format("%010d", NAME_INDEX.getAndIncrement());
}
- public abstract boolean equalTo(Windows other);
-
public abstract Map<Long, W> windowsFor(long timestamp);
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java
deleted file mode 100644
index 8b0b2fb..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Window;
-
-public class HoppingWindow extends Window {
-
- public HoppingWindow(long start, long end) {
- super(start, end);
- }
-
- @Override
- public boolean overlap(Window other) {
- return super.overlap(other) && other.getClass().equals(HoppingWindow.class);
- }
-
- @Override
- public boolean equalsTo(Window other) {
- return super.equalsTo(other) && other.getClass().equals(HoppingWindow.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
new file mode 100644
index 0000000..5dfb9eb
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Window;
+
+public class TimeWindow extends Window {
+
+ public TimeWindow(long start, long end) {
+ super(start, end);
+ }
+
+ @Override
+ public boolean overlap(Window other) {
+ return getClass() == other.getClass() && super.overlap(other);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
deleted file mode 100644
index a02d4b9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-
-import org.apache.kafka.streams.kstream.Window;
-
-public class TumblingWindow extends Window {
-
- public TumblingWindow(long start, long end) {
- super(start, end);
- }
-
- @Override
- public boolean overlap(Window other) {
- return super.overlap(other) && other.getClass().equals(TumblingWindow.class);
- }
-
- @Override
- public boolean equalsTo(Window other) {
- return super.equalsTo(other) && other.getClass().equals(TumblingWindow.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
index 8ac8f70..4b93f9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
@@ -27,11 +27,7 @@ public class UnlimitedWindow extends Window {
@Override
public boolean overlap(Window other) {
- return super.overlap(other) && other.getClass().equals(UnlimitedWindow.class);
+ return getClass() == other.getClass() && super.overlap(other);
}
- @Override
- public boolean equalsTo(Window other) {
- return super.equalsTo(other) && other.getClass().equals(UnlimitedWindow.class);
- }
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
new file mode 100644
index 0000000..e9ff235
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TimeWindowsTest {
+
+ private static String anyName = "window";
+ private static long anySize = 123L;
+
+ @Test
+ public void shouldHaveSaneEqualsAndHashCode() {
+ TimeWindows w1 = TimeWindows.of("w1", anySize);
+ TimeWindows w2 = TimeWindows.of("w2", w1.size);
+
+ // Reflexive
+ assertTrue(w1.equals(w1));
+ assertTrue(w1.hashCode() == w1.hashCode());
+
+ // Symmetric
+ assertTrue(w1.equals(w2));
+ assertTrue(w1.hashCode() == w2.hashCode());
+ assertTrue(w2.hashCode() == w1.hashCode());
+
+ // Transitive
+ TimeWindows w3 = TimeWindows.of("w3", w2.size);
+ assertTrue(w2.equals(w3));
+ assertTrue(w2.hashCode() == w3.hashCode());
+ assertTrue(w1.equals(w3));
+ assertTrue(w1.hashCode() == w3.hashCode());
+
+ // Inequality scenarios
+ assertFalse("must be false for null", w1.equals(null));
+ assertFalse("must be false for different window types", w1.equals(UnlimitedWindows.of("irrelevant")));
+ assertFalse("must be false for different types", w1.equals(new Object()));
+
+ TimeWindows differentWindowSize = TimeWindows.of("differentWindowSize", w1.size + 1);
+ assertFalse("must be false when window sizes are different", w1.equals(differentWindowSize));
+
+ TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advance - 1);
+ assertFalse("must be false when advance intervals are different", w1.equals(differentAdvanceInterval));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void nameMustNotBeEmpty() {
+ TimeWindows.of("", anySize);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void nameMustNotBeNull() {
+ TimeWindows.of(null, anySize);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void windowSizeMustNotBeNegative() {
+ TimeWindows.of(anyName, -1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void windowSizeMustNotBeZero() {
+ TimeWindows.of(anyName, 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void advanceIntervalMustNotBeNegative() {
+ TimeWindows.of(anyName, anySize).advanceBy(-1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void advanceIntervalMustNotBeZero() {
+ TimeWindows.of(anyName, anySize).advanceBy(0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void advanceIntervalMustNotBeLargerThanWindowSize() {
+ long size = anySize;
+ TimeWindows.of(anyName, size).advanceBy(size + 1);
+ }
+
+ @Test
+ public void windowsForHoppingWindows() {
+ TimeWindows windows = TimeWindows.of(anyName, 12L).advanceBy(5L);
+ Map<Long, TimeWindow> matched = windows.windowsFor(21L);
+ assertEquals(12L / 5L + 1, matched.size());
+ assertEquals(new TimeWindow(10L, 22L), matched.get(10L));
+ assertEquals(new TimeWindow(15L, 27L), matched.get(15L));
+ assertEquals(new TimeWindow(20L, 32L), matched.get(20L));
+ }
+
+ @Test
+ public void windowsForTumblingWindows() {
+ TimeWindows windows = TimeWindows.of(anyName, 12L);
+ Map<Long, TimeWindow> matched = windows.windowsFor(21L);
+ assertEquals(1, matched.size());
+ assertEquals(new TimeWindow(12L, 24L), matched.get(12L));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
new file mode 100644
index 0000000..da5f159
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class UnlimitedWindowsTest {
+
+ private static String anyName = "window";
+ private static long anyStartTime = 10L;
+
+ @Test(expected = IllegalArgumentException.class)
+ public void nameMustNotBeEmpty() {
+ UnlimitedWindows.of("");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void nameMustNotBeNull() {
+ UnlimitedWindows.of(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void startTimeMustNotBeNegative() {
+ UnlimitedWindows.of(anyName).startOn(-1);
+ }
+
+ @Test
+ public void startTimeCanBeZero() {
+ UnlimitedWindows.of(anyName).startOn(0);
+ }
+
+ @Test
+ public void shouldIncludeRecordsThatHappenedOnWindowStart() {
+ UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime);
+ Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(w.start);
+ assertEquals(1, matchedWindows.size());
+ assertEquals(new UnlimitedWindow(anyStartTime), matchedWindows.get(anyStartTime));
+ }
+
+ @Test
+ public void shouldIncludeRecordsThatHappenedAfterWindowStart() {
+ UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime);
+ long timestamp = w.start + 1;
+ Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp);
+ assertEquals(1, matchedWindows.size());
+ assertEquals(new UnlimitedWindow(anyStartTime), matchedWindows.get(anyStartTime));
+ }
+
+ @Test
+ public void shouldExcludeRecordsThatHappenedBeforeWindowStart() {
+ UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime);
+ long timestamp = w.start - 1;
+ Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp);
+ assertTrue(matchedWindows.isEmpty());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 828103a..f4fe3a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -20,10 +20,10 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.KStreamTestDriver;
@@ -37,6 +37,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
@@ -62,215 +63,229 @@ public class KStreamWindowAggregateTest {
@Test
public void testAggBasic() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
-
- KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
- KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
- HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
- strSerde,
- strSerde);
-
- MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
-
- driver = new KStreamTestDriver(builder, stateDir);
-
- driver.setTime(0L);
- driver.process(topic1, "A", "1");
- driver.setTime(1L);
- driver.process(topic1, "B", "2");
- driver.setTime(2L);
- driver.process(topic1, "C", "3");
- driver.setTime(3L);
- driver.process(topic1, "D", "4");
- driver.setTime(4L);
- driver.process(topic1, "A", "1");
-
- driver.setTime(5L);
- driver.process(topic1, "A", "1");
- driver.setTime(6L);
- driver.process(topic1, "B", "2");
- driver.setTime(7L);
- driver.process(topic1, "D", "4");
- driver.setTime(8L);
- driver.process(topic1, "B", "2");
- driver.setTime(9L);
- driver.process(topic1, "C", "3");
-
- driver.setTime(10L);
- driver.process(topic1, "A", "1");
- driver.setTime(11L);
- driver.process(topic1, "B", "2");
- driver.setTime(12L);
- driver.process(topic1, "D", "4");
- driver.setTime(13L);
- driver.process(topic1, "B", "2");
- driver.setTime(14L);
- driver.process(topic1, "C", "3");
-
- assertEquals(Utils.mkList(
- "[A@0]:0+1",
- "[B@0]:0+2",
- "[C@0]:0+3",
- "[D@0]:0+4",
- "[A@0]:0+1+1",
-
- "[A@0]:0+1+1+1", "[A@5]:0+1",
- "[B@0]:0+2+2", "[B@5]:0+2",
- "[D@0]:0+4+4", "[D@5]:0+4",
- "[B@0]:0+2+2+2", "[B@5]:0+2+2",
- "[C@0]:0+3+3", "[C@5]:0+3",
-
- "[A@5]:0+1+1", "[A@10]:0+1",
- "[B@5]:0+2+2+2", "[B@10]:0+2",
- "[D@5]:0+4+4", "[D@10]:0+4",
- "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
- "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
+ final File baseDir = Files.createTempDirectory("test").toFile();
+
+ try {
+ final KStreamBuilder builder = new KStreamBuilder();
+ String topic1 = "topic1";
+
+ KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+ KTable<Windowed<String>, String> table2 =
+ stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
+ TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
+ strSerde,
+ strSerde);
+
+ MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+ table2.toStream().process(proc2);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+
+ driver.setTime(0L);
+ driver.process(topic1, "A", "1");
+ driver.setTime(1L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(2L);
+ driver.process(topic1, "C", "3");
+ driver.setTime(3L);
+ driver.process(topic1, "D", "4");
+ driver.setTime(4L);
+ driver.process(topic1, "A", "1");
+
+ driver.setTime(5L);
+ driver.process(topic1, "A", "1");
+ driver.setTime(6L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(7L);
+ driver.process(topic1, "D", "4");
+ driver.setTime(8L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(9L);
+ driver.process(topic1, "C", "3");
+
+ driver.setTime(10L);
+ driver.process(topic1, "A", "1");
+ driver.setTime(11L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(12L);
+ driver.process(topic1, "D", "4");
+ driver.setTime(13L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(14L);
+ driver.process(topic1, "C", "3");
+
+ assertEquals(Utils.mkList(
+ "[A@0]:0+1",
+ "[B@0]:0+2",
+ "[C@0]:0+3",
+ "[D@0]:0+4",
+ "[A@0]:0+1+1",
+
+ "[A@0]:0+1+1+1", "[A@5]:0+1",
+ "[B@0]:0+2+2", "[B@5]:0+2",
+ "[D@0]:0+4+4", "[D@5]:0+4",
+ "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+ "[C@0]:0+3+3", "[C@5]:0+3",
+
+ "[A@5]:0+1+1", "[A@10]:0+1",
+ "[B@5]:0+2+2+2", "[B@10]:0+2",
+ "[D@5]:0+4+4", "[D@10]:0+4",
+ "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
+ "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
+
+ } finally {
+ Utils.delete(baseDir);
+ }
}
@Test
public void testJoin() throws Exception {
- final KStreamBuilder builder = new KStreamBuilder();
-
- String topic1 = "topic1";
- String topic2 = "topic2";
-
- KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
- KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
- HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
- strSerde,
- strSerde);
-
- MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
- table1.toStream().process(proc1);
-
- KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
- KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
- HoppingWindows.of("topic2-Canonized").with(10L).every(5L),
- strSerde,
- strSerde);
-
- MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
- table2.toStream().process(proc2);
-
-
- MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
- table1.join(table2, new ValueJoiner<String, String, String>() {
- @Override
- public String apply(String p1, String p2) {
- return p1 + "%" + p2;
- }
- }).toStream().process(proc3);
-
- driver = new KStreamTestDriver(builder, stateDir);
-
- driver.setTime(0L);
- driver.process(topic1, "A", "1");
- driver.setTime(1L);
- driver.process(topic1, "B", "2");
- driver.setTime(2L);
- driver.process(topic1, "C", "3");
- driver.setTime(3L);
- driver.process(topic1, "D", "4");
- driver.setTime(4L);
- driver.process(topic1, "A", "1");
-
- proc1.checkAndClearProcessResult(
- "[A@0]:0+1",
- "[B@0]:0+2",
- "[C@0]:0+3",
- "[D@0]:0+4",
- "[A@0]:0+1+1"
- );
- proc2.checkAndClearProcessResult();
- proc3.checkAndClearProcessResult(
- "[A@0]:null",
- "[B@0]:null",
- "[C@0]:null",
- "[D@0]:null",
- "[A@0]:null"
- );
-
- driver.setTime(5L);
- driver.process(topic1, "A", "1");
- driver.setTime(6L);
- driver.process(topic1, "B", "2");
- driver.setTime(7L);
- driver.process(topic1, "D", "4");
- driver.setTime(8L);
- driver.process(topic1, "B", "2");
- driver.setTime(9L);
- driver.process(topic1, "C", "3");
-
- proc1.checkAndClearProcessResult(
- "[A@0]:0+1+1+1", "[A@5]:0+1",
- "[B@0]:0+2+2", "[B@5]:0+2",
- "[D@0]:0+4+4", "[D@5]:0+4",
- "[B@0]:0+2+2+2", "[B@5]:0+2+2",
- "[C@0]:0+3+3", "[C@5]:0+3"
- );
- proc2.checkAndClearProcessResult();
- proc3.checkAndClearProcessResult(
- "[A@0]:null", "[A@5]:null",
- "[B@0]:null", "[B@5]:null",
- "[D@0]:null", "[D@5]:null",
- "[B@0]:null", "[B@5]:null",
- "[C@0]:null", "[C@5]:null"
- );
-
- driver.setTime(0L);
- driver.process(topic2, "A", "a");
- driver.setTime(1L);
- driver.process(topic2, "B", "b");
- driver.setTime(2L);
- driver.process(topic2, "C", "c");
- driver.setTime(3L);
- driver.process(topic2, "D", "d");
- driver.setTime(4L);
- driver.process(topic2, "A", "a");
-
- proc1.checkAndClearProcessResult();
- proc2.checkAndClearProcessResult(
- "[A@0]:0+a",
- "[B@0]:0+b",
- "[C@0]:0+c",
- "[D@0]:0+d",
- "[A@0]:0+a+a"
- );
- proc3.checkAndClearProcessResult(
- "[A@0]:0+1+1+1%0+a",
- "[B@0]:0+2+2+2%0+b",
- "[C@0]:0+3+3%0+c",
- "[D@0]:0+4+4%0+d",
- "[A@0]:0+1+1+1%0+a+a");
-
- driver.setTime(5L);
- driver.process(topic2, "A", "a");
- driver.setTime(6L);
- driver.process(topic2, "B", "b");
- driver.setTime(7L);
- driver.process(topic2, "D", "d");
- driver.setTime(8L);
- driver.process(topic2, "B", "b");
- driver.setTime(9L);
- driver.process(topic2, "C", "c");
-
- proc1.checkAndClearProcessResult();
- proc2.checkAndClearProcessResult(
- "[A@0]:0+a+a+a", "[A@5]:0+a",
- "[B@0]:0+b+b", "[B@5]:0+b",
- "[D@0]:0+d+d", "[D@5]:0+d",
- "[B@0]:0+b+b+b", "[B@5]:0+b+b",
- "[C@0]:0+c+c", "[C@5]:0+c"
- );
- proc3.checkAndClearProcessResult(
- "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
- "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
- "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
- "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
- "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
- );
+ final File baseDir = Files.createTempDirectory("test").toFile();
+
+ try {
+ final KStreamBuilder builder = new KStreamBuilder();
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+
+ KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+ KTable<Windowed<String>, String> table1 =
+ stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
+ TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
+ strSerde,
+ strSerde);
+
+ MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
+ table1.toStream().process(proc1);
+
+ KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
+ KTable<Windowed<String>, String> table2 =
+ stream2.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
+ TimeWindows.of("topic2-Canonized", 10).advanceBy(5),
+ strSerde,
+ strSerde);
+
+ MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+ table2.toStream().process(proc2);
+
+
+ MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
+ table1.join(table2, new ValueJoiner<String, String, String>() {
+ @Override
+ public String apply(String p1, String p2) {
+ return p1 + "%" + p2;
+ }
+ }).toStream().process(proc3);
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+
+ driver.setTime(0L);
+ driver.process(topic1, "A", "1");
+ driver.setTime(1L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(2L);
+ driver.process(topic1, "C", "3");
+ driver.setTime(3L);
+ driver.process(topic1, "D", "4");
+ driver.setTime(4L);
+ driver.process(topic1, "A", "1");
+
+ proc1.checkAndClearProcessResult(
+ "[A@0]:0+1",
+ "[B@0]:0+2",
+ "[C@0]:0+3",
+ "[D@0]:0+4",
+ "[A@0]:0+1+1"
+ );
+ proc2.checkAndClearProcessResult();
+ proc3.checkAndClearProcessResult(
+ "[A@0]:null",
+ "[B@0]:null",
+ "[C@0]:null",
+ "[D@0]:null",
+ "[A@0]:null"
+ );
+
+ driver.setTime(5L);
+ driver.process(topic1, "A", "1");
+ driver.setTime(6L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(7L);
+ driver.process(topic1, "D", "4");
+ driver.setTime(8L);
+ driver.process(topic1, "B", "2");
+ driver.setTime(9L);
+ driver.process(topic1, "C", "3");
+
+ proc1.checkAndClearProcessResult(
+ "[A@0]:0+1+1+1", "[A@5]:0+1",
+ "[B@0]:0+2+2", "[B@5]:0+2",
+ "[D@0]:0+4+4", "[D@5]:0+4",
+ "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+ "[C@0]:0+3+3", "[C@5]:0+3"
+ );
+ proc2.checkAndClearProcessResult();
+ proc3.checkAndClearProcessResult(
+ "[A@0]:null", "[A@5]:null",
+ "[B@0]:null", "[B@5]:null",
+ "[D@0]:null", "[D@5]:null",
+ "[B@0]:null", "[B@5]:null",
+ "[C@0]:null", "[C@5]:null"
+ );
+
+ driver.setTime(0L);
+ driver.process(topic2, "A", "a");
+ driver.setTime(1L);
+ driver.process(topic2, "B", "b");
+ driver.setTime(2L);
+ driver.process(topic2, "C", "c");
+ driver.setTime(3L);
+ driver.process(topic2, "D", "d");
+ driver.setTime(4L);
+ driver.process(topic2, "A", "a");
+
+ proc1.checkAndClearProcessResult();
+ proc2.checkAndClearProcessResult(
+ "[A@0]:0+a",
+ "[B@0]:0+b",
+ "[C@0]:0+c",
+ "[D@0]:0+d",
+ "[A@0]:0+a+a"
+ );
+ proc3.checkAndClearProcessResult(
+ "[A@0]:0+1+1+1%0+a",
+ "[B@0]:0+2+2+2%0+b",
+ "[C@0]:0+3+3%0+c",
+ "[D@0]:0+4+4%0+d",
+ "[A@0]:0+1+1+1%0+a+a");
+
+ driver.setTime(5L);
+ driver.process(topic2, "A", "a");
+ driver.setTime(6L);
+ driver.process(topic2, "B", "b");
+ driver.setTime(7L);
+ driver.process(topic2, "D", "d");
+ driver.setTime(8L);
+ driver.process(topic2, "B", "b");
+ driver.setTime(9L);
+ driver.process(topic2, "C", "c");
+
+ proc1.checkAndClearProcessResult();
+ proc2.checkAndClearProcessResult(
+ "[A@0]:0+a+a+a", "[A@5]:0+a",
+ "[B@0]:0+b+b", "[B@5]:0+b",
+ "[D@0]:0+d+d", "[D@5]:0+d",
+ "[B@0]:0+b+b+b", "[B@5]:0+b+b",
+ "[C@0]:0+c+c", "[C@5]:0+c"
+ );
+ proc3.checkAndClearProcessResult(
+ "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
+ "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
+ "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
+ "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
+ "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
+ );
+ } finally {
+ Utils.delete(baseDir);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 7c6d5ec..b31b20d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -49,7 +49,7 @@ public class WindowedStreamPartitionerTest {
new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
);
- private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+ private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet());
@Test
public void testCopartitioning() {
@@ -71,7 +71,7 @@ public class WindowedStreamPartitionerTest {
Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
for (int w = 0; w < 10; w++) {
- HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
+ TimeWindow window = new TimeWindow(10 * w, 20 * w);
Windowed<Integer> windowedKey = new Windowed<>(key, window);
Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java
deleted file mode 100644
index f9b6ba5..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.TumblingWindows;
-import org.apache.kafka.streams.kstream.UnlimitedWindows;
-import org.junit.Test;
-
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class WindowsTest {
-
- @Test
- public void hoppingWindows() {
-
- HoppingWindows windows = HoppingWindows.of("test").with(12L).every(5L);
-
- Map<Long, HoppingWindow> matched = windows.windowsFor(21L);
-
- assertEquals(3, matched.size());
-
- assertEquals(new HoppingWindow(10L, 22L), matched.get(10L));
- assertEquals(new HoppingWindow(15L, 27L), matched.get(15L));
- assertEquals(new HoppingWindow(20L, 32L), matched.get(20L));
- }
-
- @Test
- public void tumblineWindows() {
-
- TumblingWindows windows = TumblingWindows.of("test").with(12L);
-
- Map<Long, TumblingWindow> matched = windows.windowsFor(21L);
-
- assertEquals(1, matched.size());
-
- assertEquals(new TumblingWindow(12L, 24L), matched.get(12L));
- }
-
- @Test
- public void unlimitedWindows() {
-
- UnlimitedWindows windows = UnlimitedWindows.of("test").startOn(10L);
-
- Map<Long, UnlimitedWindow> matched = windows.windowsFor(21L);
-
- assertEquals(1, matched.size());
-
- assertEquals(new UnlimitedWindow(10L), matched.get(10L));
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 95e0fbf..733c1ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.TumblingWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.Windowed;
@@ -207,7 +207,7 @@ public class SmokeTestClient extends SmokeTestUtil {
// windowed count
data.countByKey(
- TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE),
+ TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE),
stringSerde
).toStream().map(
new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {