You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:45 UTC

[11/13] flink git commit: [FLINK-2550] Change Window API constructs to use Time instead of long

[FLINK-2550] Change Window API constructs to use Time instead of long

This covers assigners/triggers/evictors.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c2c7694
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c2c7694
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c2c7694

Branch: refs/heads/master
Commit: 8c2c76947a9a26e25d4539068b5253b265c71c23
Parents: 833b347
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Oct 3 16:39:13 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    | 10 +++---
 .../streaming/api/datastream/KeyedStream.java   |  8 ++---
 .../assigners/SlidingProcessingTimeWindows.java |  5 +--
 .../windowing/assigners/SlidingTimeWindows.java |  5 +--
 .../TumblingProcessingTimeWindows.java          |  5 +--
 .../assigners/TumblingTimeWindows.java          |  5 +--
 .../api/windowing/evictors/TimeEvictor.java     |  5 +--
 .../api/windowing/time/AbstractTime.java        |  3 ++
 .../ContinuousProcessingTimeTrigger.java        | 31 +++++++++---------
 .../triggers/ContinuousWatermarkTrigger.java    | 25 ++++++++-------
 .../windowing/AllWindowTranslationTest.java     | 17 +++++-----
 .../windowing/NonKeyedWindowOperatorTest.java   | 25 ++++++---------
 .../operators/windowing/WindowOperatorTest.java | 16 +++++-----
 .../windowing/WindowTranslationTest.java        | 18 ++++++-----
 .../flink/streaming/api/scala/DataStream.scala  | 12 +++----
 .../flink/streaming/api/scala/KeyedStream.scala | 12 +++----
 .../api/scala/AllWindowTranslationTest.scala    | 33 +++++++++++++-------
 .../api/scala/WindowTranslationTest.scala       | 24 +++++++++-----
 18 files changed, 144 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 6d88416..32d9012 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -741,9 +741,9 @@ public class DataStream<T> {
 		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
 
 		if (actualSize instanceof EventTime) {
-			return windowAll(TumblingTimeWindows.of(actualSize.toMilliseconds()));
+			return windowAll(TumblingTimeWindows.of(actualSize));
 		} else {
-			return windowAll(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
+			return windowAll(TumblingProcessingTimeWindows.of(actualSize));
 		}
 	}
 
@@ -763,11 +763,9 @@ public class DataStream<T> {
 		AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
 
 		if (actualSize instanceof EventTime) {
-			return windowAll(SlidingTimeWindows.of(actualSize.toMilliseconds(),
-					actualSlide.toMilliseconds()));
+			return windowAll(SlidingTimeWindows.of(size, slide));
 		} else {
-			return windowAll(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(),
-					actualSlide.toMilliseconds()));
+			return windowAll(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 265886b..f7c5b53 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -125,9 +125,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
 
 		if (actualSize instanceof EventTime) {
-			return window(TumblingTimeWindows.of(actualSize.toMilliseconds()));
+			return window(TumblingTimeWindows.of(actualSize));
 		} else {
-			return window(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
+			return window(TumblingProcessingTimeWindows.of(actualSize));
 		}
 	}
 
@@ -147,9 +147,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 		AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
 
 		if (actualSize instanceof EventTime) {
-			return window(SlidingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
+			return window(SlidingTimeWindows.of(actualSize, actualSlide));
 		} else {
-			return window(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
+			return window(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index a2d95c2..6fc79b0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.windowing.assigners;
 
 import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -86,7 +87,7 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
 	 * @param slide The slide interval of the generated windows.
 	 * @return The time policy.
 	 */
-	public static SlidingProcessingTimeWindows of(long size, long slide) {
-		return new SlidingProcessingTimeWindows(size, slide);
+	public static SlidingProcessingTimeWindows of(AbstractTime size, AbstractTime slide) {
+		return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index cb5a7a1..49bff05 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -75,7 +76,7 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	 * @param slide The slide interval of the generated windows.
 	 * @return The time policy.
 	 */
-	public static SlidingTimeWindows of(long size, long slide) {
-		return new SlidingTimeWindows(size, slide);
+	public static SlidingTimeWindows of(AbstractTime size, AbstractTime slide) {
+		return new SlidingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index b1ef857..1f2eebf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -61,7 +62,7 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
 	 * @param size The size of the generated windows.
 	 * @return The time policy.
 	 */
-	public static TumblingProcessingTimeWindows of(long size) {
-		return new TumblingProcessingTimeWindows(size);
+	public static TumblingProcessingTimeWindows of(AbstractTime size) {
+		return new TumblingProcessingTimeWindows(size.toMilliseconds());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index d19c97d..019f45b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -60,8 +61,8 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	 * @param size The size of the generated windows.
 	 * @return The time policy.
 	 */
-	public static TumblingTimeWindows of(long size) {
-		return new TumblingTimeWindows(size);
+	public static TumblingTimeWindows of(AbstractTime size) {
+		return new TumblingTimeWindows(size.toMilliseconds());
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index c38100c..2965214 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.windowing.evictors;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -54,7 +55,7 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
 		return windowSize;
 	}
 
-	public static <W extends Window> TimeEvictor<W> of(long windowSize) {
-		return new TimeEvictor<>(windowSize);
+	public static <W extends Window> TimeEvictor<W> of(AbstractTime windowSize) {
+		return new TimeEvictor<>(windowSize.toMilliseconds());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
index 1264c2a..3f8fb60 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
@@ -24,6 +24,9 @@ import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+/**
+ * Base class for {@link Time} implementations.
+ */
 public abstract class AbstractTime {
 
 	/** The time unit for this policy's time interval */

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index da198be..24e8ce3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -18,32 +18,33 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
 	private static final long serialVersionUID = 1L;
 
-	private long granularity;
+	private long interval;
 
 	private long nextFireTimestamp = 0;
 
-	private ContinuousProcessingTimeTrigger(long granularity) {
-		this.granularity = granularity;
+	private ContinuousProcessingTimeTrigger(long interval) {
+		this.interval = interval;
 	}
 
 	@Override
 	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
 		long currentTime = System.currentTimeMillis();
 		if (nextFireTimestamp == 0) {
-			long start = currentTime - (currentTime % granularity);
-			nextFireTimestamp = start + granularity;
+			long start = currentTime - (currentTime % interval);
+			nextFireTimestamp = start + interval;
 
 			ctx.registerProcessingTimeTimer(nextFireTimestamp);
 			return TriggerResult.CONTINUE;
 		}
 		if (currentTime > nextFireTimestamp) {
-			long start = currentTime - (currentTime % granularity);
-			nextFireTimestamp = start + granularity;
+			long start = currentTime - (currentTime % interval);
+			nextFireTimestamp = start + interval;
 
 			ctx.registerProcessingTimeTimer(nextFireTimestamp);
 
@@ -57,8 +58,8 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
 		// only fire if an element didn't already fire
 		long currentTime = System.currentTimeMillis();
 		if (currentTime > nextFireTimestamp) {
-			long start = currentTime - (currentTime % granularity);
-			nextFireTimestamp = start + granularity;
+			long start = currentTime - (currentTime % interval);
+			nextFireTimestamp = start + interval;
 			return TriggerResult.FIRE;
 		}
 		return TriggerResult.CONTINUE;
@@ -66,20 +67,20 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
 
 	@Override
 	public Trigger<Object, W> duplicate() {
-		return new ContinuousProcessingTimeTrigger<>(granularity);
+		return new ContinuousProcessingTimeTrigger<>(interval);
 	}
 
 	@VisibleForTesting
-	public long getGranularity() {
-		return granularity;
+	public long getInterval() {
+		return interval;
 	}
 
 	@Override
 	public String toString() {
-		return "ContinuousProcessingTimeTrigger(" + granularity + ")";
+		return "ContinuousProcessingTimeTrigger(" + interval + ")";
 	}
 
-	public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(long granularity) {
-		return new ContinuousProcessingTimeTrigger<>(granularity);
+	public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(AbstractTime interval) {
+		return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
index 3b6dc6d..e11ceba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
@@ -18,24 +18,25 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
 	private static final long serialVersionUID = 1L;
 
-	private long granularity;
+	private long interval;
 
 	private boolean first = true;
 
-	private ContinuousWatermarkTrigger(long granularity) {
-		this.granularity = granularity;
+	private ContinuousWatermarkTrigger(long interval) {
+		this.interval = interval;
 	}
 
 	@Override
 	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
 		if (first) {
-			long start = timestamp - (timestamp % granularity);
-			long nextFireTimestamp = start + granularity;
+			long start = timestamp - (timestamp % interval);
+			long nextFireTimestamp = start + interval;
 
 			ctx.registerWatermarkTimer(nextFireTimestamp);
 			first = false;
@@ -46,26 +47,26 @@ public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Obj
 
 	@Override
 	public TriggerResult onTime(long time, TriggerContext ctx) {
-		ctx.registerWatermarkTimer(time + granularity);
+		ctx.registerWatermarkTimer(time + interval);
 		return TriggerResult.FIRE;
 	}
 
 	@Override
 	public Trigger<Object, W> duplicate() {
-		return new ContinuousWatermarkTrigger<>(granularity);
+		return new ContinuousWatermarkTrigger<>(interval);
 	}
 
 	@Override
 	public String toString() {
-		return "ContinuousProcessingTimeTrigger(" + granularity + ")";
+		return "ContinuousProcessingTimeTrigger(" + interval + ")";
 	}
 
 	@VisibleForTesting
-	public long getGranularity() {
-		return granularity;
+	public long getInterval() {
+		return interval;
 	}
 
-	public static <W extends Window> ContinuousWatermarkTrigger<W> of(long granularity) {
-		return new ContinuousWatermarkTrigger<>(granularity);
+	public static <W extends Window> ContinuousWatermarkTrigger<W> of(AbstractTime interval) {
+		return new ContinuousWatermarkTrigger<>(interval.toMilliseconds());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index ee8c6d6..767b40c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -39,6 +40,8 @@ import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * These tests verify that the api calls on
  * {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} instantiate
@@ -62,7 +65,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DummyReducer reducer = new DummyReducer();
 
 		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.reduceWindow(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
@@ -70,7 +73,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
@@ -98,7 +101,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DummyReducer reducer = new DummyReducer();
 
 		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.trigger(CountTrigger.of(100))
 				.reduceWindow(reducer);
 
@@ -111,7 +114,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(TumblingProcessingTimeWindows.of(1000))
+				.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
 				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
@@ -144,7 +147,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		DummyReducer reducer = new DummyReducer();
 
 		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.evictor(CountEvictor.of(100))
 				.reduceWindow(reducer);
 
@@ -158,9 +161,9 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(TumblingProcessingTimeWindows.of(1000))
+				.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
-				.evictor(TimeEvictor.of(100))
+				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
 				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index 9b0bcc4..6cc8931 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -49,6 +50,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 @RunWith(Parameterized.class)
@@ -69,11 +71,11 @@ public class NonKeyedWindowOperatorTest {
 	public void testSlidingEventTimeWindows() throws Exception {
 		closeCalled.set(0);
 
-		final int WINDOW_SIZE = 3000;
-		final int WINDOW_SLIDE = 1000;
+		final int WINDOW_SIZE = 3;
+		final int WINDOW_SLIDE = 1;
 
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
-				SlidingTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE),
+				SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
@@ -150,10 +152,10 @@ public class NonKeyedWindowOperatorTest {
 	public void testTumblingEventTimeWindows() throws Exception {
 		closeCalled.set(0);
 
-		final int WINDOW_SIZE = 3000;
+		final int WINDOW_SIZE = 3;
 
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
-				TumblingTimeWindows.of(WINDOW_SIZE),
+				TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
 				WatermarkTrigger.create());
@@ -228,13 +230,13 @@ public class NonKeyedWindowOperatorTest {
 	public void testContinuousWatermarkTrigger() throws Exception {
 		closeCalled.set(0);
 
-		final int WINDOW_SIZE = 3000;
+		final int WINDOW_SIZE = 3;
 
 		NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
 				GlobalWindows.create(),
 				windowBufferFactory,
 				new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
-				ContinuousWatermarkTrigger.of(WINDOW_SIZE));
+				ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 
@@ -421,13 +423,4 @@ public class NonKeyedWindowOperatorTest {
 			}
 		}
 	}
-
-	private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple2<String, Integer> value) throws Exception {
-			return value.f0;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 1bfd1d5..d387df0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
@@ -49,6 +50,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 @RunWith(Parameterized.class)
@@ -69,11 +71,11 @@ public class WindowOperatorTest {
 	public void testSlidingEventTimeWindows() throws Exception {
 		closeCalled.set(0);
 
-		final int WINDOW_SIZE = 3000;
-		final int WINDOW_SLIDE = 1000;
+		final int WINDOW_SIZE = 3;
+		final int WINDOW_SLIDE = 1;
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				SlidingTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE),
+				SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
 				new TupleKeySelector(),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
@@ -157,10 +159,10 @@ public class WindowOperatorTest {
 	public void testTumblingEventTimeWindows() throws Exception {
 		closeCalled.set(0);
 
-		final int WINDOW_SIZE = 3000;
+		final int WINDOW_SIZE = 3;
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
-				TumblingTimeWindows.of(WINDOW_SIZE),
+				TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
 				new TupleKeySelector(),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
@@ -240,14 +242,14 @@ public class WindowOperatorTest {
 	public void testContinuousWatermarkTrigger() throws Exception {
 		closeCalled.set(0);
 
-		final int WINDOW_SIZE = 3000;
+		final int WINDOW_SIZE = 3;
 
 		WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
 				GlobalWindows.create(),
 				new TupleKeySelector(),
 				windowBufferFactory,
 				new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
-				ContinuousWatermarkTrigger.of(WINDOW_SIZE));
+				ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
 
 		operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index a3e6085..9dc6687 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -40,6 +41,8 @@ import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * These tests verify that the api calls on
  * {@link WindowedStream} instantiate
@@ -61,7 +64,8 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
-				.window(SlidingProcessingTimeWindows.of(1000, 100))
+				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS),
+						Time.of(100, TimeUnit.MILLISECONDS)))
 				.reduceWindow(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
@@ -70,7 +74,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.keyBy(0)
-				.window(SlidingProcessingTimeWindows.of(1000, 100))
+				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
@@ -99,7 +103,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
-				.window(SlidingProcessingTimeWindows.of(1000, 100))
+				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.trigger(CountTrigger.of(100))
 				.reduceWindow(reducer);
 
@@ -113,7 +117,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.keyBy(0)
-				.window(TumblingProcessingTimeWindows.of(1000))
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
 				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
@@ -147,7 +151,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
-				.window(SlidingProcessingTimeWindows.of(1000, 100))
+				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.evictor(CountEvictor.of(100))
 				.reduceWindow(reducer);
 
@@ -162,9 +166,9 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.keyBy(0)
-				.window(TumblingProcessingTimeWindows.of(1000))
+				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
-				.evictor(TimeEvictor.of(100))
+				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
 				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 7dfaeef..6ad7629 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -629,11 +629,11 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
     actualSize match {
       case t: EventTime =>
-        val assigner = TumblingTimeWindows.of(actualSize.toMilliseconds)
+        val assigner = TumblingTimeWindows.of(actualSize)
           .asInstanceOf[WindowAssigner[T, TimeWindow]]
         windowAll(assigner)
       case t: ProcessingTime =>
-        val assigner = TumblingProcessingTimeWindows.of(actualSize.toMilliseconds)
+        val assigner = TumblingProcessingTimeWindows.of(actualSize)
           .asInstanceOf[WindowAssigner[T, TimeWindow]]
         windowAll(assigner)
       case _ => throw new RuntimeException("Invalid time: " + actualSize)
@@ -658,13 +658,13 @@ class DataStream[T](javaStream: JavaStream[T]) {
     actualSize match {
       case t: EventTime =>
         val assigner = SlidingTimeWindows.of(
-          actualSize.toMilliseconds,
-          actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]]
+          actualSize,
+          actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
         windowAll(assigner)
       case t: ProcessingTime =>
         val assigner = SlidingProcessingTimeWindows.of(
-          actualSize.toMilliseconds,
-          actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]]
+          actualSize,
+          actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
         windowAll(assigner)
       case _ => throw new RuntimeException("Invalid time: " + actualSize)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 232e4bb..18b71be 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -55,11 +55,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
 
     actualSize match {
       case t: EventTime =>
-        val assigner = TumblingTimeWindows.of(actualSize.toMilliseconds)
+        val assigner = TumblingTimeWindows.of(actualSize)
           .asInstanceOf[WindowAssigner[T, TimeWindow]]
         window(assigner)
       case t: ProcessingTime =>
-        val assigner = TumblingProcessingTimeWindows.of(actualSize.toMilliseconds)
+        val assigner = TumblingProcessingTimeWindows.of(actualSize)
           .asInstanceOf[WindowAssigner[T, TimeWindow]]
         window(assigner)
       case _ => throw new RuntimeException("Invalid time: " + actualSize)
@@ -85,13 +85,13 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     actualSize match {
       case t: EventTime =>
         val assigner = SlidingTimeWindows.of(
-          actualSize.toMilliseconds,
-          actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]]
+          actualSize,
+          actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
         window(assigner)
       case t: ProcessingTime =>
         val assigner = SlidingProcessingTimeWindows.of(
-          actualSize.toMilliseconds,
-          actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]]
+          actualSize,
+          actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
         window(assigner)
       case _ => throw new RuntimeException("Invalid time: " + actualSize)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 35c7fcc..247256f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -19,13 +19,15 @@
 package org.apache.flink.streaming.api.scala
 
 
+import java.util.concurrent.TimeUnit
+
 import org.apache.flink.api.common.functions.RichReduceFunction
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
-import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, WatermarkTrigger, CountTrigger}
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer}
 import org.apache.flink.streaming.runtime.operators.windowing._
@@ -53,7 +55,9 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
     val reducer = new DummyReducer
 
     val window1 = source
-      .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+      .windowAll(SlidingProcessingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
       .reduceWindow(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
@@ -65,10 +69,11 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val window2 = source
       .keyBy(0)
-      .window(SlidingProcessingTimeWindows.of(1000, 100))
-      .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+      .windowAll(SlidingProcessingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
+      .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
       def apply(
-                    tuple: Tuple,
                     window: TimeWindow,
                     values: java.lang.Iterable[(String, Int)],
                     out: Collector[(String, Int)]) { }
@@ -91,7 +96,9 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
     val reducer = new DummyReducer
 
     val window1 = source
-      .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+      .windowAll(SlidingProcessingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
       .reduceWindow(reducer)
 
@@ -109,7 +116,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 
     val window2 = source
-      .windowAll(TumblingProcessingTimeWindows.of(1000))
+      .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
       def apply(
@@ -139,8 +146,10 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
     val reducer = new DummyReducer
 
     val window1 = source
-      .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
-      .evictor(TimeEvictor.of(1000))
+      .windowAll(SlidingProcessingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
+      .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
       .reduceWindow(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
@@ -157,7 +166,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 
     val window2 = source
-      .windowAll(TumblingProcessingTimeWindows.of(1000))
+      .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .evictor(CountEvictor.of(1000))
       .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 49d0a1a..f1b05c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.streaming.api.scala
 
+import java.util.concurrent.TimeUnit
 
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
+import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer}
@@ -50,7 +52,9 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val window1 = source
       .keyBy(0)
-      .window(SlidingProcessingTimeWindows.of(1000, 100))
+      .window(SlidingProcessingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
       .reduceWindow(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
@@ -62,7 +66,9 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val window2 = source
       .keyBy(0)
-      .window(SlidingProcessingTimeWindows.of(1000, 100))
+      .window(SlidingProcessingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
       .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
         def apply(
             key: Tuple,
@@ -89,7 +95,9 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val window1 = source
       .keyBy(0)
-      .window(SlidingProcessingTimeWindows.of(1000, 100))
+      .window(SlidingProcessingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
       .reduceWindow(reducer)
 
@@ -108,7 +116,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val window2 = source
       .keyBy(0)
-      .window(TumblingProcessingTimeWindows.of(1000))
+      .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
       def apply(
@@ -140,8 +148,10 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val window1 = source
       .keyBy(0)
-      .window(SlidingProcessingTimeWindows.of(1000, 100))
-      .evictor(TimeEvictor.of(1000))
+      .window(SlidingProcessingTimeWindows.of(
+        Time.of(1, TimeUnit.SECONDS),
+        Time.of(100, TimeUnit.MILLISECONDS)))
+      .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
       .reduceWindow(reducer)
 
     val transform1 = window1.getJavaStream.getTransformation
@@ -159,7 +169,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val window2 = source
       .keyBy(0)
-      .window(TumblingProcessingTimeWindows.of(1000))
+      .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .evictor(CountEvictor.of(1000))
       .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {