You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 22:33:52 UTC

[3/8] flink git commit: [FLINK-2550] Rework interplay of Window Assigners and TimeCharacteristic

[FLINK-2550] Rework interplay of Window Assigners and TimeCharacteristic


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff367d6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff367d6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff367d6e

Branch: refs/heads/master
Commit: ff367d6ea728a7c5bc334f34591a4d79e573972f
Parents: 28a38bb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Oct 6 18:19:56 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 7 22:08:25 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |  17 +--
 .../streaming/api/datastream/DataStream.java    |  20 +---
 .../streaming/api/datastream/KeyedStream.java   |  20 +---
 .../api/datastream/WindowedStream.java          |  29 ++---
 .../environment/StreamExecutionEnvironment.java |  12 +++
 .../api/windowing/assigners/GlobalWindows.java  |   3 +-
 .../assigners/SlidingProcessingTimeWindows.java | 106 -------------------
 .../windowing/assigners/SlidingTimeWindows.java |  11 +-
 .../TumblingProcessingTimeWindows.java          |  81 --------------
 .../assigners/TumblingTimeWindows.java          |  11 +-
 .../api/windowing/assigners/WindowAssigner.java |   3 +-
 .../operators/windowing/WindowOperator.java     |   5 +
 .../flink/streaming/api/CoGroupJoinITCase.java  |   6 +-
 .../windowing/AllWindowTranslationTest.java     |  60 +++++++----
 .../windowing/WindowTranslationTest.java        |  89 +++++++++++++---
 .../streaming/examples/join/WindowJoin.java     |   3 +-
 .../scala/examples/join/WindowJoin.scala        |   3 +-
 .../flink/streaming/api/scala/DataStream.scala  |  37 +------
 .../flink/streaming/api/scala/KeyedStream.scala |  37 +------
 .../api/scala/StreamExecutionEnvironment.scala  |   5 +
 .../api/scala/AllWindowTranslationTest.scala    |  24 +++--
 .../streaming/api/scala/CoGroupJoinITCase.scala |   9 +-
 .../api/scala/WindowTranslationTest.scala       |  22 ++--
 23 files changed, 233 insertions(+), 380 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 89c4857..a8d7654 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -80,7 +81,7 @@ public class AllWindowedStream<T, W extends Window> {
 			WindowAssigner<? super T, W> windowAssigner) {
 		this.input = input;
 		this.windowAssigner = windowAssigner;
-		this.trigger = windowAssigner.getDefaultTrigger();
+		this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
 	}
 
 	/**
@@ -139,12 +140,14 @@ public class AllWindowedStream<T, W extends Window> {
 
 		OneInputStreamOperator<T, T> operator;
 
+		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
 		if (evictor != null) {
 			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
 					new HeapWindowBuffer.Factory<T>(),
 					new ReduceAllWindowFunction<W, T>(function),
 					trigger,
-					evictor);
+					evictor).enableSetProcessingTime(setProcessingTime);
 
 		} else {
 			// we need to copy because we need our own instance of the pre aggregator
@@ -154,7 +157,7 @@ public class AllWindowedStream<T, W extends Window> {
 			operator = new NonKeyedWindowOperator<>(windowAssigner,
 					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
 					new ReduceAllWindowFunction<W, T>(function),
-					trigger);
+					trigger).enableSetProcessingTime(setProcessingTime);
 		}
 
 		return input.transform(opName, input.getType(), operator).setParallelism(1);
@@ -205,20 +208,22 @@ public class AllWindowedStream<T, W extends Window> {
 
 		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
 
-		OneInputStreamOperator<T, R> operator;
+		NonKeyedWindowOperator<T, R, W> operator;
+
+		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
 
 		if (evictor != null) {
 			operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
 					new HeapWindowBuffer.Factory<T>(),
 					function,
 					trigger,
-					evictor);
+					evictor).enableSetProcessingTime(setProcessingTime);
 
 		} else {
 			operator = new NonKeyedWindowOperator<>(windowAssigner,
 					new HeapWindowBuffer.Factory<T>(),
 					function,
-					trigger);
+					trigger).enableSetProcessingTime(setProcessingTime);
 		}
 
 		return input.transform(opName, resultType, operator).setParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 0be1d56..ee8b3d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -59,9 +59,7 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.helper.Count;
@@ -72,7 +70,6 @@ import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.time.EventTime;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator;
@@ -726,13 +723,7 @@ public class DataStream<T> {
 	 * @param size The size of the window.
 	 */
 	public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size) {
-		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
-		if (actualSize instanceof EventTime) {
-			return windowAll(TumblingTimeWindows.of(actualSize));
-		} else {
-			return windowAll(TumblingProcessingTimeWindows.of(actualSize));
-		}
+		return windowAll(TumblingTimeWindows.of(size));
 	}
 
 	/**
@@ -747,14 +738,7 @@ public class DataStream<T> {
 	 * @param size The size of the window.
 	 */
 	public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) {
-		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-		AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
-		if (actualSize instanceof EventTime) {
-			return windowAll(SlidingTimeWindows.of(size, slide));
-		} else {
-			return windowAll(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
-		}
+		return windowAll(SlidingTimeWindows.of(size, slide));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index edb7981..2e6d7d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -32,13 +32,10 @@ import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.time.EventTime;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
@@ -122,13 +119,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * @param size The size of the window.
 	 */
 	public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
-		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
-		if (actualSize instanceof EventTime) {
-			return window(TumblingTimeWindows.of(actualSize));
-		} else {
-			return window(TumblingProcessingTimeWindows.of(actualSize));
-		}
+		return window(TumblingTimeWindows.of(size));
 	}
 
 	/**
@@ -143,14 +134,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * @param size The size of the window.
 	 */
 	public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
-		AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-		AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
-		if (actualSize instanceof EventTime) {
-			return window(SlidingTimeWindows.of(actualSize, actualSlide));
-		} else {
-			return window(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
-		}
+		return window(SlidingTimeWindows.of(size, slide));
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 1273b42..99f7d06 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -32,8 +33,8 @@ import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
@@ -91,7 +92,7 @@ public class WindowedStream<T, K, W extends Window> {
 			WindowAssigner<? super T, W> windowAssigner) {
 		this.input = input;
 		this.windowAssigner = windowAssigner;
-		this.trigger = windowAssigner.getDefaultTrigger();
+		this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
 	}
 
 	/**
@@ -151,13 +152,15 @@ public class WindowedStream<T, K, W extends Window> {
 
 		OneInputStreamOperator<T, T> operator;
 
+		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
 		if (evictor != null) {
 			operator = new EvictingWindowOperator<>(windowAssigner,
 					keySel,
 					new HeapWindowBuffer.Factory<T>(),
 					new ReduceWindowFunction<K, W, T>(function),
 					trigger,
-					evictor);
+					evictor).enableSetProcessingTime(setProcessingTime);
 
 		} else {
 			// we need to copy because we need our own instance of the pre aggregator
@@ -168,7 +171,7 @@ public class WindowedStream<T, K, W extends Window> {
 					keySel,
 					new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
 					new ReduceWindowFunction<K, W, T>(function),
-					trigger);
+					trigger).enableSetProcessingTime(setProcessingTime);
 		}
 
 		return input.transform(opName, input.getType(), operator);
@@ -222,7 +225,9 @@ public class WindowedStream<T, K, W extends Window> {
 		String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
 		KeySelector<T, K> keySel = input.getKeySelector();
 
-		OneInputStreamOperator<T, R> operator;
+		WindowOperator<K, T, R, W> operator;
+
+		boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
 
 		if (evictor != null) {
 			operator = new EvictingWindowOperator<>(windowAssigner,
@@ -230,14 +235,14 @@ public class WindowedStream<T, K, W extends Window> {
 					new HeapWindowBuffer.Factory<T>(),
 					function,
 					trigger,
-					evictor);
+					evictor).enableSetProcessingTime(setProcessingTime);
 
 		} else {
 			operator = new WindowOperator<>(windowAssigner,
 					keySel,
 					new HeapWindowBuffer.Factory<T>(),
 					function,
-					trigger);
+					trigger).enableSetProcessingTime(setProcessingTime);;
 		}
 
 		return input.transform(opName, resultType, operator);
@@ -450,8 +455,8 @@ public class WindowedStream<T, K, W extends Window> {
 			TypeInformation<R> resultType,
 			String functionName) {
 
-		if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
+		if (windowAssigner instanceof SlidingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+			SlidingTimeWindows timeWindows = (SlidingTimeWindows) windowAssigner;
 			final long windowLength = timeWindows.getSize();
 			final long windowSlide = timeWindows.getSlide();
 
@@ -475,8 +480,8 @@ public class WindowedStream<T, K, W extends Window> {
 						wf, input.getKeySelector(), windowLength, windowSlide);
 				return input.transform(opName, resultType, op);
 			}
-		} else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
-			TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
+		} else if (windowAssigner instanceof TumblingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+			TumblingTimeWindows timeWindows = (TumblingTimeWindows) windowAssigner;
 			final long windowLength = timeWindows.getSize();
 			final long windowSlide = timeWindows.getSize();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c2e2880..cc96217 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -541,11 +541,23 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Sets the time characteristic for all streams create from this environment, e.g., processing
 	 * time, event time, or ingestion time.
+	 *
+	 * <p>
+	 * If you set the characteristic to IngestionTime or EventTime this will set a default
+	 * watermark update interval of 200 ms. If this is not applicable for your application
+	 * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
 	 * 
 	 * @param characteristic The time characteristic.
 	 */
 	public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
 		this.timeCharacteristic = Objects.requireNonNull(characteristic);
+		if (characteristic == TimeCharacteristic.ProcessingTime) {
+			getConfig().disableTimestamps();
+			getConfig().setAutoWatermarkInterval(0);
+		} else {
+			getConfig().enableTimestamps();
+			getConfig().setAutoWatermarkInterval(200);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 52c8f55..dbeb5ce 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 
@@ -42,7 +43,7 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 	}
 
 	@Override
-	public Trigger<Object, GlobalWindow> getDefaultTrigger() {
+	public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
 		return null;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
deleted file mode 100644
index 65d7641..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * A {@link WindowAssigner} that windows elements into sliding, time-based windows. The windowing
- * is based on system time. Windows can possibly overlap.
- *
- * <p>
- * For example, in order to window into windows of 1 minute, every 10 seconds:
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> in = ...;
- * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
- * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- *   keyed.window(SlidingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
- * } </pre>
- */
-public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
-	private static final long serialVersionUID = 1L;
-
-	private final long size;
-
-	private final long slide;
-
-	private transient List<TimeWindow> result;
-
-	private SlidingProcessingTimeWindows(long size, long slide) {
-		this.size = size;
-		this.slide = slide;
-		this.result = Lists.newArrayListWithCapacity((int) (size / slide));
-	}
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-		this.result = Lists.newArrayListWithCapacity((int) (size / slide));
-	}
-
-	@Override
-	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
-		result.clear();
-		long time = System.currentTimeMillis();
-		long lastStart = time - time % slide;
-		for (long start = lastStart;
-			start > time - size;
-			start -= slide) {
-			result.add(new TimeWindow(start, size));
-		}
-		return result;
-	}
-
-	public long getSize() {
-		return size;
-	}
-
-	public long getSlide() {
-		return slide;
-	}
-
-	@Override
-	public Trigger<Object, TimeWindow> getDefaultTrigger() {
-		return ProcessingTimeTrigger.create();
-	}
-
-	@Override
-	public String toString() {
-		return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
-	}
-
-	/**
-	 * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
-	 * elements to sliding time windows based on the current processing time.
-	 *
-	 * @param size The size of the generated windows.
-	 * @param slide The slide interval of the generated windows.
-	 * @return The time policy.
-	 */
-	public static SlidingProcessingTimeWindows of(AbstractTime size, AbstractTime slide) {
-		return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 52ae356..6036dfb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -17,7 +17,10 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -72,8 +75,12 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	}
 
 	@Override
-	public Trigger<Object, TimeWindow> getDefaultTrigger() {
-		return WatermarkTrigger.create();
+	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
+			return ProcessingTimeTrigger.create();
+		} else {
+			return WatermarkTrigger.create();
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
deleted file mode 100644
index 41f6362..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * A {@link WindowAssigner} that windows elements into time-based windows. The windowing is
- * based on system time. Windows cannot overlap.
- *
- * <p>
- * For example, in order to window into windows of 1 minute, every 10 seconds:
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> in = ...;
- * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
- * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- *   keyed.window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
- * } </pre>
- */
-public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
-	private static final long serialVersionUID = 1L;
-
-	private long size;
-
-	private TumblingProcessingTimeWindows(long size) {
-		this.size = size;
-	}
-
-	@Override
-	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
-		long time = System.currentTimeMillis();
-		long start = time - (time % size);
-		return Collections.singletonList(new TimeWindow(start, size));
-	}
-
-	public long getSize() {
-		return size;
-	}
-
-	@Override
-	public Trigger<Object, TimeWindow> getDefaultTrigger() {
-		return ProcessingTimeTrigger.create();
-	}
-
-	@Override
-	public String toString() {
-		return "TumblingProcessingTimeWindows(" + size + ")";
-	}
-
-	/**
-	 * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
-	 * elements to time windows based on the current processing time.
-	 *
-	 * @param size The size of the generated windows.
-	 * @return The time policy.
-	 */
-	public static TumblingProcessingTimeWindows of(AbstractTime size) {
-		return new TumblingProcessingTimeWindows(size.toMilliseconds());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index b6022b3..d57dc33 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -17,7 +17,10 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -58,8 +61,12 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	}
 
 	@Override
-	public Trigger<Object, TimeWindow> getDefaultTrigger() {
-		return WatermarkTrigger.create();
+	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
+			return ProcessingTimeTrigger.create();
+		} else {
+			return WatermarkTrigger.create();
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index 20fe365..d0b1ed0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import java.io.Serializable;
@@ -50,5 +51,5 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
 	/**
 	 * Returns the default trigger associated with this {@code WindowAssigner}.
 	 */
-	public abstract Trigger<T, W> getDefaultTrigger();
+	public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 548afb3..368b8fa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -347,6 +347,11 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	// ------------------------------------------------------------------------
 
 	@VisibleForTesting
+	public boolean isSetProcessingTime() {
+		return setProcessingTime;
+	}
+
+	@VisibleForTesting
 	public Trigger<? super IN, ? super W> getTriggerTemplate() {
 		return triggerTemplate;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
index c06a608..9ddd6eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
@@ -49,8 +49,8 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 		testResults = Lists.newArrayList();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(1);
-		env.getConfig().enableTimestamps();
 
 		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
 			private static final long serialVersionUID = 1L;
@@ -144,8 +144,8 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 		testResults = Lists.newArrayList();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(1);
-		env.getConfig().enableTimestamps();
 
 		DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
 			private static final long serialVersionUID = 1L;
@@ -239,8 +239,8 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 		testResults = Lists.newArrayList();
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(1);
-		env.getConfig().enableTimestamps();
 
 		DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
 			private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 09a7149..4fa16ac 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -19,25 +19,25 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
@@ -50,30 +50,33 @@ import java.util.concurrent.TimeUnit;
 public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 	/**
-	 * These tests ensure that the fast aligned time windows operator is used if the
-	 * conditions are right.
-	 *
-	 * TODO: update once fast aligned time windows operator is in
+	 * These tests ensure that the correct trigger is set when using event-time windows.
 	 */
-	@Ignore
 	@Test
-	public void testFastTimeWindows() throws Exception {
+	@SuppressWarnings("rawtypes")
+	public void testEventTime() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
 		DummyReducer reducer = new DummyReducer();
 
 		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.reduce(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
+		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
+		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
+		Assert.assertFalse(winOperator1.isSetProcessingTime());
+		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
@@ -88,20 +91,26 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
+		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
+		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
+		Assert.assertFalse(winOperator2.isSetProcessingTime());
+		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}
 
 	@Test
 	@SuppressWarnings("rawtypes")
 	public void testNonEvicting() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
 		DummyReducer reducer = new DummyReducer();
 
 		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.trigger(CountTrigger.of(100))
 				.reduce(reducer);
 
@@ -109,12 +118,13 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
 		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
+		Assert.assertTrue(winOperator1.isSetProcessingTime());
 		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
 				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
@@ -132,8 +142,9 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
 		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
 		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
+		Assert.assertTrue(winOperator2.isSetProcessingTime());
 		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}
 
@@ -141,13 +152,14 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 	@SuppressWarnings("rawtypes")
 	public void testEvicting() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
 		DummyReducer reducer = new DummyReducer();
 
 		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.evictor(CountEvictor.of(100))
 				.reduce(reducer);
 
@@ -155,13 +167,14 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
 		Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
 		EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof ProcessingTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertFalse(winOperator1.isSetProcessingTime());
+		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
 				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
 				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
@@ -180,8 +193,9 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
 		Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
 		EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
+		Assert.assertFalse(winOperator2.isSetProcessingTime());
 		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 5124add..10fe734 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -20,19 +20,20 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
@@ -59,13 +60,13 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
 		DummyReducer reducer = new DummyReducer();
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
-				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS),
-						Time.of(100, TimeUnit.MILLISECONDS)))
+				.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.reduce(reducer);
 
 		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
@@ -74,7 +75,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.keyBy(0)
-				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
 
@@ -92,10 +93,63 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
 	}
 
+	/**
+	 * These tests ensure that the correct trigger is set when using event-time windows.
+	 */
+	@Test
+	@SuppressWarnings("rawtypes")
+	public void testEventTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		DummyReducer reducer = new DummyReducer();
+
+		DataStream<Tuple2<String, Integer>> window1 = source
+				.keyBy(0)
+				.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.reduce(reducer);
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+		Assert.assertTrue(operator1 instanceof WindowOperator);
+		WindowOperator winOperator1 = (WindowOperator) operator1;
+		Assert.assertFalse(winOperator1.isSetProcessingTime());
+		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
+
+		DataStream<Tuple2<String, Integer>> window2 = source
+				.keyBy(0)
+				.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void apply(Tuple tuple,
+							TimeWindow window,
+							Iterable<Tuple2<String, Integer>> values,
+							Collector<Tuple2<String, Integer>> out) throws Exception {
+
+					}
+				});
+
+		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+		Assert.assertTrue(operator2 instanceof WindowOperator);
+		WindowOperator winOperator2 = (WindowOperator) operator2;
+		Assert.assertFalse(winOperator2.isSetProcessingTime());
+		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+	}
+
 	@Test
 	@SuppressWarnings("rawtypes")
 	public void testNonEvicting() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
@@ -103,7 +157,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
-				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.trigger(CountTrigger.of(100))
 				.reduce(reducer);
 
@@ -111,13 +165,14 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
 		Assert.assertTrue(operator1 instanceof WindowOperator);
 		WindowOperator winOperator1 = (WindowOperator) operator1;
+		Assert.assertTrue(winOperator1.isSetProcessingTime());
 		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.keyBy(0)
-				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
 				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
 					private static final long serialVersionUID = 1L;
@@ -135,8 +190,9 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
 		Assert.assertTrue(operator2 instanceof WindowOperator);
 		WindowOperator winOperator2 = (WindowOperator) operator2;
+		Assert.assertTrue(winOperator2.isSetProcessingTime());
 		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}
 
@@ -144,6 +200,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 	@SuppressWarnings("rawtypes")
 	public void testEvicting() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
 		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
@@ -151,7 +208,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Tuple2<String, Integer>> window1 = source
 				.keyBy(0)
-				.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+				.window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
 				.evictor(CountEvictor.of(100))
 				.reduce(reducer);
 
@@ -159,14 +216,15 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
 		Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
 		EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
-		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof ProcessingTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+		Assert.assertFalse(winOperator1.isSetProcessingTime());
+		Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
 		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
 		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 
 		DataStream<Tuple2<String, Integer>> window2 = source
 				.keyBy(0)
-				.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+				.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
 				.trigger(CountTrigger.of(100))
 				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
 				.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
@@ -185,8 +243,9 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
 		Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
 		EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
+		Assert.assertFalse(winOperator2.isSetProcessingTime());
 		Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
 		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
 		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 8abf9d6..5915a7a 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.TimestampExtractor;
@@ -64,7 +65,7 @@ public class WindowJoin {
 
 		// obtain execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableTimestamps();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
 		// connect to the data sources for grades and salaries
 		Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> input = getInputStreams(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 225dab7..42484e8 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.scala.examples.join
 
 import java.util.concurrent.TimeUnit
 
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
@@ -45,7 +46,7 @@ object WindowJoin {
     }
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.getConfig.enableTimestamps()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
     //Create streams for grades and salaries by mapping the inputs to the corresponding objects
     val grades = setGradesInput(env)

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 7babc40..fb4d75d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.scala.function.StatefulFunction
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
-import org.apache.flink.streaming.api.windowing.time.{AbstractTime, EventTime, ProcessingTime}
+import org.apache.flink.streaming.api.windowing.time.AbstractTime
 import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
@@ -624,20 +624,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * @param size The size of the window.
    */
   def timeWindowAll(size: AbstractTime): AllWindowedStream[T, TimeWindow] = {
-    val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
-    val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-
-    actualSize match {
-      case t: EventTime =>
-        val assigner = TumblingTimeWindows.of(actualSize)
-          .asInstanceOf[WindowAssigner[T, TimeWindow]]
-        windowAll(assigner)
-      case t: ProcessingTime =>
-        val assigner = TumblingProcessingTimeWindows.of(actualSize)
-          .asInstanceOf[WindowAssigner[T, TimeWindow]]
-        windowAll(assigner)
-      case _ => throw new RuntimeException("Invalid time: " + actualSize)
-    }
+    val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]]
+    windowAll(assigner)
   }
 
   /**
@@ -651,23 +639,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * @param size The size of the window.
    */
   def timeWindowAll(size: AbstractTime, slide: AbstractTime): AllWindowedStream[T, TimeWindow] = {
-    val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
-    val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-    val actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-
-    actualSize match {
-      case t: EventTime =>
-        val assigner = SlidingTimeWindows.of(
-          actualSize,
-          actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
-        windowAll(assigner)
-      case t: ProcessingTime =>
-        val assigner = SlidingProcessingTimeWindows.of(
-          actualSize,
-          actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
-        windowAll(assigner)
-      case _ => throw new RuntimeException("Invalid time: " + actualSize)
-    }
+    val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
+    windowAll(assigner)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 0ce36aa..c605bb1 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.functions.aggregation.SumAggregator
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce
 import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.{ProcessingTime, EventTime, AbstractTime}
+import org.apache.flink.streaming.api.windowing.time.AbstractTime
 import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow}
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -50,20 +50,8 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    * @param size The size of the window.
    */
   def timeWindow(size: AbstractTime): WindowedStream[T, K, TimeWindow] = {
-    val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
-    val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-
-    actualSize match {
-      case t: EventTime =>
-        val assigner = TumblingTimeWindows.of(actualSize)
-          .asInstanceOf[WindowAssigner[T, TimeWindow]]
-        window(assigner)
-      case t: ProcessingTime =>
-        val assigner = TumblingProcessingTimeWindows.of(actualSize)
-          .asInstanceOf[WindowAssigner[T, TimeWindow]]
-        window(assigner)
-      case _ => throw new RuntimeException("Invalid time: " + actualSize)
-    }
+    val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]]
+    window(assigner)
   }
 
   /**
@@ -78,23 +66,8 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    * @param size The size of the window.
    */
   def timeWindow(size: AbstractTime, slide: AbstractTime): WindowedStream[T, K, TimeWindow] = {
-    val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
-    val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-    val actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-
-    actualSize match {
-      case t: EventTime =>
-        val assigner = SlidingTimeWindows.of(
-          actualSize,
-          actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
-        window(assigner)
-      case t: ProcessingTime =>
-        val assigner = SlidingProcessingTimeWindows.of(
-          actualSize,
-          actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
-        window(assigner)
-      case _ => throw new RuntimeException("Invalid time: " + actualSize)
-    }
+    val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
+    window(assigner)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 7492e48..f767aba 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -299,6 +299,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Sets the time characteristic for all streams create from this environment, e.g., processing
    * time, event time, or ingestion time.
    *
+   * If you set the characteristic to IngestionTime or EventTime this will set a default
+   * watermark update interval of 200 ms. If this is not applicable for your application
+   * you should change it using
+   * [[org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)]]
+   *
    * @param characteristic The time characteristic.
    */
   def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index dece9f6..99fcd07 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -22,9 +22,10 @@ package org.apache.flink.streaming.api.scala
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.api.common.functions.RichReduceFunction
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows}
+import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
@@ -49,13 +50,14 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
   @Test
   def testFastTimeWindows(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
     val reducer = new DummyReducer
 
     val window1 = source
-      .windowAll(SlidingProcessingTimeWindows.of(
+      .windowAll(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .reduce(reducer)
@@ -69,7 +71,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val window2 = source
       .keyBy(0)
-      .windowAll(SlidingProcessingTimeWindows.of(
+      .windowAll(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
@@ -96,7 +98,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
     val reducer = new DummyReducer
 
     val window1 = source
-      .windowAll(SlidingProcessingTimeWindows.of(
+      .windowAll(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
@@ -110,13 +112,13 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
     assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
     val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
     assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
       winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
 
 
     val window2 = source
-      .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
       def apply(
@@ -133,7 +135,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
     assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
     val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
     assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
 
@@ -146,7 +148,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
     val reducer = new DummyReducer
 
     val window1 = source
-      .windowAll(SlidingProcessingTimeWindows.of(
+      .windowAll(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
@@ -161,12 +163,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
     val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
     assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
     assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
 
 
     val window2 = source
-      .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .evictor(CountEvictor.of(1000))
       .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
@@ -185,7 +187,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
     val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
     assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
index 7232309..3f6e10f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala
 
 import java.util.concurrent.TimeUnit
 
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.TimestampExtractor
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
@@ -38,9 +39,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
     CoGroupJoinITCase.testResults = mutable.MutableList()
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setParallelism(1)
-    env.getConfig.enableTimestamps
-    
+
     val source1 = env.addSource(new SourceFunction[(String, Int)]() {
       def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
         ctx.collect(("a", 0))
@@ -101,8 +102,8 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
     CoGroupJoinITCase.testResults = mutable.MutableList()
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setParallelism(1)
-    env.getConfig.enableTimestamps
 
     val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
       def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
@@ -177,8 +178,8 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
     CoGroupJoinITCase.testResults = mutable.MutableList()
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setParallelism(1)
-    env.getConfig.enableTimestamps
 
     val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
       def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {