You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/30 00:05:53 UTC

[39/50] [abbrv] kafka git commit: KAFKA-3613: Consolidate TumblingWindows and HoppingWindows into TimeWindows

KAFKA-3613: Consolidate TumblingWindows and HoppingWindows into TimeWindows

This PR includes the same code as https://github.com/apache/kafka/pull/1261 but is rebased on latest trunk.

Author: Michael G. Noll <mi...@confluent.io>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #1277 from miguno/KAFKA-3613-v2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68433dcf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68433dcf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68433dcf

Branch: refs/heads/0.10.0
Commit: 68433dcfdc0ae078ee4e7d278c286a9b7c1b3e76
Parents: 0ada3b1
Author: Michael G. Noll <mi...@confluent.io>
Authored: Fri Apr 29 07:44:03 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Apr 29 07:44:03 2016 -0700

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedDemo.java    |   4 +-
 .../examples/pageview/PageViewUntypedDemo.java  |   4 +-
 .../examples/wordcount/WordCountDemo.java       |   2 +-
 .../kafka/streams/kstream/HoppingWindows.java   |  95 ----
 .../kafka/streams/kstream/JoinWindows.java      |  27 +-
 .../kafka/streams/kstream/TimeWindows.java      | 125 ++++++
 .../kafka/streams/kstream/TumblingWindows.java  |  74 ----
 .../kafka/streams/kstream/UnlimitedWindows.java |  31 +-
 .../apache/kafka/streams/kstream/Window.java    |  14 +-
 .../apache/kafka/streams/kstream/Windows.java   |   6 +-
 .../kstream/internals/HoppingWindow.java        |  37 --
 .../streams/kstream/internals/TimeWindow.java   |  33 ++
 .../kstream/internals/TumblingWindow.java       |  38 --
 .../kstream/internals/UnlimitedWindow.java      |   8 +-
 .../kafka/streams/kstream/TimeWindowsTest.java  | 123 ++++++
 .../streams/kstream/UnlimitedWindowsTest.java   |  80 ++++
 .../internals/KStreamWindowAggregateTest.java   | 429 ++++++++++---------
 .../WindowedStreamPartitionerTest.java          |   4 +-
 .../streams/kstream/internals/WindowsTest.java  |  70 ---
 .../streams/smoketest/SmokeTestClient.java      |   4 +-
 20 files changed, 644 insertions(+), 564 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 4124b32..39ec41f 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -24,11 +24,11 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.HoppingWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.StreamsConfig;
@@ -160,7 +160,7 @@ public class PageViewTypedDemo {
                         return new KeyValue<>(viewRegion.region, viewRegion);
                     }
                 })
-                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String())
+                .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000), Serdes.String())
                 // TODO: we can merge ths toStream().map(...) with a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index e61842f..9a41b9e 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -30,11 +30,11 @@ import org.apache.kafka.connect.json.JsonDeserializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.HoppingWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -99,7 +99,7 @@ public class PageViewUntypedDemo {
                         return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion);
                     }
                 })
-                .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String())
+                .countByKey(TimeWindows.of("GeoPageViewsWindow", 7 * 24 * 60 * 60 * 1000L).advanceBy(1000), Serdes.String())
                 // TODO: we can merge ths toStream().map(...) with a single toStream(...)
                 .toStream()
                 .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index 5b52803..12395f9 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -69,7 +69,7 @@ public class WordCountDemo {
                 }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
                     @Override
                     public KeyValue<String, String> apply(String key, String value) {
-                        return new KeyValue<String, String>(value, value);
+                        return new KeyValue<>(value, value);
                     }
                 })
                 .countByKey("Counts");

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
deleted file mode 100644
index aa866e4..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/HoppingWindows.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.kstream.internals.HoppingWindow;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The hopping window specifications used for aggregations.
- */
-public class HoppingWindows extends Windows<HoppingWindow> {
-
-    private static final long DEFAULT_SIZE_MS = 1000L;
-
-    public final long size;
-
-    public final long period;
-
-    private HoppingWindows(String name, long size, long period) {
-        super(name);
-
-        this.size = size;
-        this.period = period;
-    }
-
-    /**
-     * Returns a half-interval hopping window definition with the window size in milliseconds
-     * of the form &#91; N &#42; default_size, N &#42; default_size + default_size &#41;
-     */
-    public static HoppingWindows of(String name) {
-        return new HoppingWindows(name, DEFAULT_SIZE_MS, DEFAULT_SIZE_MS);
-    }
-
-    /**
-     * Returns a new hopping window definition with the original size but reassign the window
-     * period in milliseconds of the form &#91; N &#42; period, N &#42; period + size &#41;
-     */
-    public HoppingWindows with(long size) {
-        return new HoppingWindows(this.name, size, this.period);
-    }
-
-    /**
-     * Returns a new hopping window definition with the original size but reassign the window
-     * period in milliseconds of the form &#91; N &#42; period, N &#42; period + size &#41;
-     */
-    public HoppingWindows every(long period) {
-        return new HoppingWindows(this.name, this.size, period);
-    }
-
-    @Override
-    public Map<Long, HoppingWindow> windowsFor(long timestamp) {
-        long enclosed = (size - 1) / period;
-
-        long windowStart = Math.max(0, timestamp - timestamp % period - enclosed * period);
-
-        Map<Long, HoppingWindow> windows = new HashMap<>();
-        while (windowStart <= timestamp) {
-            // add the window
-            HoppingWindow window = new HoppingWindow(windowStart, windowStart + this.size);
-            windows.put(windowStart, window);
-
-            // advance the step period
-            windowStart += this.period;
-        }
-
-        return windows;
-    }
-
-    @Override
-    public boolean equalTo(Windows other) {
-        if (!other.getClass().equals(HoppingWindows.class))
-            return false;
-
-        HoppingWindows otherWindows = (HoppingWindows) other;
-
-        return this.size == otherWindows.size && this.period == otherWindows.period;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 24dbdd3..a74984a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -17,15 +17,14 @@
 
 package org.apache.kafka.streams.kstream;
 
-
-import org.apache.kafka.streams.kstream.internals.TumblingWindow;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 
 import java.util.Map;
 
 /**
  * The window specifications used for joins.
  */
-public class JoinWindows extends Windows<TumblingWindow> {
+public class JoinWindows extends Windows<TimeWindow> {
 
     public final long before;
     public final long after;
@@ -74,19 +73,29 @@ public class JoinWindows extends Windows<TumblingWindow> {
     }
 
     @Override
-    public Map<Long, TumblingWindow> windowsFor(long timestamp) {
+    public Map<Long, TimeWindow> windowsFor(long timestamp) {
         // this function should never be called
         throw new UnsupportedOperationException("windowsFor() is not supported in JoinWindows");
     }
 
     @Override
-    public boolean equalTo(Windows other) {
-        if (!other.getClass().equals(JoinWindows.class))
+    public final boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof JoinWindows)) {
             return false;
+        }
 
-        JoinWindows otherWindows = (JoinWindows) other;
+        JoinWindows other = (JoinWindows) o;
+        return this.before == other.before && this.after == other.after;
+    }
 
-        return this.before == otherWindows.before && this.after == otherWindows.after;
+    @Override
+    public int hashCode() {
+        int result = (int) (before ^ (before >>> 32));
+        result = 31 * result + (int) (after ^ (after >>> 32));
+        return result;
     }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
new file mode 100644
index 0000000..fa3a9d8
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The time-based window specifications used for aggregations.
+ */
+public class TimeWindows extends Windows<TimeWindow> {
+
+    /**
+     * The size of the window, i.e. how long a window lasts.
+     * The window size's effective time unit is determined by the semantics of the topology's
+     * configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
+     */
+    public final long size;
+
+    /**
+     * The size of the window's advance interval, i.e. by how much a window moves forward relative
+     * to the previous one.  The interval's effective time unit is determined by the semantics of
+     * the topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
+     */
+    public final long advance;
+
+    private TimeWindows(String name, long size, long advance) {
+        super(name);
+        if (size <= 0) {
+            throw new IllegalArgumentException("window size must be > 0 (you provided " + size + ")");
+        }
+        this.size = size;
+        if (!(0 < advance && advance <= size)) {
+            throw new IllegalArgumentException(
+                String.format("advance interval (%d) must lie within interval (0, %d]", advance, size));
+        }
+        this.advance = advance;
+    }
+
+    /**
+     * Returns a window definition with the given window size, and with the advance interval being
+     * equal to the window size.  Think: [N * size, N * size + size), with N denoting the N-th
+     * window.
+     *
+     * This provides the semantics of tumbling windows, which are fixed-sized, gap-less,
+     * non-overlapping windows.  Tumbling windows are a specialization of hopping windows.
+     *
+     * @param name The name of the window.  Must not be null or empty.
+     * @param size The size of the window, with the requirement that size &gt; 0.
+     *             The window size's effective time unit is determined by the semantics of the
+     *             topology's configured {@link org.apache.kafka.streams.processor.TimestampExtractor}.
+     * @return a new window definition
+     */
+    public static TimeWindows of(String name, long size) {
+        return new TimeWindows(name, size, size);
+    }
+
+    /**
+     * Returns a window definition with the original size, but advance ("hop") the window by the given
+     * interval, which specifies by how much a window moves forward relative to the previous one.
+     * Think: [N * advanceInterval, N * advanceInterval + size), with N denoting the N-th window.
+     *
+     * This provides the semantics of hopping windows, which are fixed-sized, overlapping windows.
+     *
+     * @param interval The advance interval ("hop") of the window, with the requirement that
+     *                 0 &lt; interval &le; size.  The interval's effective time unit is
+     *                 determined by the semantics of the topology's configured
+     *                 {@link org.apache.kafka.streams.processor.TimestampExtractor}.
+     * @return a new window definition
+     */
+    public TimeWindows advanceBy(long interval) {
+        return new TimeWindows(this.name, this.size, interval);
+    }
+
+    @Override
+    public Map<Long, TimeWindow> windowsFor(long timestamp) {
+        long enclosed = (size - 1) / advance;
+        long windowStart = Math.max(0, timestamp - timestamp % advance - enclosed * advance);
+
+        Map<Long, TimeWindow> windows = new HashMap<>();
+        while (windowStart <= timestamp) {
+            TimeWindow window = new TimeWindow(windowStart, windowStart + this.size);
+            windows.put(windowStart, window);
+            windowStart += this.advance;
+        }
+        return windows;
+    }
+
+    @Override
+    public final boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof TimeWindows)) {
+            return false;
+        }
+        TimeWindows other = (TimeWindows) o;
+        return this.size == other.size && this.advance == other.advance;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = (int) (size ^ (size >>> 32));
+        result = 31 * result + (int) (advance ^ (advance >>> 32));
+        return result;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
deleted file mode 100644
index cadedba..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.kstream.internals.TumblingWindow;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The tumbling window specifications used for aggregations.
- */
-public class TumblingWindows extends Windows<TumblingWindow> {
-
-    private static final long DEFAULT_SIZE_MS = 1000L;
-
-    public final long size;
-
-    private TumblingWindows(String name, long size) {
-        super(name);
-
-        this.size = size;
-    }
-
-    /**
-     * Returns a half-interval sliding window definition with the default window size
-     */
-    public static TumblingWindows of(String name) {
-        return new TumblingWindows(name, DEFAULT_SIZE_MS);
-    }
-
-    /**
-     * Returns a half-interval sliding window definition with the window size in milliseconds
-     */
-    public TumblingWindows with(long size) {
-        return new TumblingWindows(this.name, size);
-    }
-
-    @Override
-    public Map<Long, TumblingWindow> windowsFor(long timestamp) {
-        long windowStart = timestamp - timestamp % size;
-
-        // we cannot use Collections.singleMap since it does not support remove() call
-        Map<Long, TumblingWindow> windows = new HashMap<>();
-        windows.put(windowStart, new TumblingWindow(windowStart, windowStart + size));
-
-        return windows;
-    }
-
-    @Override
-    public boolean equalTo(Windows other) {
-        if (!other.getClass().equals(TumblingWindows.class))
-            return false;
-
-        TumblingWindows otherWindows = (TumblingWindows) other;
-
-        return this.size == otherWindows.size;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
index 7cadfb4..bea3b57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java
@@ -34,6 +34,9 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
     private UnlimitedWindows(String name, long start) {
         super(name);
 
+        if (start < 0) {
+            throw new IllegalArgumentException("start must be > 0 (you provided " + start + ")");
+        }
         this.start = start;
     }
 
@@ -52,21 +55,31 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> {
     public Map<Long, UnlimitedWindow> windowsFor(long timestamp) {
         // always return the single unlimited window
 
-        // we cannot use Collections.singleMap since it does not support remove() call
+        // we cannot use Collections.singleMap since it does not support remove()
         Map<Long, UnlimitedWindow> windows = new HashMap<>();
-        windows.put(start, new UnlimitedWindow(start));
-
-
+        if (timestamp >= start) {
+            windows.put(start, new UnlimitedWindow(start));
+        }
         return windows;
     }
 
     @Override
-    public boolean equalTo(Windows other) {
-        if (!other.getClass().equals(UnlimitedWindows.class))
+    public final boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+
+        if (!(o instanceof UnlimitedWindows)) {
             return false;
+        }
 
-        UnlimitedWindows otherWindows = (UnlimitedWindows) other;
+        UnlimitedWindows other = (UnlimitedWindows) o;
+        return this.start == other.start;
+    }
 
-        return this.start == otherWindows.start;
+    @Override
+    public int hashCode() {
+        return (int) (start ^ (start >>> 32));
     }
-}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
index f2965dc..784d5c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java
@@ -48,21 +48,18 @@ public abstract class Window {
         return this.start() < other.end() || other.start() < this.end();
     }
 
-    public boolean equalsTo(Window other) {
-        return this.start() == other.start() && this.end() == other.end();
-    }
-
     @Override
     public boolean equals(Object obj) {
-        if (obj == this)
+        if (obj == this) {
             return true;
+        }
 
-        if (!(obj instanceof Window))
+        if (getClass() != obj.getClass()) {
             return false;
+        }
 
         Window other = (Window) obj;
-
-        return this.equalsTo(other) && this.start == other.start && this.end == other.end;
+        return this.start == other.start && this.end == other.end;
     }
 
     @Override
@@ -70,4 +67,5 @@ public abstract class Window {
         long n = (this.start << 32) | this.end;
         return (int) (n % 0xFFFFFFFFL);
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index e7dc23e..1406de6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -45,6 +45,9 @@ public abstract class Windows<W extends Window> {
     public int segments;
 
     protected Windows(String name) {
+        if (name == null || name.isEmpty()) {
+            throw new IllegalArgumentException("name must not be null or empty");
+        }
         this.name = name;
         this.segments = DEFAULT_NUM_SEGMENTS;
         this.emitDurationMs = DEFAULT_EMIT_DURATION;
@@ -95,7 +98,6 @@ public abstract class Windows<W extends Window> {
         return prefix + String.format("%010d", NAME_INDEX.getAndIncrement());
     }
 
-    public abstract boolean equalTo(Windows other);
-
     public abstract Map<Long, W> windowsFor(long timestamp);
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java
deleted file mode 100644
index 8b0b2fb..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/HoppingWindow.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Window;
-
-public class HoppingWindow extends Window {
-
-    public HoppingWindow(long start, long end) {
-        super(start, end);
-    }
-
-    @Override
-    public boolean overlap(Window other) {
-        return super.overlap(other) && other.getClass().equals(HoppingWindow.class);
-    }
-
-    @Override
-    public boolean equalsTo(Window other) {
-        return super.equalsTo(other) && other.getClass().equals(HoppingWindow.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
new file mode 100644
index 0000000..5dfb9eb
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Window;
+
+public class TimeWindow extends Window {
+
+    public TimeWindow(long start, long end) {
+        super(start, end);
+    }
+
+    @Override
+    public boolean overlap(Window other) {
+        return getClass() == other.getClass() && super.overlap(other);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
deleted file mode 100644
index a02d4b9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TumblingWindow.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-
-import org.apache.kafka.streams.kstream.Window;
-
-public class TumblingWindow extends Window {
-
-    public TumblingWindow(long start, long end) {
-        super(start, end);
-    }
-
-    @Override
-    public boolean overlap(Window other) {
-        return super.overlap(other) && other.getClass().equals(TumblingWindow.class);
-    }
-
-    @Override
-    public boolean equalsTo(Window other) {
-        return super.equalsTo(other) && other.getClass().equals(TumblingWindow.class);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
index 8ac8f70..4b93f9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/UnlimitedWindow.java
@@ -27,11 +27,7 @@ public class UnlimitedWindow extends Window {
 
     @Override
     public boolean overlap(Window other) {
-        return super.overlap(other) && other.getClass().equals(UnlimitedWindow.class);
+        return getClass() == other.getClass() && super.overlap(other);
     }
 
-    @Override
-    public boolean equalsTo(Window other) {
-        return super.equalsTo(other) && other.getClass().equals(UnlimitedWindow.class);
-    }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
new file mode 100644
index 0000000..e9ff235
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TimeWindowsTest {
+
+    private static String anyName = "window";
+    private static long anySize = 123L;
+
+    @Test
+    public void shouldHaveSaneEqualsAndHashCode() {
+        TimeWindows w1 = TimeWindows.of("w1", anySize);
+        TimeWindows w2 = TimeWindows.of("w2", w1.size);
+
+        // Reflexive
+        assertTrue(w1.equals(w1));
+        assertTrue(w1.hashCode() == w1.hashCode());
+
+        // Symmetric
+        assertTrue(w1.equals(w2));
+        assertTrue(w1.hashCode() == w2.hashCode());
+        assertTrue(w2.hashCode() == w1.hashCode());
+
+        // Transitive
+        TimeWindows w3 = TimeWindows.of("w3", w2.size);
+        assertTrue(w2.equals(w3));
+        assertTrue(w2.hashCode() == w3.hashCode());
+        assertTrue(w1.equals(w3));
+        assertTrue(w1.hashCode() == w3.hashCode());
+
+        // Inequality scenarios
+        assertFalse("must be false for null", w1.equals(null));
+        assertFalse("must be false for different window types", w1.equals(UnlimitedWindows.of("irrelevant")));
+        assertFalse("must be false for different types", w1.equals(new Object()));
+
+        TimeWindows differentWindowSize = TimeWindows.of("differentWindowSize", w1.size + 1);
+        assertFalse("must be false when window sizes are different", w1.equals(differentWindowSize));
+
+        TimeWindows differentAdvanceInterval = w1.advanceBy(w1.advance - 1);
+        assertFalse("must be false when advance intervals are different", w1.equals(differentAdvanceInterval));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void nameMustNotBeEmpty() {
+        TimeWindows.of("", anySize);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void nameMustNotBeNull() {
+        TimeWindows.of(null, anySize);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void windowSizeMustNotBeNegative() {
+        TimeWindows.of(anyName, -1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void windowSizeMustNotBeZero() {
+        TimeWindows.of(anyName, 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void advanceIntervalMustNotBeNegative() {
+        TimeWindows.of(anyName, anySize).advanceBy(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void advanceIntervalMustNotBeZero() {
+        TimeWindows.of(anyName, anySize).advanceBy(0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void advanceIntervalMustNotBeLargerThanWindowSize() {
+        long size = anySize;
+        TimeWindows.of(anyName, size).advanceBy(size + 1);
+    }
+
+    @Test
+    public void windowsForHoppingWindows() {
+        TimeWindows windows = TimeWindows.of(anyName, 12L).advanceBy(5L);
+        Map<Long, TimeWindow> matched = windows.windowsFor(21L);
+        assertEquals(12L / 5L + 1, matched.size());
+        assertEquals(new TimeWindow(10L, 22L), matched.get(10L));
+        assertEquals(new TimeWindow(15L, 27L), matched.get(15L));
+        assertEquals(new TimeWindow(20L, 32L), matched.get(20L));
+    }
+
+    @Test
+    public void windowsForTumblingWindows() {
+        TimeWindows windows = TimeWindows.of(anyName, 12L);
+        Map<Long, TimeWindow> matched = windows.windowsFor(21L);
+        assertEquals(1, matched.size());
+        assertEquals(new TimeWindow(12L, 24L), matched.get(12L));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
new file mode 100644
index 0000000..da5f159
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/UnlimitedWindowsTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class UnlimitedWindowsTest {
+
+    private static String anyName = "window";
+    private static long anyStartTime = 10L;
+
+    @Test(expected = IllegalArgumentException.class)
+    public void nameMustNotBeEmpty() {
+        UnlimitedWindows.of("");
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void nameMustNotBeNull() {
+        UnlimitedWindows.of(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void startTimeMustNotBeNegative() {
+        UnlimitedWindows.of(anyName).startOn(-1);
+    }
+
+    @Test
+    public void startTimeCanBeZero() {
+        UnlimitedWindows.of(anyName).startOn(0);
+    }
+
+    @Test
+    public void shouldIncludeRecordsThatHappenedOnWindowStart() {
+        UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime);
+        Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(w.start);
+        assertEquals(1, matchedWindows.size());
+        assertEquals(new UnlimitedWindow(anyStartTime), matchedWindows.get(anyStartTime));
+    }
+
+    @Test
+    public void shouldIncludeRecordsThatHappenedAfterWindowStart() {
+        UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime);
+        long timestamp = w.start + 1;
+        Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp);
+        assertEquals(1, matchedWindows.size());
+        assertEquals(new UnlimitedWindow(anyStartTime), matchedWindows.get(anyStartTime));
+    }
+
+    @Test
+    public void shouldExcludeRecordsThatHappenedBeforeWindowStart() {
+        UnlimitedWindows w = UnlimitedWindows.of(anyName).startOn(anyStartTime);
+        long timestamp = w.start - 1;
+        Map<Long, UnlimitedWindow> matchedWindows = w.windowsFor(timestamp);
+        assertTrue(matchedWindows.isEmpty());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 828103a..f4fe3a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -20,10 +20,10 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.HoppingWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -37,6 +37,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 
 import static org.junit.Assert.assertEquals;
 
@@ -62,215 +63,229 @@ public class KStreamWindowAggregateTest {
 
     @Test
     public void testAggBasic() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-
-        String topic1 = "topic1";
-
-        KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
-        KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
-                HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
-                strSerde,
-                strSerde);
-
-        MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
-
-        driver = new KStreamTestDriver(builder, stateDir);
-
-        driver.setTime(0L);
-        driver.process(topic1, "A", "1");
-        driver.setTime(1L);
-        driver.process(topic1, "B", "2");
-        driver.setTime(2L);
-        driver.process(topic1, "C", "3");
-        driver.setTime(3L);
-        driver.process(topic1, "D", "4");
-        driver.setTime(4L);
-        driver.process(topic1, "A", "1");
-
-        driver.setTime(5L);
-        driver.process(topic1, "A", "1");
-        driver.setTime(6L);
-        driver.process(topic1, "B", "2");
-        driver.setTime(7L);
-        driver.process(topic1, "D", "4");
-        driver.setTime(8L);
-        driver.process(topic1, "B", "2");
-        driver.setTime(9L);
-        driver.process(topic1, "C", "3");
-
-        driver.setTime(10L);
-        driver.process(topic1, "A", "1");
-        driver.setTime(11L);
-        driver.process(topic1, "B", "2");
-        driver.setTime(12L);
-        driver.process(topic1, "D", "4");
-        driver.setTime(13L);
-        driver.process(topic1, "B", "2");
-        driver.setTime(14L);
-        driver.process(topic1, "C", "3");
-
-        assertEquals(Utils.mkList(
-                "[A@0]:0+1",
-                "[B@0]:0+2",
-                "[C@0]:0+3",
-                "[D@0]:0+4",
-                "[A@0]:0+1+1",
-
-                "[A@0]:0+1+1+1", "[A@5]:0+1",
-                "[B@0]:0+2+2", "[B@5]:0+2",
-                "[D@0]:0+4+4", "[D@5]:0+4",
-                "[B@0]:0+2+2+2", "[B@5]:0+2+2",
-                "[C@0]:0+3+3", "[C@5]:0+3",
-
-                "[A@5]:0+1+1", "[A@10]:0+1",
-                "[B@5]:0+2+2+2", "[B@10]:0+2",
-                "[D@5]:0+4+4", "[D@10]:0+4",
-                "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
-                "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
+        final File baseDir = Files.createTempDirectory("test").toFile();
+
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+            String topic1 = "topic1";
+
+            KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+            KTable<Windowed<String>, String> table2 =
+                stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
+                    TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
+                    strSerde,
+                    strSerde);
+
+            MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+            table2.toStream().process(proc2);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+
+            driver.setTime(0L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(1L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(2L);
+            driver.process(topic1, "C", "3");
+            driver.setTime(3L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(4L);
+            driver.process(topic1, "A", "1");
+
+            driver.setTime(5L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(6L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(7L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(8L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(9L);
+            driver.process(topic1, "C", "3");
+
+            driver.setTime(10L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(11L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(12L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(13L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(14L);
+            driver.process(topic1, "C", "3");
+
+            assertEquals(Utils.mkList(
+                    "[A@0]:0+1",
+                    "[B@0]:0+2",
+                    "[C@0]:0+3",
+                    "[D@0]:0+4",
+                    "[A@0]:0+1+1",
+
+                    "[A@0]:0+1+1+1", "[A@5]:0+1",
+                    "[B@0]:0+2+2", "[B@5]:0+2",
+                    "[D@0]:0+4+4", "[D@5]:0+4",
+                    "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+                    "[C@0]:0+3+3", "[C@5]:0+3",
+
+                    "[A@5]:0+1+1", "[A@10]:0+1",
+                    "[B@5]:0+2+2+2", "[B@10]:0+2",
+                    "[D@5]:0+4+4", "[D@10]:0+4",
+                    "[B@5]:0+2+2+2+2", "[B@10]:0+2+2",
+                    "[C@5]:0+3+3", "[C@10]:0+3"), proc2.processed);
+
+        } finally {
+            Utils.delete(baseDir);
+        }
     }
 
     @Test
     public void testJoin() throws Exception {
-        final KStreamBuilder builder = new KStreamBuilder();
-
-        String topic1 = "topic1";
-        String topic2 = "topic2";
-
-        KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
-        KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
-                HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
-                strSerde,
-                strSerde);
-
-        MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
-        table1.toStream().process(proc1);
-
-        KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
-        KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
-                HoppingWindows.of("topic2-Canonized").with(10L).every(5L),
-                strSerde,
-                strSerde);
-
-        MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
-        table2.toStream().process(proc2);
-
-
-        MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
-        table1.join(table2, new ValueJoiner<String, String, String>() {
-            @Override
-            public String apply(String p1, String p2) {
-                return p1 + "%" + p2;
-            }
-        }).toStream().process(proc3);
-
-        driver = new KStreamTestDriver(builder, stateDir);
-
-        driver.setTime(0L);
-        driver.process(topic1, "A", "1");
-        driver.setTime(1L);
-        driver.process(topic1, "B", "2");
-        driver.setTime(2L);
-        driver.process(topic1, "C", "3");
-        driver.setTime(3L);
-        driver.process(topic1, "D", "4");
-        driver.setTime(4L);
-        driver.process(topic1, "A", "1");
-
-        proc1.checkAndClearProcessResult(
-                "[A@0]:0+1",
-                "[B@0]:0+2",
-                "[C@0]:0+3",
-                "[D@0]:0+4",
-                "[A@0]:0+1+1"
-        );
-        proc2.checkAndClearProcessResult();
-        proc3.checkAndClearProcessResult(
-                "[A@0]:null",
-                "[B@0]:null",
-                "[C@0]:null",
-                "[D@0]:null",
-                "[A@0]:null"
-        );
-
-        driver.setTime(5L);
-        driver.process(topic1, "A", "1");
-        driver.setTime(6L);
-        driver.process(topic1, "B", "2");
-        driver.setTime(7L);
-        driver.process(topic1, "D", "4");
-        driver.setTime(8L);
-        driver.process(topic1, "B", "2");
-        driver.setTime(9L);
-        driver.process(topic1, "C", "3");
-
-        proc1.checkAndClearProcessResult(
-                "[A@0]:0+1+1+1", "[A@5]:0+1",
-                "[B@0]:0+2+2", "[B@5]:0+2",
-                "[D@0]:0+4+4", "[D@5]:0+4",
-                "[B@0]:0+2+2+2", "[B@5]:0+2+2",
-                "[C@0]:0+3+3", "[C@5]:0+3"
-        );
-        proc2.checkAndClearProcessResult();
-        proc3.checkAndClearProcessResult(
-                "[A@0]:null", "[A@5]:null",
-                "[B@0]:null", "[B@5]:null",
-                "[D@0]:null", "[D@5]:null",
-                "[B@0]:null", "[B@5]:null",
-                "[C@0]:null", "[C@5]:null"
-        );
-
-        driver.setTime(0L);
-        driver.process(topic2, "A", "a");
-        driver.setTime(1L);
-        driver.process(topic2, "B", "b");
-        driver.setTime(2L);
-        driver.process(topic2, "C", "c");
-        driver.setTime(3L);
-        driver.process(topic2, "D", "d");
-        driver.setTime(4L);
-        driver.process(topic2, "A", "a");
-
-        proc1.checkAndClearProcessResult();
-        proc2.checkAndClearProcessResult(
-                "[A@0]:0+a",
-                "[B@0]:0+b",
-                "[C@0]:0+c",
-                "[D@0]:0+d",
-                "[A@0]:0+a+a"
-        );
-        proc3.checkAndClearProcessResult(
-                "[A@0]:0+1+1+1%0+a",
-                "[B@0]:0+2+2+2%0+b",
-                "[C@0]:0+3+3%0+c",
-                "[D@0]:0+4+4%0+d",
-                "[A@0]:0+1+1+1%0+a+a");
-
-        driver.setTime(5L);
-        driver.process(topic2, "A", "a");
-        driver.setTime(6L);
-        driver.process(topic2, "B", "b");
-        driver.setTime(7L);
-        driver.process(topic2, "D", "d");
-        driver.setTime(8L);
-        driver.process(topic2, "B", "b");
-        driver.setTime(9L);
-        driver.process(topic2, "C", "c");
-
-        proc1.checkAndClearProcessResult();
-        proc2.checkAndClearProcessResult(
-                "[A@0]:0+a+a+a", "[A@5]:0+a",
-                "[B@0]:0+b+b", "[B@5]:0+b",
-                "[D@0]:0+d+d", "[D@5]:0+d",
-                "[B@0]:0+b+b+b", "[B@5]:0+b+b",
-                "[C@0]:0+c+c", "[C@5]:0+c"
-        );
-        proc3.checkAndClearProcessResult(
-                "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
-                "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
-                "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
-                "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
-                "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
-        );
+        final File baseDir = Files.createTempDirectory("test").toFile();
+
+        try {
+            final KStreamBuilder builder = new KStreamBuilder();
+            String topic1 = "topic1";
+            String topic2 = "topic2";
+
+            KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+            KTable<Windowed<String>, String> table1 =
+                stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
+                    TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
+                    strSerde,
+                    strSerde);
+
+            MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
+            table1.toStream().process(proc1);
+
+            KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
+            KTable<Windowed<String>, String> table2 =
+                stream2.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
+                    TimeWindows.of("topic2-Canonized", 10).advanceBy(5),
+                    strSerde,
+                    strSerde);
+
+            MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
+            table2.toStream().process(proc2);
+
+
+            MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>();
+            table1.join(table2, new ValueJoiner<String, String, String>() {
+                @Override
+                public String apply(String p1, String p2) {
+                    return p1 + "%" + p2;
+                }
+            }).toStream().process(proc3);
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+
+            driver.setTime(0L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(1L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(2L);
+            driver.process(topic1, "C", "3");
+            driver.setTime(3L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(4L);
+            driver.process(topic1, "A", "1");
+
+            proc1.checkAndClearProcessResult(
+                    "[A@0]:0+1",
+                    "[B@0]:0+2",
+                    "[C@0]:0+3",
+                    "[D@0]:0+4",
+                    "[A@0]:0+1+1"
+            );
+            proc2.checkAndClearProcessResult();
+            proc3.checkAndClearProcessResult(
+                    "[A@0]:null",
+                    "[B@0]:null",
+                    "[C@0]:null",
+                    "[D@0]:null",
+                    "[A@0]:null"
+            );
+
+            driver.setTime(5L);
+            driver.process(topic1, "A", "1");
+            driver.setTime(6L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(7L);
+            driver.process(topic1, "D", "4");
+            driver.setTime(8L);
+            driver.process(topic1, "B", "2");
+            driver.setTime(9L);
+            driver.process(topic1, "C", "3");
+
+            proc1.checkAndClearProcessResult(
+                    "[A@0]:0+1+1+1", "[A@5]:0+1",
+                    "[B@0]:0+2+2", "[B@5]:0+2",
+                    "[D@0]:0+4+4", "[D@5]:0+4",
+                    "[B@0]:0+2+2+2", "[B@5]:0+2+2",
+                    "[C@0]:0+3+3", "[C@5]:0+3"
+            );
+            proc2.checkAndClearProcessResult();
+            proc3.checkAndClearProcessResult(
+                    "[A@0]:null", "[A@5]:null",
+                    "[B@0]:null", "[B@5]:null",
+                    "[D@0]:null", "[D@5]:null",
+                    "[B@0]:null", "[B@5]:null",
+                    "[C@0]:null", "[C@5]:null"
+            );
+
+            driver.setTime(0L);
+            driver.process(topic2, "A", "a");
+            driver.setTime(1L);
+            driver.process(topic2, "B", "b");
+            driver.setTime(2L);
+            driver.process(topic2, "C", "c");
+            driver.setTime(3L);
+            driver.process(topic2, "D", "d");
+            driver.setTime(4L);
+            driver.process(topic2, "A", "a");
+
+            proc1.checkAndClearProcessResult();
+            proc2.checkAndClearProcessResult(
+                    "[A@0]:0+a",
+                    "[B@0]:0+b",
+                    "[C@0]:0+c",
+                    "[D@0]:0+d",
+                    "[A@0]:0+a+a"
+            );
+            proc3.checkAndClearProcessResult(
+                    "[A@0]:0+1+1+1%0+a",
+                    "[B@0]:0+2+2+2%0+b",
+                    "[C@0]:0+3+3%0+c",
+                    "[D@0]:0+4+4%0+d",
+                    "[A@0]:0+1+1+1%0+a+a");
+
+            driver.setTime(5L);
+            driver.process(topic2, "A", "a");
+            driver.setTime(6L);
+            driver.process(topic2, "B", "b");
+            driver.setTime(7L);
+            driver.process(topic2, "D", "d");
+            driver.setTime(8L);
+            driver.process(topic2, "B", "b");
+            driver.setTime(9L);
+            driver.process(topic2, "C", "c");
+
+            proc1.checkAndClearProcessResult();
+            proc2.checkAndClearProcessResult(
+                    "[A@0]:0+a+a+a", "[A@5]:0+a",
+                    "[B@0]:0+b+b", "[B@5]:0+b",
+                    "[D@0]:0+d+d", "[D@5]:0+d",
+                    "[B@0]:0+b+b+b", "[B@5]:0+b+b",
+                    "[C@0]:0+c+c", "[C@5]:0+c"
+            );
+            proc3.checkAndClearProcessResult(
+                    "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a",
+                    "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b",
+                    "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d",
+                    "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b",
+                    "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c"
+            );
+        } finally {
+            Utils.delete(baseDir);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 7c6d5ec..b31b20d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -49,7 +49,7 @@ public class WindowedStreamPartitionerTest {
             new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0])
     );
 
-    private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet());
+    private Cluster cluster = new Cluster(Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet());
 
     @Test
     public void testCopartitioning() {
@@ -71,7 +71,7 @@ public class WindowedStreamPartitionerTest {
             Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
 
             for (int w = 0; w < 10; w++) {
-                HoppingWindow window = new HoppingWindow(10 * w, 20 * w);
+                TimeWindow window = new TimeWindow(10 * w, 20 * w);
 
                 Windowed<Integer> windowedKey = new Windowed<>(key, window);
                 Integer actual = streamPartitioner.partition(windowedKey, value, infos.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java
deleted file mode 100644
index f9b6ba5..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowsTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.TumblingWindows;
-import org.apache.kafka.streams.kstream.UnlimitedWindows;
-import org.junit.Test;
-
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class WindowsTest {
-
-    @Test
-    public void hoppingWindows() {
-
-        HoppingWindows windows = HoppingWindows.of("test").with(12L).every(5L);
-
-        Map<Long, HoppingWindow> matched = windows.windowsFor(21L);
-
-        assertEquals(3, matched.size());
-
-        assertEquals(new HoppingWindow(10L, 22L), matched.get(10L));
-        assertEquals(new HoppingWindow(15L, 27L), matched.get(15L));
-        assertEquals(new HoppingWindow(20L, 32L), matched.get(20L));
-    }
-
-    @Test
-    public void tumblineWindows() {
-
-        TumblingWindows windows = TumblingWindows.of("test").with(12L);
-
-        Map<Long, TumblingWindow> matched = windows.windowsFor(21L);
-
-        assertEquals(1, matched.size());
-
-        assertEquals(new TumblingWindow(12L, 24L), matched.get(12L));
-    }
-
-    @Test
-    public void unlimitedWindows() {
-
-        UnlimitedWindows windows = UnlimitedWindows.of("test").startOn(10L);
-
-        Map<Long, UnlimitedWindow> matched = windows.windowsFor(21L);
-
-        assertEquals(1, matched.size());
-
-        assertEquals(new UnlimitedWindow(10L), matched.get(10L));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68433dcf/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 95e0fbf..733c1ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.TumblingWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.UnlimitedWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -207,7 +207,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         // windowed count
         data.countByKey(
-                TumblingWindows.of("tumbling-win-cnt").with(WINDOW_SIZE),
+                TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE),
                 stringSerde
         ).toStream().map(
                 new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {