Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:22 UTC

[07/11] flink git commit: [FLINK-3674] Add an interface for Time aware User Functions

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
index 448f95f..00d4722 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
@@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -31,12 +30,6 @@ public class CoStreamMap<IN1, IN2, OUT>
 
 	private static final long serialVersionUID = 1L;
 
-	// We keep track of watermarks from both inputs, the combined input is the minimum
-	// Once the minimum advances we emit a new watermark for downstream operators
-	private long combinedWatermark = Long.MIN_VALUE;
-	private long input1Watermark = Long.MIN_VALUE;
-	private long input2Watermark = Long.MIN_VALUE;
-
 	public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
 		super(mapper);
 	}
@@ -50,24 +43,4 @@ public class CoStreamMap<IN1, IN2, OUT>
 	public void processElement2(StreamRecord<IN2> element) throws Exception {
 		output.collect(element.replace(userFunction.map2(element.getValue())));
 	}
-
-	@Override
-	public void processWatermark1(Watermark mark) throws Exception {
-		input1Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark) {
-			combinedWatermark = newMin;
-			output.emitWatermark(new Watermark(combinedWatermark));
-		}
-	}
-
-	@Override
-	public void processWatermark2(Watermark mark) throws Exception {
-		input2Watermark = mark.getTimestamp();
-		long newMin = Math.min(input1Watermark, input2Watermark);
-		if (newMin > combinedWatermark) {
-			combinedWatermark = newMin;
-			output.emitWatermark(new Watermark(combinedWatermark));
-		}
-	}
 }
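
The block removed above was the per-operator bookkeeping that combined the two input watermarks: the last watermark of each input is tracked, and a new watermark is emitted downstream only when the minimum of the two advances. With this change the override disappears from CoStreamMap, presumably because the same min-combination now happens once in the shared operator base rather than in every two-input operator; the rule itself is unchanged. A minimal standalone sketch of that rule (hypothetical class, not Flink code):

public class TwoInputWatermarkCombiner {

	// last watermark seen on each input; the combined watermark is their minimum
	private long input1Watermark = Long.MIN_VALUE;
	private long input2Watermark = Long.MIN_VALUE;
	private long combinedWatermark = Long.MIN_VALUE;

	/** Advances input 1 and returns the new combined watermark if it advanced, otherwise null. */
	public Long advanceInput1(long timestamp) {
		input1Watermark = timestamp;
		return recompute();
	}

	/** Advances input 2 and returns the new combined watermark if it advanced, otherwise null. */
	public Long advanceInput2(long timestamp) {
		input2Watermark = timestamp;
		return recompute();
	}

	private Long recompute() {
		long newMin = Math.min(input1Watermark, input2Watermark);
		if (newMin > combinedWatermark) {
			combinedWatermark = newMin;
			return combinedWatermark;
		}
		return null;
	}

	public static void main(String[] args) {
		TwoInputWatermarkCombiner combiner = new TwoInputWatermarkCombiner();
		System.out.println(combiner.advanceInput1(5));  // null: input 2 is still at Long.MIN_VALUE
		System.out.println(combiner.advanceInput2(3));  // 3: min(5, 3) advanced past MIN_VALUE
		System.out.println(combiner.advanceInput2(10)); // 5: min(5, 10) advanced to 5
	}
}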

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
new file mode 100644
index 0000000..df2320f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+@Internal
+public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
+		extends AbstractUdfStreamOperator<OUT, TimelyCoFlatMapFunction<IN1, IN2, OUT>>
+		implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final TypeSerializer<K> keySerializer;
+
+	private transient TimestampedCollector<OUT> collector;
+
+	private transient TimerService timerService;
+
+	public CoStreamTimelyFlatMap(
+			TypeSerializer<K> keySerializer,
+			TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
+		super(flatMapper);
+
+		this.keySerializer = keySerializer;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		collector = new TimestampedCollector<>(output);
+
+		InternalTimerService<VoidNamespace> internalTimerService =
+				getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+
+		this.timerService = new SimpleTimerService(internalTimerService);
+	}
+
+	@Override
+	public void processElement1(StreamRecord<IN1> element) throws Exception {
+		collector.setTimestamp(element);
+		userFunction.flatMap1(element.getValue(), timerService, collector);
+
+	}
+
+	@Override
+	public void processElement2(StreamRecord<IN2> element) throws Exception {
+		collector.setTimestamp(element);
+		userFunction.flatMap2(element.getValue(), timerService, collector);
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		userFunction.onTimer(timer.getTimestamp(), TimeDomain.EVENT_TIME, timerService, collector);
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		userFunction.onTimer(timer.getTimestamp(), TimeDomain.PROCESSING_TIME, timerService, collector);
+	}
+
+	protected TimestampedCollector<OUT> getCollector() {
+		return collector;
+	}
+}
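
The new operator above only wires timestamps and the timer service; the user logic lives in a TimelyCoFlatMapFunction. That interface is not shown in this part of the commit, but its shape can be inferred from the calls in processElement1/processElement2 and onEventTime/onProcessingTime. A hedged sketch of a user function, assuming flatMap1/flatMap2/onTimer take the argument order used above, that the collector parameter is the usual org.apache.flink.util.Collector, and that TimerService exposes currentWatermark() and registerEventTimeTimer(long) as SimpleTimerService suggests (class and field names here are made up for illustration):

import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class StalePrimaryAlert implements TimelyCoFlatMapFunction<String, String, String> {

	private static final long serialVersionUID = 1L;

	@Override
	public void flatMap1(String value, TimerService timerService, Collector<String> out) {
		// forward the element and ask for an event-time callback a bit later
		out.collect("primary: " + value);
		timerService.registerEventTimeTimer(timerService.currentWatermark() + 5000);
	}

	@Override
	public void flatMap2(String value, TimerService timerService, Collector<String> out) {
		out.collect("secondary: " + value);
	}

	@Override
	public void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<String> out) {
		// called for both time domains; the TimeDomain argument tells them apart
		out.collect("timer fired at " + timestamp + " in " + timeDomain);
	}
}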

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index b5500b7..36492d7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
@@ -76,6 +75,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 
 	@Override
 	public void open() throws Exception {
+		super.open();
 		committer.setOperatorId(id);
 		committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
 		committer.open();
@@ -113,6 +113,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 	public void snapshotState(FSDataOutputStream out,
 			long checkpointId,
 			long timestamp) throws Exception {
+		super.snapshotState(out, checkpointId, timestamp);
 
 		saveHandleInState(checkpointId, timestamp);
 
@@ -121,6 +122,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 
 	@Override
 	public void restoreState(FSDataInputStream in) throws Exception {
+		super.restoreState(in);
 
 		this.state = InstantiationUtil.deserializeObject(in, getUserCodeClassloader());
 	}
@@ -203,11 +205,6 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 		serializer.serialize(value, new DataOutputViewStreamWrapper(out));
 	}
 
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		//don't do anything, since we are a sink
-	}
-
 	/**
 	 * This state is used to keep a list of all StateHandles (essentially references to past OperatorStates) that were
 	 * used since the last completed checkpoint.

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index d331d4d..2a77c0a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -32,7 +32,6 @@ import org.apache.flink.util.MathUtils;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -208,11 +207,6 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	}
 
 	@Override
-	public void processWatermark(Watermark mark) {
-		// this operator does not react to watermarks
-	}
-
-	@Override
 	public void trigger(long timestamp) throws Exception {
 		// first we check if we actually trigger the window function
 		if (timestamp == nextEvaluationTime) {

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index 79ef4c6..a252ece 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -66,7 +66,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 			// optimized path for single pane case (tumbling window)
 			for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
 				Key key = entry.getKey();
-				operator.setKeyContext(key);
+				operator.setCurrentKey(key);
 				function.apply(entry.getKey(), window, entry.getValue(), out);
 			}
 		}
@@ -122,7 +122,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 		@Override
 		public void keyDone() throws Exception {
-			contextOperator.setKeyContext(currentKey);
+			contextOperator.setCurrentKey(currentKey);
 			function.apply(currentKey, window, unionIterator, out);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
index dfa357e..84686a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
@@ -93,7 +93,7 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
 		@Override
 		public void startNewKey(Key key) {
 			currentValue = null;
-			operator.setKeyContext(key);
+			operator.setCurrentKey(key);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 141b5b8..2f4dbde 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
@@ -204,110 +204,82 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 	}
 
 	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		boolean fire;
-		do {
-			Timer<K, W> timer = watermarkTimersQueue.peek();
-			if (timer != null && timer.timestamp <= mark.getTimestamp()) {
-				fire = true;
-
-				watermarkTimers.remove(timer);
-				watermarkTimersQueue.remove();
-
-				context.key = timer.key;
-				context.window = timer.window;
-				setKeyContext(timer.key);
-
-				ListState<StreamRecord<IN>> windowState;
-				MergingWindowSet<W> mergingWindows = null;
-
-				if (windowAssigner instanceof MergingWindowAssigner) {
-					mergingWindows = getMergingWindowSet();
-					W stateWindow = mergingWindows.getStateWindow(context.window);
-					if (stateWindow == null) {
-						// then the window is already purged and this is a cleanup
-						// timer set due to allowed lateness that has nothing to clean,
-						// so it is safe to just ignore
-						continue;
-					}
-					windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
-				} else {
-					windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
-				}
+	public void onEventTime(InternalTimer<K, W> timer) throws Exception {
 
-				Iterable<StreamRecord<IN>> contents = windowState.get();
-				if (contents == null) {
-					// if we have no state, there is nothing to do
-					continue;
-				}
-
-				TriggerResult triggerResult = context.onEventTime(timer.timestamp);
-				if (triggerResult.isFire()) {
-					fire(context.window, contents);
-				}
+		context.key = timer.getKey();
+		context.window = timer.getNamespace();
 
-				if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
-					cleanup(context.window, windowState, mergingWindows);
-				}
+		ListState<StreamRecord<IN>> windowState;
+		MergingWindowSet<W> mergingWindows = null;
 
-			} else {
-				fire = false;
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			mergingWindows = getMergingWindowSet();
+			W stateWindow = mergingWindows.getStateWindow(context.window);
+			if (stateWindow == null) {
+				// then the window is already purged and this is a cleanup
+				// timer set due to allowed lateness that has nothing to clean,
+				// so it is safe to just ignore
+				return;
 			}
-		} while (fire);
+			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+		} else {
+			windowState = getPartitionedState(
+					context.window,
+					windowSerializer,
+					windowStateDescriptor);
+		}
+
+		Iterable<StreamRecord<IN>> contents = windowState.get();
+		if (contents == null) {
+			// if we have no state, there is nothing to do
+			return;
+		}
 
-		output.emitWatermark(mark);
+		TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+		if (triggerResult.isFire()) {
+			fire(context.window, contents);
+		}
 
-		this.currentWatermark = mark.getTimestamp();
+		if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
+			cleanup(context.window, windowState, mergingWindows);
+		}
 	}
 
 	@Override
-	public void trigger(long time) throws Exception {
-		Timer<K, W> timer;
-
-		while ((timer = processingTimeTimersQueue.peek()) != null && timer.timestamp <= time) {
-
-			processingTimeTimers.remove(timer);
-			processingTimeTimersQueue.remove();
-
-			context.key = timer.key;
-			context.window = timer.window;
-			setKeyContext(timer.key);
+	public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
+		context.key = timer.getKey();
+		context.window = timer.getNamespace();
 
-			ListState<StreamRecord<IN>> windowState;
-			MergingWindowSet<W> mergingWindows = null;
+		ListState<StreamRecord<IN>> windowState;
+		MergingWindowSet<W> mergingWindows = null;
 
-			if (windowAssigner instanceof MergingWindowAssigner) {
-				mergingWindows = getMergingWindowSet();
-				W stateWindow = mergingWindows.getStateWindow(context.window);
-				if (stateWindow == null) {
-					// then the window is already purged and this is a cleanup
-					// timer set due to allowed lateness that has nothing to clean,
-					// so it is safe to just ignore
-					continue;
-				}
-				windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
-			} else {
-				windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
-			}
-
-			Iterable<StreamRecord<IN>> contents = windowState.get();
-			if (contents == null) {
-				// if we have no state, there is nothing to do
-				continue;
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			mergingWindows = getMergingWindowSet();
+			W stateWindow = mergingWindows.getStateWindow(context.window);
+			if (stateWindow == null) {
+				// then the window is already purged and this is a cleanup
+				// timer set due to allowed lateness that has nothing to clean,
+				// so it is safe to just ignore
+				return;
 			}
+			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+		} else {
+			windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+		}
 
-			TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
-			if (triggerResult.isFire()) {
-				fire(context.window, contents);
-			}
+		Iterable<StreamRecord<IN>> contents = windowState.get();
+		if (contents == null) {
+			// if we have no state, there is nothing to do
+			return;
+		}
 
-			if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
-				cleanup(context.window, windowState, mergingWindows);
-			}
+		TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+		if (triggerResult.isFire()) {
+			fire(context.window, contents);
 		}
 
-		if (timer != null) {
-			nextTimer = getProcessingTimeService().registerTimer(timer.timestamp, this);
+		if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
+			cleanup(context.window, windowState, mergingWindows);
 		}
 	}
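
The pattern of this change: instead of draining its own timer queues inside processWatermark() and trigger(), the operator now implements the new Triggerable interface and is handed one already key-scoped timer per callback, so the old loop-and-continue structure becomes a simple early return. The interface itself is not part of this excerpt; inferred from the implementers in this commit, its shape is roughly the following (sketch, not the actual source file):

import org.apache.flink.streaming.api.operators.InternalTimer;

public interface Triggerable<K, N> {

	/** Invoked when an event-time timer fires; the timer carries its key, namespace and timestamp. */
	void onEventTime(InternalTimer<K, N> timer) throws Exception;

	/** Invoked when a processing-time timer fires. */
	void onProcessingTime(InternalTimer<K, N> timer) throws Exception;
}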
 

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 459c679..bc37692 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -37,36 +37,28 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -96,7 +88,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
-	implements OneInputStreamOperator<IN, OUT>, Triggerable {
+	implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -141,14 +133,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
-	protected transient ScheduledFuture<?> nextTimer;
-
-	/**
-	 * To keep track of the current watermark so that we can immediately fire if a trigger
-	 * registers an event time callback for a timestamp that lies in the past.
-	 */
-	protected long currentWatermark = Long.MIN_VALUE;
-
 	protected transient Context context = new Context(null, null);
 
 	protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
@@ -157,17 +141,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	// State that needs to be checkpointed
 	// ------------------------------------------------------------------------
 
-	/**
-	 * Processing time timers that are currently in-flight.
-	 */
-	protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue;
-	protected transient Set<Timer<K, W>> processingTimeTimers;
-
-	/**
-	 * Current waiting watermark callbacks.
-	 */
-	protected transient Set<Timer<K, W>> watermarkTimers;
-	protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;
+	private transient InternalTimerService<W> internalTimerService;
 
 	protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey;
 
@@ -208,49 +182,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 		timestampedCollector = new TimestampedCollector<>(output);
 
-		// these could already be initialized from restoreState()
-		if (watermarkTimers == null) {
-			watermarkTimers = new HashSet<>();
-			watermarkTimersQueue = new PriorityQueue<>(100);
-		}
-		if (processingTimeTimers == null) {
-			processingTimeTimers = new HashSet<>();
-			processingTimeTimersQueue = new PriorityQueue<>(100);
-		}
+		internalTimerService =
+				getInternalTimerService("window-timers", keySerializer, windowSerializer, this);
 
 		context = new Context(null, null);
 
 		windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
 			@Override
 			public long getCurrentProcessingTime() {
-				return WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
+				return internalTimerService.currentProcessingTime();
 			}
 		};
 
 		if (windowAssigner instanceof MergingWindowAssigner) {
 			mergingWindowsByKey = new HashMap<>();
 		}
-
-		// re-register the restored timers (if any)
-		if (processingTimeTimersQueue.size() > 0) {
-			nextTimer = getProcessingTimeService().registerTimer(processingTimeTimersQueue.peek().timestamp, this);
-		}
 	}
 
 	@Override
 	public final void close() throws Exception {
 		super.close();
-
-		if (nextTimer != null) {
-			nextTimer.cancel(false);
-			nextTimer = null;
-		}
-
 		timestampedCollector = null;
-		watermarkTimers = null;
-		watermarkTimersQueue = null;
-		processingTimeTimers = null;
-		processingTimeTimersQueue = null;
 		context = null;
 		windowAssignerContext = null;
 		mergingWindowsByKey = null;
@@ -259,17 +211,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	@Override
 	public void dispose() throws Exception {
 		super.dispose();
-
-		if (nextTimer != null) {
-			nextTimer.cancel(false);
-			nextTimer = null;
-		}
-
 		timestampedCollector = null;
-		watermarkTimers = null;
-		watermarkTimersQueue = null;
-		processingTimeTimers = null;
-		processingTimeTimersQueue = null;
 		context = null;
 		windowAssignerContext = null;
 		mergingWindowsByKey = null;
@@ -392,110 +334,81 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		boolean fire;
-		do {
-			Timer<K, W> timer = watermarkTimersQueue.peek();
-			if (timer != null && timer.timestamp <= mark.getTimestamp()) {
-				fire = true;
-
-				watermarkTimers.remove(timer);
-				watermarkTimersQueue.remove();
-
-				context.key = timer.key;
-				context.window = timer.window;
-				setKeyContext(timer.key);
-
-				AppendingState<IN, ACC> windowState;
-				MergingWindowSet<W> mergingWindows = null;
-
-				if (windowAssigner instanceof MergingWindowAssigner) {
-					mergingWindows = getMergingWindowSet();
-					W stateWindow = mergingWindows.getStateWindow(context.window);
-					if (stateWindow == null) {
-						// then the window is already purged and this is a cleanup
-						// timer set due to allowed lateness that has nothing to clean,
-						// so it is safe to just ignore
-						continue;
-					}
-					windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
-				} else {
-					windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
-				}
-
-				ACC contents = windowState.get();
-				if (contents == null) {
-					// if we have no state, there is nothing to do
-					continue;
-				}
-
-				TriggerResult triggerResult = context.onEventTime(timer.timestamp);
-				if (triggerResult.isFire()) {
-					fire(context.window, contents);
-				}
+	public void onEventTime(InternalTimer<K, W> timer) throws Exception {
+		context.key = timer.getKey();
+		context.window = timer.getNamespace();
 
-				if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
-					cleanup(context.window, windowState, mergingWindows);
-				}
+		AppendingState<IN, ACC> windowState;
+		MergingWindowSet<W> mergingWindows = null;
 
-			} else {
-				fire = false;
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			mergingWindows = getMergingWindowSet();
+			W stateWindow = mergingWindows.getStateWindow(context.window);
+			if (stateWindow == null) {
+				// then the window is already purged and this is a cleanup
+				// timer set due to allowed lateness that has nothing to clean,
+				// so it is safe to just ignore
+				return;
 			}
-		} while (fire);
+			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+		} else {
+			windowState = getPartitionedState(
+					context.window,
+					windowSerializer,
+					windowStateDescriptor);
+		}
+
+		ACC contents = windowState.get();
+		if (contents == null) {
+			// if we have no state, there is nothing to do
+			return;
+		}
 
-		output.emitWatermark(mark);
+		TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+		if (triggerResult.isFire()) {
+			fire(context.window, contents);
+		}
 
-		this.currentWatermark = mark.getTimestamp();
+		if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
+			cleanup(context.window, windowState, mergingWindows);
+		}
 	}
 
 	@Override
-	public void trigger(long time) throws Exception {
-		Timer<K, W> timer;
-
-		while ((timer = processingTimeTimersQueue.peek()) != null && timer.timestamp <= time) {
-
-			processingTimeTimers.remove(timer);
-			processingTimeTimersQueue.remove();
-
-			context.key = timer.key;
-			context.window = timer.window;
-			setKeyContext(timer.key);
-
-			AppendingState<IN, ACC> windowState;
-			MergingWindowSet<W> mergingWindows = null;
+	public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
+		context.key = timer.getKey();
+		context.window = timer.getNamespace();
 
-			if (windowAssigner instanceof MergingWindowAssigner) {
-				mergingWindows = getMergingWindowSet();
-				W stateWindow = mergingWindows.getStateWindow(context.window);
-				if (stateWindow == null) {
-					// then the window is already purged and this is a cleanup
-					// timer set due to allowed lateness that has nothing to clean,
-					// so it is safe to just ignore
-					continue;
-				}
-				windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
-			} else {
-				windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
-			}
+		AppendingState<IN, ACC> windowState;
+		MergingWindowSet<W> mergingWindows = null;
 
-			ACC contents = windowState.get();
-			if (contents == null) {
-				// if we have no state, there is nothing to do
-				continue;
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			mergingWindows = getMergingWindowSet();
+			W stateWindow = mergingWindows.getStateWindow(context.window);
+			if (stateWindow == null) {
+				// then the window is already purged and this is a cleanup
+				// timer set due to allowed lateness that has nothing to clean,
+				// so it is safe to just ignore
+				return;
 			}
+			windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+		} else {
+			windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+		}
 
-			TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
-			if (triggerResult.isFire()) {
-				fire(context.window, contents);
-			}
+		ACC contents = windowState.get();
+		if (contents == null) {
+			// if we have no state, there is nothing to do
+			return;
+		}
 
-			if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
-				cleanup(context.window, windowState, mergingWindows);
-			}
+		TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+		if (triggerResult.isFire()) {
+			fire(context.window, contents);
 		}
 
-		if (timer != null) {
-			nextTimer = getProcessingTimeService().registerTimer(timer.timestamp, this);
+		if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
+			cleanup(context.window, windowState, mergingWindows);
 		}
 	}
 
@@ -555,7 +468,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * 					considered when triggering.
 	 */
 	protected boolean isLate(W window) {
-		return (windowAssigner.isEventTime() && (cleanupTime(window) <= currentWatermark));
+		return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
 	}
 
 	/**
@@ -638,7 +551,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 
 		public long getCurrentWatermark() {
-			return currentWatermark;
+			return internalTimerService.currentWatermark();
 		}
 
 		@Override
@@ -697,54 +610,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 		@Override
 		public long getCurrentProcessingTime() {
-			return WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
+			return internalTimerService.currentProcessingTime();
 		}
 
 		@Override
 		public void registerProcessingTimeTimer(long time) {
-			Timer<K, W> timer = new Timer<>(time, key, window);
-
-			// make sure we only put one timer per key into the queue
-			if (processingTimeTimers.add(timer)) {
-
-				Timer<K, W> oldHead = processingTimeTimersQueue.peek();
-				long nextTriggerTime = oldHead != null ? oldHead.timestamp : Long.MAX_VALUE; 
-
-				processingTimeTimersQueue.add(timer);
-
-				// check if we need to re-schedule our timer to earlier
-				if (time < nextTriggerTime) {
-					if (nextTimer != null) {
-						nextTimer.cancel(false);
-					}
-					nextTimer = getProcessingTimeService().registerTimer(time, WindowOperator.this);
-				}
-			}
+			internalTimerService.registerProcessingTimeTimer(window, time);
 		}
 
 		@Override
 		public void registerEventTimeTimer(long time) {
-			Timer<K, W> timer = new Timer<>(time, key, window);
-			if (watermarkTimers.add(timer)) {
-				watermarkTimersQueue.add(timer);
-			}
+			internalTimerService.registerEventTimeTimer(window, time);
 		}
 
 		@Override
 		public void deleteProcessingTimeTimer(long time) {
-			Timer<K, W> timer = new Timer<>(time, key, window);
-
-			if (processingTimeTimers.remove(timer)) {
-				processingTimeTimersQueue.remove(timer);
-			}
+			internalTimerService.deleteProcessingTimeTimer(window, time);
 		}
 
 		@Override
 		public void deleteEventTimeTimer(long time) {
-			Timer<K, W> timer = new Timer<>(time, key, window);
-			if (watermarkTimers.remove(timer)) {
-				watermarkTimersQueue.remove(timer);
-			}
+			internalTimerService.deleteEventTimeTimer(window, time);
 		}
 
 		public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
@@ -843,67 +729,21 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 			TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
 			ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
 			for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) {
-				setKeyContext(key.getKey());
+				setCurrentKey(key.getKey());
 				ListState<Tuple2<W, W>> mergeState = getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergeStateDescriptor);
 				mergeState.clear();
 				key.getValue().persist(mergeState);
 			}
 		}
 
-		snapshotTimers(new DataOutputViewStreamWrapper(out));
-
 		super.snapshotState(out, checkpointId, timestamp);
 	}
 
 	@Override
 	public void restoreState(FSDataInputStream in) throws Exception {
-		restoreTimers(new DataInputViewStreamWrapper(in));
-
 		super.restoreState(in);
 	}
 
-	private void restoreTimers(DataInputView in ) throws IOException {
-		int numWatermarkTimers = in.readInt();
-		watermarkTimers = new HashSet<>(numWatermarkTimers);
-		watermarkTimersQueue = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
-		for (int i = 0; i < numWatermarkTimers; i++) {
-			K key = keySerializer.deserialize(in);
-			W window = windowSerializer.deserialize(in);
-			long timestamp = in.readLong();
-			Timer<K, W> timer = new Timer<>(timestamp, key, window);
-			watermarkTimers.add(timer);
-			watermarkTimersQueue.add(timer);
-		}
-
-		int numProcessingTimeTimers = in.readInt();
-		processingTimeTimersQueue = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1));
-		processingTimeTimers = new HashSet<>();
-		for (int i = 0; i < numProcessingTimeTimers; i++) {
-			K key = keySerializer.deserialize(in);
-			W window = windowSerializer.deserialize(in);
-			long timestamp = in.readLong();
-			Timer<K, W> timer = new Timer<>(timestamp, key, window);
-			processingTimeTimersQueue.add(timer);
-			processingTimeTimers.add(timer);
-		}
-	}
-
-	private void snapshotTimers(DataOutputView out) throws IOException {
-		out.writeInt(watermarkTimersQueue.size());
-		for (Timer<K, W> timer : watermarkTimersQueue) {
-			keySerializer.serialize(timer.key, out);
-			windowSerializer.serialize(timer.window, out);
-			out.writeLong(timer.timestamp);
-		}
-
-		out.writeInt(processingTimeTimers.size());
-		for (Timer<K,W> timer : processingTimeTimers) {
-			keySerializer.serialize(timer.key, out);
-			windowSerializer.serialize(timer.window, out);
-			out.writeLong(timer.timestamp);
-		}
-	}
-
 	// ------------------------------------------------------------------------
 	// Getters for testing
 	// ------------------------------------------------------------------------
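
Taken together, the WindowOperator changes replace the hand-rolled timer queues (and their snapshotTimers()/restoreTimers() code) with a namespaced InternalTimerService obtained via getInternalTimerService(...) in open(). From the calls above, the parts of that service the operator relies on look roughly like this (inferred and partial; the real interface may declare more):

public interface InternalTimerService<N> {

	long currentProcessingTime();

	long currentWatermark();

	// timers are scoped by the current key plus a namespace, here the window
	void registerProcessingTimeTimer(N namespace, long time);

	void registerEventTimeTimer(N namespace, long time);

	void deleteProcessingTimeTimer(N namespace, long time);

	void deleteEventTimeTimer(N namespace, long time);
}

Since the timers now live in the timer service rather than in operator fields, checkpointing them is presumably the service's job, which is why the operator's own snapshot/restore of timers can be dropped while the super calls remain.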

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index d2bf133..d0a2ea9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -20,9 +20,11 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutionException;
@@ -42,7 +44,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 	private volatile boolean isQuiesced;
 
 	// sorts the timers by timestamp so that they are processed in the correct order.
-	private final Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>();
+	private final Map<Long, List<ScheduledTimerFuture>> registeredTasks = new TreeMap<>();
 
 	
 	public void setCurrentTime(long timestamp) throws Exception {
@@ -53,10 +55,10 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 			// we do not fire them here to be able to accommodate timers
 			// that register other timers.
 	
-			Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator();
-			List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>();
+			Iterator<Map.Entry<Long, List<ScheduledTimerFuture>>> it = registeredTasks.entrySet().iterator();
+			List<Map.Entry<Long, List<ScheduledTimerFuture>>> toRun = new ArrayList<>();
 			while (it.hasNext()) {
-				Map.Entry<Long, List<Triggerable>> t = it.next();
+				Map.Entry<Long, List<ScheduledTimerFuture>> t = it.next();
 				if (t.getKey() <= this.currentTime) {
 					toRun.add(t);
 					it.remove();
@@ -64,10 +66,10 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 			}
 	
 			// now do the actual firing.
-			for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
+			for (Map.Entry<Long, List<ScheduledTimerFuture>> tasks: toRun) {
 				long now = tasks.getKey();
-				for (Triggerable task: tasks.getValue()) {
-					task.trigger(now);
+				for (ScheduledTimerFuture task: tasks.getValue()) {
+					task.getTriggerable().trigger(now);
 				}
 			}
 		}
@@ -84,7 +86,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 			throw new IllegalStateException("terminated");
 		}
 		if (isQuiesced) {
-			return new DummyFuture();
+			return new ScheduledTimerFuture(null, -1);
 		}
 
 		if (timestamp <= currentTime) {
@@ -94,14 +96,17 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 				throw new RuntimeException(e);
 			}
 		}
-		List<Triggerable> tasks = registeredTasks.get(timestamp);
+
+		ScheduledTimerFuture result = new ScheduledTimerFuture(target, timestamp);
+
+		List<ScheduledTimerFuture> tasks = registeredTasks.get(timestamp);
 		if (tasks == null) {
 			tasks = new ArrayList<>();
 			registeredTasks.put(timestamp, tasks);
 		}
-		tasks.add(target);
+		tasks.add(result);
 
-		return new DummyFuture();
+		return result;
 	}
 
 	@Override
@@ -124,15 +129,34 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 
 	public int getNumRegisteredTimers() {
 		int count = 0;
-		for (List<Triggerable> tasks: registeredTasks.values()) {
+		for (List<ScheduledTimerFuture> tasks: registeredTasks.values()) {
 			count += tasks.size();
 		}
 		return count;
 	}
 
+	public Set<Long> getRegisteredTimerTimestamps() {
+		Set<Long> actualTimestamps = new HashSet<>();
+		for (List<ScheduledTimerFuture> timerFutures : registeredTasks.values()) {
+			for (ScheduledTimerFuture timer : timerFutures) {
+				actualTimestamps.add(timer.getTimestamp());
+			}
+		}
+		return actualTimestamps;
+	}
+
 	// ------------------------------------------------------------------------
 
-	private static class DummyFuture implements ScheduledFuture<Object> {
+	private class ScheduledTimerFuture implements ScheduledFuture<Object> {
+
+		private final Triggerable triggerable;
+
+		private final long timestamp;
+
+		public ScheduledTimerFuture(Triggerable triggerable, long timestamp) {
+			this.triggerable = triggerable;
+			this.timestamp = timestamp;
+		}
 
 		@Override
 		public long getDelay(TimeUnit unit) {
@@ -146,6 +170,10 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 
 		@Override
 		public boolean cancel(boolean mayInterruptIfRunning) {
+			List<ScheduledTimerFuture> scheduledTimerFutures = registeredTasks.get(timestamp);
+			if (scheduledTimerFutures != null) {
+				scheduledTimerFutures.remove(this);
+			}
 			return true;
 		}
 
@@ -168,5 +196,13 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 		public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
 			throw new UnsupportedOperationException();
 		}
+
+		public Triggerable getTriggerable() {
+			return triggerable;
+		}
+
+		public long getTimestamp() {
+			return timestamp;
+		}
 	}
 }
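
The test service now remembers the Triggerable and timestamp behind every registered future, so tests can inspect pending timer timestamps and cancel individual timers. A small usage sketch (hypothetical test code, assuming the registerTimer(long, Triggerable) signature implied by the diff and an initial current time of 0):

import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;

public class TestProcessingTimeServiceSketch {

	public static void main(String[] args) throws Exception {
		TestProcessingTimeService timeService = new TestProcessingTimeService();

		// register a timer for t=42; nothing fires until the test advances the clock
		timeService.registerTimer(42, new Triggerable() {
			@Override
			public void trigger(long timestamp) {
				System.out.println("fired at " + timestamp);
			}
		});

		System.out.println(timeService.getNumRegisteredTimers());       // 1
		System.out.println(timeService.getRegisteredTimerTimestamps()); // [42]

		timeService.setCurrentTime(42);                                  // prints "fired at 42"
		System.out.println(timeService.getNumRegisteredTimers());       // 0
	}
}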

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
new file mode 100644
index 0000000..84af997
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link HeapInternalTimerService}.
+ */
+public class HeapInternalTimerServiceTest {
+
+	private static InternalTimer<Integer, String> anyInternalTimer() {
+		return any();
+	}
+
+	/**
+	 * Verify that we only ever have one processing-time task registered at the
+	 * {@link ProcessingTimeService}.
+	 */
+	@Test
+	public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+		HeapInternalTimerService<Integer, String> timerService =
+				createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+		keyContext.setCurrentKey(0);
+
+		timerService.registerProcessingTimeTimer("ciao", 10);
+		timerService.registerProcessingTimeTimer("ciao", 20);
+		timerService.registerProcessingTimeTimer("ciao", 30);
+		timerService.registerProcessingTimeTimer("hello", 10);
+		timerService.registerProcessingTimeTimer("hello", 20);
+
+		assertEquals(5, timerService.numProcessingTimeTimers());
+		assertEquals(2, timerService.numProcessingTimeTimers("hello"));
+		assertEquals(3, timerService.numProcessingTimeTimers("ciao"));
+
+		assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+
+		processingTimeService.setCurrentTime(10);
+
+		assertEquals(3, timerService.numProcessingTimeTimers());
+		assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+		assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
+
+		assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+
+		processingTimeService.setCurrentTime(20);
+
+		assertEquals(1, timerService.numProcessingTimeTimers());
+		assertEquals(0, timerService.numProcessingTimeTimers("hello"));
+		assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+
+		assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(30L));
+
+		processingTimeService.setCurrentTime(30);
+
+		assertEquals(0, timerService.numProcessingTimeTimers());
+
+		assertEquals(0, processingTimeService.getNumRegisteredTimers());
+
+		timerService.registerProcessingTimeTimer("ciao", 40);
+
+		assertEquals(1, processingTimeService.getNumRegisteredTimers());
+	}
+
+	/**
+	 * Verify that registering a processing-time timer that is earlier than the existing timers
+	 * removes the one physical timer and creates one for the earlier timestamp
+	 * removes the one physical timer and creates one for the earlier timestamp at the
+	 * {@link ProcessingTimeService}.
+	@Test
+	public void testRegisterEarlierProcessingTimerMovesPhysicalProcessingTimer() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+		HeapInternalTimerService<Integer, String> timerService =
+				createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+		keyContext.setCurrentKey(0);
+
+		timerService.registerProcessingTimeTimer("ciao", 20);
+
+		assertEquals(1, timerService.numProcessingTimeTimers());
+
+		assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+
+		timerService.registerProcessingTimeTimer("ciao", 10);
+
+		assertEquals(2, timerService.numProcessingTimeTimers());
+
+		assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+	}
+
+	/**
+	 * Verify that registering a processing-time timer from within an onProcessingTime()
+	 * callback replaces the just-fired physical timer instead of leaking additional
+	 * timers at the {@link ProcessingTimeService}.
+	 */
+	@Test
+	public void testRegisteringProcessingTimeTimerInOnProcessingTimeDoesNotLeakPhysicalTimers() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+		final HeapInternalTimerService<Integer, String> timerService =
+				createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+		keyContext.setCurrentKey(0);
+
+		timerService.registerProcessingTimeTimer("ciao", 10);
+
+		assertEquals(1, timerService.numProcessingTimeTimers());
+
+		assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+
+		doAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocation) throws Exception {
+				timerService.registerProcessingTimeTimer("ciao", 20);
+				return null;
+			}
+		}).when(mockTriggerable).onProcessingTime(anyInternalTimer());
+
+		processingTimeService.setCurrentTime(10);
+
+		assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+
+		doAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocation) throws Exception {
+				timerService.registerProcessingTimeTimer("ciao", 30);
+				return null;
+			}
+		}).when(mockTriggerable).onProcessingTime(anyInternalTimer());
+
+		processingTimeService.setCurrentTime(20);
+
+		assertEquals(1, timerService.numProcessingTimeTimers());
+
+		assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(30L));
+	}
+
+
+	@Test
+	public void testCurrentProcessingTime() throws Exception {
+
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		HeapInternalTimerService<Integer, String> timerService =
+				createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+		processingTimeService.setCurrentTime(17L);
+		assertEquals(17, timerService.currentProcessingTime());
+
+		processingTimeService.setCurrentTime(42);
+		assertEquals(42, timerService.currentProcessingTime());
+	}
+
+	@Test
+	public void testCurrentEventTime() throws Exception {
+
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		HeapInternalTimerService<Integer, String> timerService =
+				createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+		timerService.advanceWatermark(17);
+		assertEquals(17, timerService.currentWatermark());
+
+		timerService.advanceWatermark(42);
+		assertEquals(42, timerService.currentWatermark());
+	}
+
+	/**
+	 * This also verifies that we don't have leakage between keys/namespaces.
+	 */
+	@Test
+	public void testSetAndFireEventTimeTimers() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		HeapInternalTimerService<Integer, String> timerService =
+				createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+		keyContext.setCurrentKey(0);
+
+		timerService.registerEventTimeTimer("ciao", 10);
+		timerService.registerEventTimeTimer("hello", 10);
+
+		keyContext.setCurrentKey(1);
+
+		timerService.registerEventTimeTimer("ciao", 10);
+		timerService.registerEventTimeTimer("hello", 10);
+
+		assertEquals(4, timerService.numEventTimeTimers());
+		assertEquals(2, timerService.numEventTimeTimers("hello"));
+		assertEquals(2, timerService.numEventTimeTimers("ciao"));
+
+		timerService.advanceWatermark(10);
+
+		verify(mockTriggerable, times(4)).onEventTime(anyInternalTimer());
+		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "ciao")));
+		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "hello")));
+		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao")));
+		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "hello")));
+
+		assertEquals(0, timerService.numEventTimeTimers());
+	}
+
+	/**
+	 * This also verifies that we don't have leakage between keys/namespaces.
+	 */
+	@Test
+	public void testSetAndFireProcessingTimeTimers() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		HeapInternalTimerService<Integer, String> timerService =
+				createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+		keyContext.setCurrentKey(0);
+
+		timerService.registerProcessingTimeTimer("ciao", 10);
+		timerService.registerProcessingTimeTimer("hello", 10);
+
+		keyContext.setCurrentKey(1);
+
+		timerService.registerProcessingTimeTimer("ciao", 10);
+		timerService.registerProcessingTimeTimer("hello", 10);
+
+		assertEquals(4, timerService.numProcessingTimeTimers());
+		assertEquals(2, timerService.numProcessingTimeTimers("hello"));
+		assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
+
+		processingTimeService.setCurrentTime(10);
+
+		verify(mockTriggerable, times(4)).onProcessingTime(anyInternalTimer());
+		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao")));
+		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "hello")));
+		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "ciao")));
+		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello")));
+
+		assertEquals(0, timerService.numProcessingTimeTimers());
+	}
+
+	/**
+	 * This also verifies that we don't have leakage between keys/namespaces.
+	 *
+	 * <p>It also verifies that deleted timers don't fire.
+	 */
+	@Test
+	public void testDeleteEventTimeTimers() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		HeapInternalTimerService<Integer, String> timerService =
+				createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+		keyContext.setCurrentKey(0);
+
+		timerService.registerEventTimeTimer("ciao", 10);
+		timerService.registerEventTimeTimer("hello", 10);
+
+		keyContext.setCurrentKey(1);
+
+		timerService.registerEventTimeTimer("ciao", 10);
+		timerService.registerEventTimeTimer("hello", 10);
+
+		assertEquals(4, timerService.numEventTimeTimers());
+		assertEquals(2, timerService.numEventTimeTimers("hello"));
+		assertEquals(2, timerService.numEventTimeTimers("ciao"));
+
+		keyContext.setCurrentKey(0);
+		timerService.deleteEventTimeTimer("hello", 10);
+
+		keyContext.setCurrentKey(1);
+		timerService.deleteEventTimeTimer("ciao", 10);
+
+		assertEquals(2, timerService.numEventTimeTimers());
+		assertEquals(1, timerService.numEventTimeTimers("hello"));
+		assertEquals(1, timerService.numEventTimeTimers("ciao"));
+
+		timerService.advanceWatermark(10);
+
+		verify(mockTriggerable, times(2)).onEventTime(anyInternalTimer());
+		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "ciao")));
+		verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, 0, "hello")));
+		verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao")));
+		verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "hello")));
+
+		assertEquals(0, timerService.numEventTimeTimers());
+	}
+
+	/**
+	 * This also verifies that we don't have leakage between keys/namespaces.
+	 *
+	 * <p>It also verifies that deleted timers don't fire.
+	 */
+	@Test
+	public void testDeleteProcessingTimeTimers() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		HeapInternalTimerService<Integer, String> timerService =
+				createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+		keyContext.setCurrentKey(0);
+
+		timerService.registerProcessingTimeTimer("ciao", 10);
+		timerService.registerProcessingTimeTimer("hello", 10);
+
+		keyContext.setCurrentKey(1);
+
+		timerService.registerProcessingTimeTimer("ciao", 10);
+		timerService.registerProcessingTimeTimer("hello", 10);
+
+		assertEquals(4, timerService.numProcessingTimeTimers());
+		assertEquals(2, timerService.numProcessingTimeTimers("hello"));
+		assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
+
+		keyContext.setCurrentKey(0);
+		timerService.deleteProcessingTimeTimer("hello", 10);
+
+		keyContext.setCurrentKey(1);
+		timerService.deleteProcessingTimeTimer("ciao", 10);
+
+		assertEquals(2, timerService.numProcessingTimeTimers());
+		assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+		assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+
+		processingTimeService.setCurrentTime(10);
+
+		verify(mockTriggerable, times(2)).onProcessingTime(anyInternalTimer());
+		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao")));
+		verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, 0, "hello")));
+		verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, 1, "ciao")));
+		verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello")));
+
+		assertEquals(0, timerService.numProcessingTimeTimers());
+	}
+
+	@Test
+	public void testSnapshotAndRestore() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		HeapInternalTimerService<Integer, String> timerService =
+				createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+		keyContext.setCurrentKey(0);
+
+		timerService.registerProcessingTimeTimer("ciao", 10);
+		timerService.registerEventTimeTimer("hello", 10);
+
+		keyContext.setCurrentKey(1);
+
+		timerService.registerEventTimeTimer("ciao", 10);
+		timerService.registerProcessingTimeTimer("hello", 10);
+
+		assertEquals(2, timerService.numProcessingTimeTimers());
+		assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+		assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+		assertEquals(2, timerService.numEventTimeTimers());
+		assertEquals(1, timerService.numEventTimeTimers("hello"));
+		assertEquals(1, timerService.numEventTimeTimers("ciao"));
+
+		ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+		timerService.snapshotTimers(outStream);
+		outStream.close();
+
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable2 = mock(Triggerable.class);
+
+		keyContext = new TestKeyContext();
+		processingTimeService = new TestProcessingTimeService();
+
+		timerService = restoreTimerService(
+				new ByteArrayInputStream(outStream.toByteArray()),
+				mockTriggerable2,
+				keyContext,
+				processingTimeService);
+
+		processingTimeService.setCurrentTime(10);
+		timerService.advanceWatermark(10);
+
+		verify(mockTriggerable2, times(2)).onProcessingTime(anyInternalTimer());
+		verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao")));
+		verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello")));
+		verify(mockTriggerable2, times(2)).onEventTime(anyInternalTimer());
+		verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "hello")));
+		verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao")));
+
+		assertEquals(0, timerService.numEventTimeTimers());
+		assertEquals(0, timerService.numProcessingTimeTimers());
+	}
+
+	private static class TestKeyContext implements KeyContext {
+
+		private Object key;
+
+		@Override
+		public void setCurrentKey(Object key) {
+			this.key = key;
+		}
+
+		@Override
+		public Object getCurrentKey() {
+			return key;
+		}
+	}
+
+	private static HeapInternalTimerService<Integer, String> createTimerService(
+			Triggerable<Integer, String> triggerable,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService) {
+		return new HeapInternalTimerService<>(
+				IntSerializer.INSTANCE,
+				StringSerializer.INSTANCE,
+				triggerable,
+				keyContext,
+				processingTimeService);
+	}
+
+	private static HeapInternalTimerService<Integer, String> restoreTimerService(
+			InputStream stateStream,
+			Triggerable<Integer, String> triggerable,
+			KeyContext keyContext,
+			ProcessingTimeService processingTimeService) throws Exception {
+		HeapInternalTimerService.RestoredTimers<Integer, String> restoredTimers =
+				new HeapInternalTimerService.RestoredTimers<>(
+						stateStream,
+						HeapInternalTimerServiceTest.class.getClassLoader());
+
+		return new HeapInternalTimerService<>(
+				IntSerializer.INSTANCE,
+				StringSerializer.INSTANCE,
+				triggerable,
+				keyContext,
+				processingTimeService,
+				restoredTimers);
+	}
+}
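
For reference, the lifecycle exercised by the tests above can be condensed into a short sketch: create the service, scope a timer to the current key and a namespace, then advance time so the Triggerable callback fires. This is illustrative only and uses just the calls visible in this patch; it assumes Triggerable declares exactly the two callbacks mocked above (with InternalTimer parameterized in the same <key, namespace> order), and the key and namespace values are arbitrary.

	TestKeyContext keyContext = new TestKeyContext();
	TestProcessingTimeService processingTimeService = new TestProcessingTimeService();

	// callback invoked when timers fire; the bodies here are placeholders
	Triggerable<Integer, String> triggerable = new Triggerable<Integer, String>() {
		@Override
		public void onEventTime(InternalTimer<Integer, String> timer) {
			// called once the watermark reaches the timer's timestamp
		}

		@Override
		public void onProcessingTime(InternalTimer<Integer, String> timer) {
			// called once processing time reaches the timer's timestamp
		}
	};

	HeapInternalTimerService<Integer, String> timerService = new HeapInternalTimerService<>(
			IntSerializer.INSTANCE,
			StringSerializer.INSTANCE,
			triggerable,
			keyContext,
			processingTimeService);

	keyContext.setCurrentKey(0);                     // timers are scoped to the current key
	timerService.registerEventTimeTimer("ciao", 10); // and to a namespace
	timerService.advanceWatermark(10);               // invokes triggerable.onEventTime(...)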

http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
new file mode 100644
index 0000000..6edf20a
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link StreamTimelyFlatMap}.
+ */
+public class TimelyFlatMapTest extends TestLogger {
+
+	@Test
+	public void testCurrentEventTime() throws Exception {
+
+		StreamTimelyFlatMap<Integer, Integer, String> operator =
+				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(17));
+		testHarness.processElement(new StreamRecord<>(5, 12L));
+
+		testHarness.processWatermark(new Watermark(42));
+		testHarness.processElement(new StreamRecord<>(6, 13L));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(17L));
+		expectedOutput.add(new StreamRecord<>("5TIME:17", 12L));
+		expectedOutput.add(new Watermark(42L));
+		expectedOutput.add(new StreamRecord<>("6TIME:42", 13L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testCurrentProcessingTime() throws Exception {
+
+		StreamTimelyFlatMap<Integer, Integer, String> operator =
+				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(17);
+		testHarness.processElement(new StreamRecord<>(5));
+
+		testHarness.setProcessingTime(42);
+		testHarness.processElement(new StreamRecord<>(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("5TIME:17"));
+		expectedOutput.add(new StreamRecord<>("6TIME:42"));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testEventTimeTimers() throws Exception {
+
+		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
+				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(0));
+
+		testHarness.processElement(new StreamRecord<>(17, 42L));
+
+		testHarness.processWatermark(new Watermark(5));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(0L));
+		expectedOutput.add(new StreamRecord<>(17, 42L));
+		expectedOutput.add(new StreamRecord<>(1777, 5L));
+		expectedOutput.add(new Watermark(5L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testProcessingTimeTimers() throws Exception {
+
+		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
+				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(17));
+
+		testHarness.setProcessingTime(5);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>(17));
+		expectedOutput.add(new StreamRecord<>(1777, 5L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that we don't have leakage between different keys.
+	 */
+	@Test
+	public void testEventTimeTimerWithState() throws Exception {
+
+		StreamTimelyFlatMap<Integer, Integer, String> operator =
+				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(1));
+		testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
+
+		testHarness.processWatermark(new Watermark(2));
+		testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
+
+		testHarness.processWatermark(new Watermark(6));
+		testHarness.processWatermark(new Watermark(7));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(1L));
+		expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
+		expectedOutput.add(new Watermark(2L));
+		expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
+		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+		expectedOutput.add(new Watermark(6L));
+		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+		expectedOutput.add(new Watermark(7L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that we don't have leakage between different keys.
+	 */
+	@Test
+	public void testProcessingTimeTimerWithState() throws Exception {
+
+		StreamTimelyFlatMap<Integer, Integer, String> operator =
+				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(1);
+		testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
+
+		testHarness.setProcessingTime(2);
+		testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
+
+		testHarness.setProcessingTime(6);
+		testHarness.setProcessingTime(7);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("INPUT:17"));
+		expectedOutput.add(new StreamRecord<>("INPUT:42"));
+		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testSnapshotAndRestore() throws Exception {
+
+		StreamTimelyFlatMap<Integer, Integer, String> operator =
+				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(5, 12L));
+
+		// snapshot and restore from scratch
+		StreamStateHandle snapshot = testHarness.snapshotLegacy(0, 0);
+
+		testHarness.close();
+
+		operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+
+		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.restore(snapshot);
+		testHarness.open();
+
+		testHarness.setProcessingTime(5);
+		testHarness.processWatermark(new Watermark(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+		expectedOutput.add(new Watermark(6));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public T getKey(T value) throws Exception {
+			return value;
+		}
+	}
+
+	private static class QueryingFlatMapFunction implements TimelyFlatMapFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final TimeDomain timeDomain;
+
+		public QueryingFlatMapFunction(TimeDomain timeDomain) {
+			this.timeDomain = timeDomain;
+		}
+
+		@Override
+		public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception {
+			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+				out.collect(value + "TIME:" + timerService.currentWatermark());
+			} else {
+				out.collect(value + "TIME:" + timerService.currentProcessingTime());
+			}
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				TimeDomain timeDomain,
+				TimerService timerService,
+				Collector<String> out) throws Exception {
+		}
+	}
+
+	private static class TriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final TimeDomain timeDomain;
+
+		public TriggeringFlatMapFunction(TimeDomain timeDomain) {
+			this.timeDomain = timeDomain;
+		}
+
+		@Override
+		public void flatMap(Integer value, TimerService timerService, Collector<Integer> out) throws Exception {
+			out.collect(value);
+			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+				timerService.registerEventTimeTimer(timerService.currentWatermark() + 5);
+			} else {
+				timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5);
+			}
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				TimeDomain timeDomain,
+				TimerService timerService,
+				Collector<Integer> out) throws Exception {
+
+			assertEquals(this.timeDomain, timeDomain);
+			out.collect(1777);
+		}
+	}
+
+	private static class TriggeringStatefulFlatMapFunction extends RichTimelyFlatMapFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Integer> state =
+				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null);
+
+		private final TimeDomain timeDomain;
+
+		public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
+			this.timeDomain = timeDomain;
+		}
+
+		@Override
+		public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception {
+			out.collect("INPUT:" + value);
+			getRuntimeContext().getState(state).update(value);
+			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+				timerService.registerEventTimeTimer(timerService.currentWatermark() + 5);
+			} else {
+				timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5);
+			}
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				TimeDomain timeDomain,
+				TimerService timerService,
+				Collector<String> out) throws Exception {
+			assertEquals(this.timeDomain, timeDomain);
+			out.collect("STATE:" + getRuntimeContext().getState(state).value());
+		}
+	}
+
+	private static class BothTriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception {
+			timerService.registerProcessingTimeTimer(5);
+			timerService.registerEventTimeTimer(6);
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				TimeDomain timeDomain,
+				TimerService timerService,
+				Collector<String> out) throws Exception {
+			if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
+				out.collect("EVENT:1777");
+			} else {
+				out.collect("PROC:1777");
+			}
+		}
+	}
+
+}
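
Read together, the helper classes above double as a usage sketch for the TimelyFlatMapFunction contract: react to elements in flatMap(), register timers through the TimerService, and receive the callback in onTimer() for the key that registered the timer. A condensed, illustrative variant (not part of this patch; the class name and output strings are invented) looks like this:

	private static class ReminderFlatMapFunction implements TimelyFlatMapFunction<Integer, String> {

		private static final long serialVersionUID = 1L;

		@Override
		public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception {
			// emit the element right away ...
			out.collect("SEEN:" + value);
			// ... and ask for an event-time callback 10 units past the current watermark
			timerService.registerEventTimeTimer(timerService.currentWatermark() + 10);
		}

		@Override
		public void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<String> out) throws Exception {
			// fires for the key that registered the timer, in the requested time domain
			if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
				out.collect("REMINDER@" + timestamp);
			}
		}
	}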