You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/09/23 13:13:52 UTC

[1/3] flink git commit: [FLINK-4494] Expose the TimeServiceProvider from the Task to each Operator.

Repository: flink
Updated Branches:
  refs/heads/master 568845a3c -> 51a5048b2


[FLINK-4494] Expose the TimeServiceProvider from the Task to each Operator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ffff2997
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ffff2997
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ffff2997

Branch: refs/heads/master
Commit: ffff2997a5c82386a6428744f39b808a3fb49538
Parents: 4779c7e
Author: kl0u <kk...@gmail.com>
Authored: Tue Sep 20 14:45:01 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Sep 23 15:01:06 2016 +0200

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   |  19 +--
 .../streaming/api/operators/StreamSource.java   |  23 ++--
 .../api/operators/StreamingRuntimeContext.java  |  19 ---
 .../api/windowing/assigners/WindowAssigner.java |   4 +-
 .../api/windowing/triggers/Trigger.java         |   4 +-
 .../operators/ExtractTimestampsOperator.java    |   9 +-
 ...TimestampsAndPeriodicWatermarksOperator.java |   6 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   8 +-
 .../operators/windowing/WindowOperator.java     |   7 +-
 .../streaming/runtime/tasks/StreamTask.java     |  19 +--
 .../operators/StreamSourceOperatorTest.java     |  28 +---
 .../runtime/operators/StreamTaskTimerTest.java  |  12 +-
 .../runtime/operators/TimeProviderTest.java     |  14 +-
 ...AlignedProcessingTimeWindowOperatorTest.java | 104 +++++++--------
 ...AlignedProcessingTimeWindowOperatorTest.java | 128 ++++++++-----------
 .../runtime/tasks/StreamTaskTestHarness.java    |   9 +-
 .../KeyedOneInputStreamOperatorTestHarness.java |   2 +-
 .../flink/streaming/util/MockContext.java       |  35 +----
 .../util/OneInputStreamOperatorTestHarness.java |  21 +--
 .../runtime/StreamTaskTimerITCase.java          |  10 +-
 20 files changed, 177 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 71296e3..a73f3b2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -35,14 +35,12 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ScheduledFuture;
-
 /**
  * Base class for all stream operators. Operators that contain a user function should extend the class 
  * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class). 
@@ -230,18 +228,11 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 	/**
-	 * Register a timer callback. At the specified time the provided {@link Triggerable} will
-	 * be invoked. This call is guaranteed to not happen concurrently with method calls on the operator.
-	 *
-	 * @param time The absolute time in milliseconds.
-	 * @param target The target to be triggered.
+	 * Returns the {@link TimeServiceProvider} responsible for getting the current
+	 * processing time and registering timers.
 	 */
-	protected ScheduledFuture<?> registerTimer(long time, Triggerable target) {
-		return container.registerTimer(time, target);
-	}
-
-	protected long getCurrentProcessingTime() {
-		return container.getCurrentProcessingTime();
+	protected TimeServiceProvider getTimerService() {
+		return container.getTimerService();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 38c948b..22987ab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 
 import java.util.concurrent.ScheduledFuture;
 
@@ -189,6 +190,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 	public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
 
 		private final StreamSource<?, ?> owner;
+		private final TimeServiceProvider timeService;
 		private final Object lockingObject;
 		private final Output<StreamRecord<T>> output;
 		private final StreamRecord<T> reuse;
@@ -209,14 +211,15 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 			}
 
 			this.owner = owner;
+			this.timeService = owner.getTimerService();
 			this.lockingObject = lockingObjectParam;
 			this.output = outputParam;
 			this.watermarkInterval = watermarkInterval;
 			this.reuse = new StreamRecord<T>(null);
 
-			long now = owner.getCurrentProcessingTime();
-			this.watermarkTimer = owner.registerTimer(now + watermarkInterval,
-				new WatermarkEmittingTask(owner, lockingObjectParam, outputParam));
+			long now = this.timeService.getCurrentProcessingTime();
+			this.watermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
+				new WatermarkEmittingTask(this.timeService, lockingObjectParam, outputParam));
 		}
 
 		@Override
@@ -224,7 +227,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 			owner.checkAsyncException();
 			
 			synchronized (lockingObject) {
-				final long currentTime = owner.getCurrentProcessingTime();
+				final long currentTime = this.timeService.getCurrentProcessingTime();
 				output.collect(reuse.replace(element, currentTime));
 
 				// this is to avoid lock contention in the lockingObject by
@@ -276,19 +279,19 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 
 		private class WatermarkEmittingTask implements Triggerable {
 
-			private final StreamSource<?, ?> owner;
+			private final TimeServiceProvider timeService;
 			private final Object lockingObject;
 			private final Output<StreamRecord<T>> output;
 
-			private WatermarkEmittingTask(StreamSource<?, ?> src, Object lock, Output<StreamRecord<T>> output) {
-				this.owner = src;
+			private WatermarkEmittingTask(TimeServiceProvider timeService, Object lock, Output<StreamRecord<T>> output) {
+				this.timeService = timeService;
 				this.lockingObject = lock;
 				this.output = output;
 			}
 
 			@Override
 			public void trigger(long timestamp) {
-				final long currentTime = owner.getCurrentProcessingTime();
+				final long currentTime = this.timeService.getCurrentProcessingTime();
 
 				if (currentTime > nextWatermarkTime) {
 					// align the watermarks across all machines. this will ensure that we
@@ -304,8 +307,8 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 					}
 				}
 
-				owner.registerTimer(owner.getCurrentProcessingTime() + watermarkInterval,
-					new WatermarkEmittingTask(owner, lockingObject, output));
+				this.timeService.registerTimer(this.timeService.getCurrentProcessingTime() + watermarkInterval,
+					new WatermarkEmittingTask(this.timeService, lockingObject, output));
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 863cf17..961bd9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -83,25 +83,6 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 		return taskEnvironment.getInputSplitProvider();
 	}
 
-	/**
-	 * Register a timer callback. At the specified time the {@link Triggerable } will be invoked.
-	 * This call is guaranteed to not happen concurrently with method calls on the operator.
-	 *
-	 * @param time The absolute time in milliseconds.
-	 * @param target The target to be triggered.
-	 */
-	public ScheduledFuture<?> registerTimer(long time, Triggerable target) {
-		return operator.registerTimer(time, target);
-	}
-
-	/**
-	 * Returns the current processing time as defined by the task's
-	 * {@link org.apache.flink.streaming.runtime.tasks.TimeServiceProvider TimeServiceProvider}
-	 */
-	public long getCurrentProcessingTime() {
-		return operator.getCurrentProcessingTime();
-	}
-
 	// ------------------------------------------------------------------------
 	//  broadcast variables
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index 9f487af..7a27cc8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import java.io.Serializable;
 
@@ -85,8 +84,7 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
 	public abstract static class WindowAssignerContext {
 
 		/**
-		 * Returns the current processing time, as returned by
-		 * the {@link StreamTask#getCurrentProcessingTime()}.
+		 * Returns the current processing time.
 		 */
 		public abstract long getCurrentProcessingTime();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index 4d6c60f..ff80639 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import java.io.Serializable;
 
@@ -128,8 +127,7 @@ public abstract class Trigger<T, W extends Window> implements Serializable {
 	public interface TriggerContext {
 
 		/**
-		 * Returns the current processing time, as returned by
-		 * the {@link StreamTask#getCurrentProcessingTime()}.
+		 * Returns the current processing time.
 		 */
 		long getCurrentProcessingTime();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index a4815dc..c92ff34 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -54,9 +54,9 @@ public class ExtractTimestampsOperator<T>
 		super.open();
 		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
 		if (watermarkInterval > 0) {
-			registerTimer(System.currentTimeMillis() + watermarkInterval, this);
+			long now = getTimerService().getCurrentProcessingTime();
+			getTimerService().registerTimer(now + watermarkInterval, this);
 		}
-
 		currentWatermark = Long.MIN_VALUE;
 	}
 
@@ -74,14 +74,15 @@ public class ExtractTimestampsOperator<T>
 	@Override
 	public void trigger(long timestamp) throws Exception {
 		// register next timer
-		registerTimer(System.currentTimeMillis() + watermarkInterval, this);
 		long newWatermark = userFunction.getCurrentWatermark();
-
 		if (newWatermark > currentWatermark) {
 			currentWatermark = newWatermark;
 			// emit watermark
 			output.emitWatermark(new Watermark(currentWatermark));
 		}
+
+		long now = getTimerService().getCurrentProcessingTime();
+		getTimerService().registerTimer(now + watermarkInterval, this);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
index 92faed2..f791723 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
@@ -54,7 +54,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
 		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
 		
 		if (watermarkInterval > 0) {
-			registerTimer(System.currentTimeMillis() + watermarkInterval, this);
+			long now = getTimerService().getCurrentProcessingTime();
+			getTimerService().registerTimer(now + watermarkInterval, this);
 		}
 	}
 
@@ -76,7 +77,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
 			output.emitWatermark(newWatermark);
 		}
 
-		registerTimer(System.currentTimeMillis() + watermarkInterval, this);
+		long now = getTimerService().getCurrentProcessingTime();
+		getTimerService().registerTimer(now + watermarkInterval, this);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index e74dd87..b39b760 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -125,7 +125,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		
 		// decide when to first compute the window and when to slide it
 		// the values should align with the start of time (that is, the UNIX epoch, not the big bang)
-		final long now = getRuntimeContext().getCurrentProcessingTime();
+		final long now = getTimerService().getCurrentProcessingTime();
 		nextEvaluationTime = now + windowSlide - (now % windowSlide);
 		nextSlideTime = now + paneSize - (now % paneSize);
 
@@ -164,9 +164,9 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 				nextPastTriggerTime = Math.min(nextPastEvaluationTime, nextPastSlideTime);
 			}
 		}
-		
+
 		// make sure the first window happens
-		registerTimer(firstTriggerTime, this);
+		getTimerService().registerTimer(firstTriggerTime, this);
 	}
 
 	@Override
@@ -230,7 +230,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		}
 
 		long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
-		registerTimer(nextTriggerTime, this);
+		getTimerService().registerTimer(nextTriggerTime, this);
 	}
 	
 	private void computeWindow(long timestamp) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index dffa2a1..e4939db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -255,7 +255,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
 			@Override
 			public long getCurrentProcessingTime() {
-				return WindowOperator.this.getCurrentProcessingTime();
+				return WindowOperator.this.getTimerService().getCurrentProcessingTime();
 			}
 		};
 
@@ -721,7 +721,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 		@Override
 		public long getCurrentProcessingTime() {
-			return WindowOperator.this.getCurrentProcessingTime();
+			return WindowOperator.this.getTimerService().getCurrentProcessingTime();
 		}
 
 		@Override
@@ -732,7 +732,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 				processingTimeTimersQueue.add(timer);
 				//If this is the first timer added for this timestamp register a TriggerTask
 				if (processingTimeTimerTimestamps.add(time, 1) == 0) {
-					ScheduledFuture<?> scheduledFuture = WindowOperator.this.registerTimer(time, WindowOperator.this);
+					ScheduledFuture<?> scheduledFuture = WindowOperator.this.getTimerService()
+						.registerTimer(time, WindowOperator.this);
 					processingTimeTimerFutures.put(time, scheduledFuture);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 80d51a6..faa9672 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -47,7 +47,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import org.slf4j.Logger;
@@ -65,7 +64,6 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
@@ -207,16 +205,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		timerService = timeProvider;
 	}
 
-	/**
-	 * Returns the current processing time.
-	 */
-	public long getCurrentProcessingTime() {
-		if (timerService == null) {
-			throw new IllegalStateException("The timer service has not been initialized.");
-		}
-		return timerService.getCurrentProcessingTime();
-	}
-
 	@Override
 	public final void invoke() throws Exception {
 
@@ -825,13 +813,14 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	}
 
 	/**
-	 * Registers a timer.
+	 * Returns the {@link TimeServiceProvider} responsible for telling the current
+	 * processing time and registering timers.
 	 */
-	public ScheduledFuture<?> registerTimer(final long timestamp, final Triggerable target) {
+	public TimeServiceProvider getTimerService() {
 		if (timerService == null) {
 			throw new IllegalStateException("The timer service has not been initialized.");
 		}
-		return timerService.registerTimer(timestamp, target);
+		return timerService;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index d61fec9..e8663f5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -45,12 +45,9 @@ import org.mockito.stubbing.Answer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ScheduledFuture;
 
 import static org.junit.Assert.*;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -241,30 +238,15 @@ public class StreamSourceOperatorTest {
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
 		when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
 
-		doAnswer(new Answer<ScheduledFuture>() {
+		doAnswer(new Answer<TimeServiceProvider>() {
 			@Override
-			public ScheduledFuture answer(InvocationOnMock invocation) throws Throwable {
-				final long execTime = (Long) invocation.getArguments()[0];
-				final Triggerable target = (Triggerable) invocation.getArguments()[1];
-
-				if (timeProvider == null) {
-					throw new RuntimeException("The time provider is null");
-				}
-
-				timeProvider.registerTimer(execTime, target);
-				return null;
-			}
-		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
-
-		doAnswer(new Answer<Long>() {
-			@Override
-			public Long answer(InvocationOnMock invocation) throws Throwable {
+			public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable {
 				if (timeProvider == null) {
-					throw new RuntimeException("The time provider is null");
+					throw new RuntimeException("The time provider is null.");
 				}
-				return timeProvider.getCurrentProcessingTime();
+				return timeProvider;
 			}
-		}).when(mockTask).getCurrentProcessingTime();
+		}).when(mockTask).getTimerService();
 
 		operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index b9435f5..98058e8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -63,7 +64,7 @@ public class StreamTaskTimerTest {
 		testHarness.waitForTaskRunning();
 
 		// first one spawns thread
-		mapTask.registerTimer(System.currentTimeMillis(), new Triggerable() {
+		mapTask.getTimerService().registerTimer(System.currentTimeMillis(), new Triggerable() {
 			@Override
 			public void trigger(long timestamp) {
 			}
@@ -105,10 +106,11 @@ public class StreamTaskTimerTest {
 			final long t3 = System.currentTimeMillis() + 100;
 			final long t4 = System.currentTimeMillis() + 200;
 
-			mapTask.registerTimer(t1, new ValidatingTriggerable(errorRef, t1, 0));
-			mapTask.registerTimer(t2, new ValidatingTriggerable(errorRef, t2, 1));
-			mapTask.registerTimer(t3, new ValidatingTriggerable(errorRef, t3, 2));
-			mapTask.registerTimer(t4, new ValidatingTriggerable(errorRef, t4, 3));
+			TimeServiceProvider timeService = mapTask.getTimerService();
+			timeService.registerTimer(t1, new ValidatingTriggerable(errorRef, t1, 0));
+			timeService.registerTimer(t2, new ValidatingTriggerable(errorRef, t2, 1));
+			timeService.registerTimer(t3, new ValidatingTriggerable(errorRef, t3, 2));
+			timeService.registerTimer(t4, new ValidatingTriggerable(errorRef, t4, 3));
 
 			long deadline = System.currentTimeMillis() + 20000;
 			while (errorRef.get() == null &&

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
index 4d4f07b..140e9e2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.hadoop.shaded.org.apache.http.impl.conn.SystemDefaultDnsResolver;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -40,7 +39,6 @@ import java.util.List;
 import java.util.concurrent.Executors;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(ResultPartitionWriter.class)
@@ -150,7 +148,7 @@ public class TimeProviderTest {
 			}
 		});
 
-		Assert.assertTrue(provider.getNoOfRegisteredTimers() == 4);
+		Assert.assertEquals(provider.getNoOfRegisteredTimers(), 4);
 
 		provider.setCurrentTime(100);
 		long seen = 0;
@@ -177,24 +175,24 @@ public class TimeProviderTest {
 
 		testHarness.invoke();
 
-		assertTrue(testHarness.getCurrentProcessingTime() == 0);
+		assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 0);
 
 		tp.setCurrentTime(11);
-		assertTrue(testHarness.getCurrentProcessingTime() == 11);
+		assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 11);
 
 		tp.setCurrentTime(15);
 		tp.setCurrentTime(16);
-		assertTrue(testHarness.getCurrentProcessingTime() == 16);
+		assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 16);
 
 		// register 2 tasks
-		mapTask.registerTimer(30, new Triggerable() {
+		mapTask.getTimerService().registerTimer(30, new Triggerable() {
 			@Override
 			public void trigger(long timestamp) {
 
 			}
 		});
 
-		mapTask.registerTimer(40, new Triggerable() {
+		mapTask.getTimerService().registerTimer(40, new Triggerable() {
 			@Override
 			public void trigger(long timestamp) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 9849bd7..f33da89 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -39,10 +39,11 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
@@ -50,28 +51,19 @@ import org.apache.flink.util.Collector;
 import org.junit.After;
 import org.junit.Test;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -189,11 +181,15 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@Test
-	public void testWindowTriggerTimeAlignment() {
+	public void testWindowTriggerTimeAlignment() throws Exception {
+		final Object lock = new Object();
+		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+			Executors.newSingleThreadScheduledExecutor(), lock);
+
 		try {
 			@SuppressWarnings("unchecked")
 			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			final StreamTask<?, ?> mockTask = createMockTask();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			AccumulatingProcessingTimeWindowOperator<String, String, String> op;
 
@@ -233,16 +229,21 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
+		finally {
+			timerService.shutdownService();
+		}
 	}
 
 	@Test
-	public void testTumblingWindow() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+	public void testTumblingWindow() throws Exception {
+		final Object lock = new Object();
+		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+			Executors.newSingleThreadScheduledExecutor(), lock);
+
 		try {
 			final int windowSize = 50;
 			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -284,17 +285,19 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 		finally {
-			timerService.shutdown();
+			timerService.shutdownService();
 		}
 	}
 
 	@Test
 	public void testSlidingWindow() throws Exception {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+		final Object lock = new Object();
+		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+			Executors.newSingleThreadScheduledExecutor(), lock);
+
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 			
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -344,18 +347,19 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				}
 			}
 		} finally {
-			timerService.shutdown();
+			timerService.shutdownService();
 		}
 	}
 
 	@Test
-	public void testTumblingWindowSingleElements() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+	public void testTumblingWindowSingleElements() throws Exception {
+		final Object lock = new Object();
+		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+			Executors.newSingleThreadScheduledExecutor(), lock);
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -400,18 +404,19 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 		finally {
-			timerService.shutdown();
+			timerService.shutdownService();
 		}
 	}
 	
 	@Test
-	public void testSlidingWindowSingleElements() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+	public void testSlidingWindowSingleElements() throws Exception {
+		final Object lock = new Object();
+		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+			Executors.newSingleThreadScheduledExecutor(), lock);
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -447,7 +452,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 		finally {
-			timerService.shutdown();
+			timerService.shutdownService();
 		}
 	}
 
@@ -494,8 +499,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				testHarness.processElement(new StreamRecord<>(i + numElementsFirst));
 			}
 
+			testHarness.close();
 			op.dispose();
-			
+
 			// re-create the operator and restore the state
 			op = new AccumulatingProcessingTimeWindowOperator<>(
 							validatingIdentityFunction, identitySelector,
@@ -527,6 +533,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			for (int i = 0; i < numElements; i++) {
 				assertEquals(i, finalResult.get(i).intValue());
 			}
+			testHarness.close();
+			op.dispose();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -580,6 +588,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				testHarness.processElement(new StreamRecord<>(i));
 			}
 
+			testHarness.close();
 			op.dispose();
 
 			// re-create the operator and restore the state
@@ -609,9 +618,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			timerService.setCurrentTime(300);
 			timerService.setCurrentTime(350);
 
-			testHarness.close();
-			op.dispose();
-
 			// get and verify the result
 			List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
 			List<Integer> finalPartialResult = extractFromStreamRecords(testHarness.getOutput());
@@ -622,6 +628,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			for (int i = 0; i < factor * numElements; i++) {
 				assertEquals(i / factor, finalResult.get(i).intValue());
 			}
+
+			testHarness.close();
+			op.dispose();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -756,31 +765,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	private static StreamTask<?, ?> createMockTaskWithTimer(
-			final ScheduledExecutorService timerService, final Object lock)
+		final TimeServiceProvider timerService)
 	{
 		StreamTask<?, ?> mockTask = createMockTask();
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-				final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-				final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-				timerService.schedule(
-						new Callable<Object>() {
-							@Override
-							public Object call() throws Exception {
-								synchronized (lock) {
-									target.trigger(timestamp);
-								}
-								return null;
-							}
-						},
-						timestamp - System.currentTimeMillis(),
-						TimeUnit.MILLISECONDS);
-				return null;
-			}
-		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
-
+		when(mockTask.getTimerService()).thenReturn(timerService);
 		return mockTask;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 3dfa395..826b230 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -40,19 +40,17 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.After;
 import org.junit.Test;
 
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -61,19 +59,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -197,11 +189,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	@Test
-	public void testWindowTriggerTimeAlignment() {
+	public void testWindowTriggerTimeAlignment() throws Exception {
+		final Object lock = new Object();
+		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+			Executors.newSingleThreadScheduledExecutor(), lock);
+
 		try {
 			@SuppressWarnings("unchecked")
 			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			final StreamTask<?, ?> mockTask = createMockTask();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 			
 			AggregatingProcessingTimeWindowOperator<String, String> op;
 
@@ -240,12 +236,17 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
+		} finally {
+			timerService.shutdownService();
 		}
 	}
 
 	@Test
-	public void testTumblingWindowUniqueElements() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+	public void testTumblingWindowUniqueElements() throws Exception {
+		final Object lock = new Object();
+		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+			Executors.newSingleThreadScheduledExecutor(), lock);
+
 		try {
 			final int windowSize = 50;
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
@@ -255,9 +256,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							sumFunction, fieldOneSelector,
 							IntSerializer.INSTANCE, tupleSerializer,
 							windowSize, windowSize);
-			
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
 			op.open();
@@ -296,20 +296,20 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 		finally {
-			timerService.shutdownNow();
+			timerService.shutdownService();
 		}
 	}
 
 	@Test
-	public void  testTumblingWindowDuplicateElements() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
+	public void testTumblingWindowDuplicateElements() throws Exception {
+		final Object lock = new Object();
+		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+			Executors.newSingleThreadScheduledExecutor(), lock);
 		try {
 			final int windowSize = 50;
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
 
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
@@ -364,18 +364,20 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 		finally {
-			timerService.shutdown();
+			timerService.shutdownService();
 		}
 	}
 
 	@Test
-	public void testSlidingWindow() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+	public void testSlidingWindow() throws Exception {
+		final Object lock = new Object();
+		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+			Executors.newSingleThreadScheduledExecutor(), lock);
+
 		try {
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
 
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			// tumbling window that triggers every 20 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -434,18 +436,19 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 		finally {
-			timerService.shutdownNow();
+			timerService.shutdownService();
 		}
 	}
 
 	@Test
-	public void testSlidingWindowSingleElements() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+	public void testSlidingWindowSingleElements() throws Exception {
+		final Object lock = new Object();
+		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+			Executors.newSingleThreadScheduledExecutor(), lock);
 
 		try {
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			// tumbling window that triggers every 20 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -493,17 +496,19 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 		finally {
-			timerService.shutdown();
+			timerService.shutdownService();
 		}
 	}
 
 	@Test
-	public void testPropagateExceptionsFromProcessElement() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+	public void testPropagateExceptionsFromProcessElement() throws Exception {
+		final Object lock = new Object();
+		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+			Executors.newSingleThreadScheduledExecutor(), lock);
+
 		try {
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
 
 			ReduceFunction<Tuple2<Integer, Integer>> failingFunction = new FailingFunction(100);
 
@@ -543,7 +548,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			fail(e.getMessage());
 		}
 		finally {
-			timerService.shutdown();
+			timerService.shutdownService();
 		}
 	}
 
@@ -593,6 +598,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				testHarness.processElement(next);
 			}
 
+			testHarness.close();
 			op.dispose();
 
 			// re-create the operator and restore the state
@@ -622,14 +628,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			finalResult.addAll(partialFinalResult);
 			assertEquals(numElements, finalResult.size());
 
-			testHarness.close();
-			op.dispose();
-
 			Collections.sort(finalResult, tupleComparator);
 			for (int i = 0; i < numElements; i++) {
 				assertEquals(i, finalResult.get(i).f0.intValue());
 				assertEquals(i, finalResult.get(i).f1.intValue());
 			}
+
+			testHarness.close();
+			op.dispose();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -685,6 +691,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				testHarness.processElement(next);
 			}
 
+			testHarness.close();
 			op.dispose();
 
 			// re-create the operator and restore the state
@@ -715,9 +722,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			timerService.setCurrentTime(350);
 			timerService.setCurrentTime(400);
 
-			testHarness.close();
-			op.dispose();
-
 			// get and verify the result
 			List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
 			List<Tuple2<Integer, Integer>> partialFinalResult = extractFromStreamRecords(testHarness.getOutput());
@@ -729,6 +733,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				assertEquals(i / factor, finalResult.get(i).f0.intValue());
 				assertEquals(i / factor, finalResult.get(i).f1.intValue());
 			}
+
+			testHarness.close();
+			op.dispose();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -837,14 +844,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			timerService.setCurrentTime(150);
 			timerService.setCurrentTime(200);
 
-			testHarness.close();
-
 			int count1 = StatefulFunction.globalCounts.get(1);
 			int count2 = StatefulFunction.globalCounts.get(2);
 			
 			assertTrue(count1 >= 2 && count1 <= 2 * numElements);
 			assertEquals(count1, count2);
-			
+
+			testHarness.close();
 			op.dispose();
 		}
 		catch (Exception e) {
@@ -941,32 +947,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		return task;
 	}
 
-	private static StreamTask<?, ?> createMockTaskWithTimer(
-			final ScheduledExecutorService timerService, final Object lock)
+	private static StreamTask<?, ?> createMockTaskWithTimer(final TimeServiceProvider timerService)
 	{
 		StreamTask<?, ?> mockTask = createMockTask();
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-				final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-				final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-				timerService.schedule(
-						new Callable<Object>() {
-							@Override
-							public Object call() throws Exception {
-								synchronized (lock) {
-									target.trigger(timestamp);
-								}
-								return null;
-							}
-						},
-						timestamp - System.currentTimeMillis(),
-						TimeUnit.MILLISECONDS);
-				return null;
-			}
-		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
-		
+		when(mockTask.getTimerService()).thenReturn(timerService);
 		return mockTask;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index ce634f0..cbb5a9d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -113,11 +113,11 @@ public class StreamTaskTestHarness<OUT> {
 		outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer);
 	}
 
-	public long getCurrentProcessingTime() {
+	public TimeServiceProvider getTimerService() {
 		if (!(task instanceof StreamTask)) {
-			throw new UnsupportedOperationException("getCurrentProcessingTime() only supported on StreamTasks.");
+			throw new UnsupportedOperationException("getTimerService() only supported on StreamTasks.");
 		}
-		return ((StreamTask) task).getCurrentProcessingTime();
+		return ((StreamTask) task).getTimerService();
 	}
 
 	/**
@@ -235,9 +235,6 @@ public class StreamTaskTestHarness<OUT> {
 		}
 		else {
 			if (taskThread.task instanceof StreamTask) {
-				long base = System.currentTimeMillis();
-				long now = 0;
-
 				StreamTask<?, ?> streamTask = (StreamTask<?, ?>) taskThread.task;
 				while (!streamTask.isRunning()) {
 					Thread.sleep(100);

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 7e86da0..430c6de 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 65ed43d..2dd2163 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -21,10 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -34,15 +31,10 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -105,7 +97,7 @@ public class MockContext<IN, OUT> {
 	}
 
 	private static StreamTask<?, ?> createMockTaskWithTimer(
-			final ScheduledExecutorService timerService, final Object lock)
+		final TimeServiceProvider timerService, final Object lock)
 	{
 		StreamTask<?, ?> task = mock(StreamTask.class);
 		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
@@ -113,28 +105,7 @@ public class MockContext<IN, OUT> {
 		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
 		when(task.getEnvironment()).thenReturn(new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024));
 		when(task.getCheckpointLock()).thenReturn(lock);
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-				final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-				final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-				timerService.schedule(
-						new Callable<Object>() {
-							@Override
-							public Object call() throws Exception {
-								synchronized (lock) {
-									target.trigger(timestamp);
-								}
-								return null;
-							}
-						},
-						timestamp - System.currentTimeMillis(),
-						TimeUnit.MILLISECONDS);
-				return null;
-			}
-		}).when(task).registerTimer(anyLong(), any(Triggerable.class));
-
+		when(task.getTimerService()).thenReturn(timerService);
 		return task;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 6c637bf..9cdc783 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
@@ -50,7 +49,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -155,23 +153,12 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		timeServiceProvider = testTimeProvider != null ? testTimeProvider :
 			DefaultTimeServiceProvider.create(mockTask, Executors.newSingleThreadScheduledExecutor(), this.checkpointLock);
 
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) throws Throwable {
-				final long execTime = (Long) invocation.getArguments()[0];
-				final Triggerable target = (Triggerable) invocation.getArguments()[1];
-
-				timeServiceProvider.registerTimer(execTime, target);
-				return null;
-			}
-		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
-
-		doAnswer(new Answer<Long>() {
+		doAnswer(new Answer<TimeServiceProvider>() {
 			@Override
-			public Long answer(InvocationOnMock invocation) throws Throwable {
-				return timeServiceProvider.getCurrentProcessingTime();
+			public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable {
+				return timeServiceProvider;
 			}
-		}).when(mockTask).getCurrentProcessingTime();
+		}).when(mockTask).getTimerService();
 	}
 
 	public void setStateBackend(AbstractStateBackend stateBackend) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ffff2997/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index 33c8024..707ce0f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -192,7 +192,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			}
 
 			if (first) {
-				registerTimer(System.currentTimeMillis() + 100, this);
+				getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
 				first = false;
 			}
 			numElements++;
@@ -209,7 +209,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			try {
 				numTimers++;
 				throwIfDone();
-				registerTimer(System.currentTimeMillis() + 1, this);
+				getTimerService().registerTimer(System.currentTimeMillis() + 1, this);
 			} finally {
 				semaphore.release();
 			}
@@ -251,7 +251,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			}
 
 			if (first) {
-				registerTimer(System.currentTimeMillis() + 100, this);
+				getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
 				first = false;
 			}
 			numElements++;
@@ -266,7 +266,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			}
 
 			if (first) {
-				registerTimer(System.currentTimeMillis() + 100, this);
+				getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
 				first = false;
 			}
 			numElements++;
@@ -284,7 +284,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			try {
 				numTimers++;
 				throwIfDone();
-				registerTimer(System.currentTimeMillis() + 1, this);
+				getTimerService().registerTimer(System.currentTimeMillis() + 1, this);
 			} finally {
 				semaphore.release();
 			}


[2/3] flink git commit: [FLINK-4496] Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.

Posted by al...@apache.org.
[FLINK-4496] Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4779c7ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4779c7ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4779c7ec

Branch: refs/heads/master
Commit: 4779c7eca0f7e91dd5ee38122baa3fe99c8b7bea
Parents: 568845a
Author: kl0u <kk...@gmail.com>
Authored: Thu Aug 25 17:38:49 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Sep 23 15:01:06 2016 +0200

----------------------------------------------------------------------
 .../kafka/testutils/MockRuntimeContext.java     |  41 +---
 .../runtime/tasks/AsyncExceptionHandler.java    |  31 +++
 .../tasks/DefaultTimeServiceProvider.java       |  76 ++++++-
 .../streaming/runtime/tasks/StreamTask.java     |  81 +++----
 .../runtime/tasks/TestTimeServiceProvider.java  |  44 ++--
 .../runtime/tasks/TimeServiceProvider.java      |   6 +-
 .../operators/StreamSourceOperatorTest.java     |  14 +-
 .../runtime/operators/StreamTaskTimerTest.java  |  53 -----
 .../runtime/operators/TimeProviderTest.java     | 214 +++++++++++++++++++
 ...AlignedProcessingTimeWindowOperatorTest.java |  12 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |   2 +
 .../runtime/tasks/StreamTaskTestHarness.java    |   2 +-
 .../util/OneInputStreamOperatorTestHarness.java |  85 ++++----
 .../streaming/util/WindowingTestHarness.java    |   2 -
 14 files changed, 436 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 2d5e2d8..7a50569 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -58,38 +58,28 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 	private final int indexOfThisSubtask;
 	
 	private final ExecutionConfig execConfig;
-	private final Object checkpointLock;
 
 	private final TimeServiceProvider timerService;
 
 	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
-		this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), null);
+		this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), new Object());
 	}
 
 	public MockRuntimeContext(
-		int numberOfParallelSubtasks, int indexOfThisSubtask,
+		int numberOfParallelSubtasks,
+		int indexOfThisSubtask,
 		ExecutionConfig execConfig,
 		Object checkpointLock) {
 
-		this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, checkpointLock,
-			DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
-	}
-
-	public MockRuntimeContext(
-			int numberOfParallelSubtasks, int indexOfThisSubtask,
-			ExecutionConfig execConfig,
-			Object checkpointLock,
-			TimeServiceProvider timerService) {
-
 		super(new MockStreamOperator(),
-				new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
-				Collections.<String, Accumulator<?, ?>>emptyMap());
-		
+			new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
+			Collections.<String, Accumulator<?, ?>>emptyMap());
+
 		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
 		this.indexOfThisSubtask = indexOfThisSubtask;
 		this.execConfig = execConfig;
-		this.checkpointLock = checkpointLock;
-		this.timerService = timerService;
+		this.timerService = DefaultTimeServiceProvider.
+			createForTesting(Executors.newSingleThreadScheduledExecutor(), checkpointLock);
 	}
 
 	@Override
@@ -216,20 +206,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 	@Override
 	public ScheduledFuture<?> registerTimer(final long time, final Triggerable target) {
 		Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
-		
-		return timerService.registerTimer(time, new Runnable() {
-			@Override
-			public void run() {
-				synchronized (checkpointLock) {
-					try {
-						target.trigger(time);
-					} catch (Throwable t) {
-						System.err.println("!!! Caught exception while processing timer. !!!");
-						t.printStackTrace();
-					}
-				}
-			}
-		});
+		return timerService.registerTimer(time, target);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
new file mode 100644
index 0000000..85a4115
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+/**
+ * An interface marking a task as capable of registering exceptions thrown by
+ * threads other than the one executing the task itself.
+ */
+public interface AsyncExceptionHandler {
+
+	/**
+	 * Registers with the main thread an exception that was thrown by another thread
+	 * (e.g. a TriggerTask), other than the one executing the main task.
+	 */
+	void registerAsyncException(String message, AsynchronousException exception);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index b803b82..c7339b3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -28,15 +31,26 @@ import java.util.concurrent.TimeUnit;
  */
 public class DefaultTimeServiceProvider extends TimeServiceProvider {
 
+	/** The containing task that owns this time service provider. */
+	private final AsyncExceptionHandler task;
+
+	private final Object checkpointLock;
+
 	/** The executor service that schedules and calls the triggers of this task*/
 	private final ScheduledExecutorService timerService;
 
-	public static DefaultTimeServiceProvider create (ScheduledExecutorService executor) {
-		return new DefaultTimeServiceProvider(executor);
+	public static DefaultTimeServiceProvider create(AsyncExceptionHandler task,
+													ScheduledExecutorService executor,
+													Object checkpointLock) {
+		return new DefaultTimeServiceProvider(task, executor, checkpointLock);
 	}
 
-	private DefaultTimeServiceProvider(ScheduledExecutorService threadPoolExecutor) {
+	private DefaultTimeServiceProvider(AsyncExceptionHandler task,
+									ScheduledExecutorService threadPoolExecutor,
+									Object checkpointLock) {
+		this.task = task;
 		this.timerService = threadPoolExecutor;
+		this.checkpointLock = checkpointLock;
 	}
 
 	@Override
@@ -45,16 +59,62 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 	}
 
 	@Override
-	public ScheduledFuture<?> registerTimer(long timestamp, Runnable target) {
+	public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
 		long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
-		return timerService.schedule(target, delay, TimeUnit.MILLISECONDS);
+		return timerService.schedule(new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
+	}
+
+	@Override
+	public boolean isTerminated() {
+		return timerService.isTerminated();
 	}
 
 	@Override
 	public void shutdownService() throws Exception {
-		if (!timerService.isTerminated()) {
-			StreamTask.LOG.info("Timer service is shutting down.");
-		}
 		timerService.shutdownNow();
 	}
+
+	/**
+	 * Internal task that is invoked by the timer service and triggers the target.
+	 */
+	private static final class TriggerTask implements Runnable {
+
+		private final Object lock;
+		private final Triggerable target;
+		private final long timestamp;
+		private final AsyncExceptionHandler task;
+
+		TriggerTask(AsyncExceptionHandler task, final Object lock, Triggerable target, long timestamp) {
+			this.task = task;
+			this.lock = lock;
+			this.target = target;
+			this.timestamp = timestamp;
+		}
+
+		@Override
+		public void run() {
+			synchronized (lock) {
+				try {
+					target.trigger(timestamp);
+				} catch (Throwable t) {
+
+					if (task != null) {
+						// registers the exception with the calling task
+						// so that it can be logged and (later) detected
+						TimerException asyncException = new TimerException(t);
+						task.registerAsyncException("Caught exception while processing timer.", asyncException);
+					} else {
+						// this is for when we are in testing mode and we
+						// want to have real processing time.
+						t.printStackTrace();
+					}
+				}
+			}
+		}
+	}
+
+	@VisibleForTesting
+	public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
+		return new DefaultTimeServiceProvider(null, executor, checkpointLock);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 49bbee7..80d51a6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -110,13 +110,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 @Internal
 public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		extends AbstractInvokable
-		implements StatefulTask {
+		implements StatefulTask, AsyncExceptionHandler {
 
 	/** The thread group that holds all trigger timer threads */
 	public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
 	
 	/** The logger used by the StreamTask and its subclasses */
-	protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
+	private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
 	
 	// ------------------------------------------------------------------------
 	
@@ -207,7 +207,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		timerService = timeProvider;
 	}
 
+	/**
+	 * Returns the current processing time.
+	 */
 	public long getCurrentProcessingTime() {
+		if (timerService == null) {
+			throw new IllegalStateException("The timer service has not been initialized.");
+		}
 		return timerService.getCurrentProcessingTime();
 	}
 
@@ -237,7 +243,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				// that timestamp are removed by user
 				executor.setRemoveOnCancelPolicy(true);
 
-				timerService = DefaultTimeServiceProvider.create(executor);
+				timerService = DefaultTimeServiceProvider.create(this, executor, getCheckpointLock());
 			}
 
 			headOperator = configuration.getStreamOperator(getUserCodeClassLoader());
@@ -319,7 +325,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			// stop all timers and threads
 			if (timerService != null) {
 				try {
-					timerService.shutdownService();
+					if (!timerService.isTerminated()) {
+						LOG.info("Timer service is shutting down.");
+						timerService.shutdownService();
+					}
 				}
 				catch (Throwable t) {
 					// catch and log the exception to not replace the original exception
@@ -475,7 +484,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	protected void finalize() throws Throwable {
 		super.finalize();
 		if (timerService != null) {
-			timerService.shutdownService();
+			if (!timerService.isTerminated()) {
+				LOG.info("Timer service is shutting down.");
+				timerService.shutdownService();
+			}
 		}
 
 		closeAllClosables();
@@ -819,7 +831,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		if (timerService == null) {
 			throw new IllegalStateException("The timer service has not been initialized.");
 		}
-		return timerService.registerTimer(timestamp, new TriggerTask(this, lock, target, timestamp));
+		return timerService.registerTimer(timestamp, target);
 	}
 
 	/**
@@ -836,6 +848,17 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		}
 	}
 
+	@Override
+	public void registerAsyncException(String message, AsynchronousException exception) {
+		if (isRunning) {
+			LOG.error(message, exception);
+		}
+
+		if (this.asyncException == null) {
+			this.asyncException = exception;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -863,42 +886,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	}
 
 	// ------------------------------------------------------------------------
-
-	/**
-	 * Internal task that is invoked by the timer service and triggers the target.
-	 */
-	private static final class TriggerTask implements Runnable {
-
-		private final Object lock;
-		private final Triggerable target;
-		private final long timestamp;
-		private final StreamTask<?, ?> task;
-
-		TriggerTask(StreamTask<?, ?> task, final Object lock, Triggerable target, long timestamp) {
-			this.task = task;
-			this.lock = lock;
-			this.target = target;
-			this.timestamp = timestamp;
-		}
-
-		@Override
-		public void run() {
-			synchronized (lock) {
-				try {
-					target.trigger(timestamp);
-				} catch (Throwable t) {
-					if (task.isRunning) {
-						LOG.error("Caught exception while processing timer.", t);
-					}
-					if (task.asyncException == null) {
-						task.asyncException = new TimerException(t);
-					}
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
 	
 	private static class AsyncCheckpointRunnable implements Runnable, Closeable {
 
@@ -961,12 +948,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 				}
 			}
 			catch (Exception e) {
-				if (owner.isRunning()) {
-					LOG.error("Caught exception while materializing asynchronous checkpoints.", e);
-				}
-				if (owner.asyncException == null) {
-					owner.asyncException = new AsynchronousException(e);
-				}
+
+				// registers the exception and tries to fail the whole task
+				AsynchronousException asyncException = new AsynchronousException(e);
+				owner.registerAsyncException("Caught exception while materializing asynchronous checkpoints.", asyncException);
 			}
 			finally {
 				synchronized (cancelables) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
index 2314deb..a21a2e1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
@@ -17,11 +17,13 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ScheduledFuture;
 
 /**
@@ -32,30 +34,34 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
 
 	private long currentTime = 0;
 
-	private Map<Long, List<Runnable>> registeredTasks = new HashMap<>();
+	private boolean isTerminated = false;
+
+	// sorts the timers by timestamp so that they are processed in the correct order.
+	private Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>();
 
-	public void setCurrentTime(long timestamp) {
+	public void setCurrentTime(long timestamp) throws Exception {
 		this.currentTime = timestamp;
 
 		// decide which timers to fire and put them in a list
 		// we do not fire them here to be able to accommodate timers
-		// that register other timers. The latter would through an exception.
+		// that register other timers.
 
-		Iterator<Map.Entry<Long, List<Runnable>>> it = registeredTasks.entrySet().iterator();
-		List<Runnable> toRun = new ArrayList<>();
+		Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator();
+		List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>();
 		while (it.hasNext()) {
-			Map.Entry<Long, List<Runnable>> t = it.next();
+			Map.Entry<Long, List<Triggerable>> t = it.next();
 			if (t.getKey() <= this.currentTime) {
-				for (Runnable r: t.getValue()) {
-					toRun.add(r);
-				}
+				toRun.add(t);
 				it.remove();
 			}
 		}
 
 		// now do the actual firing.
-		for (Runnable r: toRun) {
-			r.run();
+		for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
+			long now = tasks.getKey();
+			for (Triggerable task: tasks.getValue()) {
+				task.trigger(now);
+			}
 		}
 	}
 
@@ -65,8 +71,8 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
 	}
 
 	@Override
-	public ScheduledFuture<?> registerTimer(long timestamp, Runnable target) {
-		List<Runnable> tasks = registeredTasks.get(timestamp);
+	public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
+		List<Triggerable> tasks = registeredTasks.get(timestamp);
 		if (tasks == null) {
 			tasks = new ArrayList<>();
 			registeredTasks.put(timestamp, tasks);
@@ -75,9 +81,14 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
 		return null;
 	}
 
+	@Override
+	public boolean isTerminated() {
+		return isTerminated;
+	}
+
 	public int getNoOfRegisteredTimers() {
 		int count = 0;
-		for (List<Runnable> tasks: registeredTasks.values()) {
+		for (List<Triggerable> tasks: registeredTasks.values()) {
 			count += tasks.size();
 		}
 		return count;
@@ -85,7 +96,6 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
 
 	@Override
 	public void shutdownService() throws Exception {
-		this.registeredTasks.clear();
-		this.registeredTasks = null;
+		this.isTerminated = true;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
index f3e4f78..42a4fa4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
@@ -16,6 +16,7 @@
  */
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 import java.util.concurrent.ScheduledFuture;
 
 /**
@@ -34,7 +35,10 @@ public abstract class TimeServiceProvider {
 	 * 						the task to be executed
 	 * @return the result to be returned.
 	 */
-	public abstract ScheduledFuture<?> registerTimer(final long timestamp, final Runnable target);
+	public abstract ScheduledFuture<?> registerTimer(final long timestamp, final Triggerable target);
+
+	/** Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise. */
+	public abstract boolean isTerminated();
 
 	/** Shuts down and clean up the timer service provider. */
 	public abstract void shutdownService() throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index 9c06b49..d61fec9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -184,6 +184,8 @@ public class StreamSourceOperatorTest {
 
 		long watermarkInterval = 10;
 		TestTimeServiceProvider timeProvider = new TestTimeServiceProvider();
+		timeProvider.setCurrentTime(0);
+
 		setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, timeProvider);
 
 		final List<StreamElement> output = new ArrayList<>();
@@ -249,17 +251,7 @@ public class StreamSourceOperatorTest {
 					throw new RuntimeException("The time provider is null");
 				}
 
-				timeProvider.registerTimer(execTime, new Runnable() {
-
-					@Override
-					public void run() {
-						try {
-							target.trigger(execTime);
-						} catch (Exception e) {
-							e.printStackTrace();
-						}
-					}
-				});
+				timeProvider.registerTimer(execTime, target);
 				return null;
 			}
 		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index c9f204d..b9435f5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -49,58 +48,6 @@ import static org.junit.Assert.*;
 public class StreamTaskTimerTest {
 
 	@Test
-	public void testCustomTimeServiceProvider() throws Throwable {
-		TestTimeServiceProvider tp = new TestTimeServiceProvider();
-
-		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
-		mapTask.setTimeService(tp);
-
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
-			mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-
-		StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
-		streamConfig.setStreamOperator(mapOperator);
-
-		testHarness.invoke();
-
-		assertTrue(testHarness.getCurrentProcessingTime() == 0);
-
-		tp.setCurrentTime(11);
-		assertTrue(testHarness.getCurrentProcessingTime() == 11);
-
-		tp.setCurrentTime(15);
-		tp.setCurrentTime(16);
-		assertTrue(testHarness.getCurrentProcessingTime() == 16);
-		
-		// register 2 tasks
-		mapTask.registerTimer(30, new Triggerable() {
-			@Override
-			public void trigger(long timestamp) {
-
-			}
-		});
-
-		mapTask.registerTimer(40, new Triggerable() {
-			@Override
-			public void trigger(long timestamp) {
-
-			}
-		});
-
-		assertEquals(2, tp.getNoOfRegisteredTimers());
-
-		tp.setCurrentTime(35);
-		assertEquals(1, tp.getNoOfRegisteredTimers());
-
-		tp.setCurrentTime(40);
-		assertEquals(0, tp.getNoOfRegisteredTimers());
-
-		tp.shutdownService();
-	}
-
-	@Test
 	public void testOpenCloseAndTimestamps() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
new file mode 100644
index 0000000..4d4f07b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.hadoop.shaded.org.apache.http.impl.conn.SystemDefaultDnsResolver;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+public class TimeProviderTest {
+
+	@Test
+	public void testDefaultTimeProvider() throws InterruptedException {
+		final OneShotLatch latch = new OneShotLatch();
+
+		final Object lock = new Object();
+		TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider
+			.createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
+
+		final List<Long> timestamps = new ArrayList<>();
+
+		long start = System.currentTimeMillis();
+		long interval = 50L;
+
+		final long noOfTimers = 20;
+
+		// we add 2 timers per iteration, except in the first one, where the out-of-order timer (which would lie before 'start') is skipped
+		final long expectedNoOfTimers = 2 * noOfTimers - 1;
+
+		for (int i = 0; i < noOfTimers; i++) {
+			double nextTimer = start + i * interval;
+
+			timeServiceProvider.registerTimer((long) nextTimer, new Triggerable() {
+				@Override
+				public void trigger(long timestamp) throws Exception {
+					timestamps.add(timestamp);
+					if (timestamps.size() == expectedNoOfTimers) {
+						latch.trigger();
+					}
+				}
+			});
+
+			// also register out-of-order timers to verify that they
+			// are eventually executed in the correct order.
+
+			if (i > 0) {
+				timeServiceProvider.registerTimer((long) (nextTimer - 10), new Triggerable() {
+					@Override
+					public void trigger(long timestamp) throws Exception {
+						timestamps.add(timestamp);
+						if (timestamps.size() == expectedNoOfTimers) {
+							latch.trigger();
+						}
+					}
+				});
+			}
+		}
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		Assert.assertEquals(timestamps.size(), expectedNoOfTimers);
+
+		// verify that the tasks are executed
+		// in ascending timestamp order
+
+		int counter = 0;
+		long lastTs = Long.MIN_VALUE;
+		for (long timestamp: timestamps) {
+			Assert.assertTrue(timestamp >= lastTs);
+			lastTs = timestamp;
+
+			long expectedTs = start + (counter/2) * interval;
+			Assert.assertEquals(timestamp, (expectedTs + ((counter % 2 == 0) ? 0 : 40)));
+			counter++;
+		}
+	}
+
+	@Test
+	public void testTimerSorting() throws Exception {
+
+		final List<Long> result = new ArrayList<>();
+
+		TestTimeServiceProvider provider = new TestTimeServiceProvider();
+
+		provider.registerTimer(45, new Triggerable() {
+			@Override
+			public void trigger(long timestamp) {
+				result.add(timestamp);
+			}
+		});
+
+		provider.registerTimer(50, new Triggerable() {
+			@Override
+			public void trigger(long timestamp) {
+				result.add(timestamp);
+			}
+		});
+
+		provider.registerTimer(30, new Triggerable() {
+			@Override
+			public void trigger(long timestamp) {
+				result.add(timestamp);
+			}
+		});
+
+		provider.registerTimer(50, new Triggerable() {
+			@Override
+			public void trigger(long timestamp) {
+				result.add(timestamp);
+			}
+		});
+
+		Assert.assertTrue(provider.getNoOfRegisteredTimers() == 4);
+
+		provider.setCurrentTime(100);
+		long seen = 0;
+		for (Long l: result) {
+			Assert.assertTrue(l >= seen);
+			seen = l;
+		}
+	}
+
+	@Test
+	public void testCustomTimeServiceProvider() throws Throwable {
+		TestTimeServiceProvider tp = new TestTimeServiceProvider();
+
+		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
+		mapTask.setTimeService(tp);
+
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+			mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+
+		StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>());
+		streamConfig.setStreamOperator(mapOperator);
+
+		testHarness.invoke();
+
+		assertTrue(testHarness.getCurrentProcessingTime() == 0);
+
+		tp.setCurrentTime(11);
+		assertTrue(testHarness.getCurrentProcessingTime() == 11);
+
+		tp.setCurrentTime(15);
+		tp.setCurrentTime(16);
+		assertTrue(testHarness.getCurrentProcessingTime() == 16);
+
+		// register 2 timers
+		mapTask.registerTimer(30, new Triggerable() {
+			@Override
+			public void trigger(long timestamp) {
+
+			}
+		});
+
+		mapTask.registerTimer(40, new Triggerable() {
+			@Override
+			public void trigger(long timestamp) {
+
+			}
+		});
+
+		assertEquals(2, tp.getNoOfRegisteredTimers());
+
+		tp.setCurrentTime(35);
+		assertEquals(1, tp.getNoOfRegisteredTimers());
+
+		tp.setCurrentTime(40);
+		assertEquals(0, tp.getNoOfRegisteredTimers());
+
+		tp.shutdownService();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 40a6c79..9849bd7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -493,7 +493,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			for (int i = 0; i < 300; i++) {
 				testHarness.processElement(new StreamRecord<>(i + numElementsFirst));
 			}
-			
+
 			op.dispose();
 			
 			// re-create the operator and restore the state
@@ -502,9 +502,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
 							windowSize, windowSize);
 
-			testHarness =
-					new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
-
+			timerService = new TestTimeServiceProvider();
+			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
 			testHarness.setup();
 			testHarness.restore(state);
@@ -580,15 +579,16 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			for (int i = numElementsFirst; i < numElements; i++) {
 				testHarness.processElement(new StreamRecord<>(i));
 			}
-			
+
 			op.dispose();
-			
+
 			// re-create the operator and restore the state
 			op = new AccumulatingProcessingTimeWindowOperator<>(
 					validatingIdentityFunction, identitySelector,
 					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
 					windowSize, windowSlide);
 
+			timerService = new TestTimeServiceProvider();
 			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
 			testHarness.setup();

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 59bfe6f..3dfa395 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -601,6 +601,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					IntSerializer.INSTANCE, tupleSerializer,
 					windowSize, windowSize);
 
+			timerService = new TestTimeServiceProvider();
 			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
 			testHarness.setup();
@@ -692,6 +693,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					IntSerializer.INSTANCE, tupleSerializer,
 					windowSize, windowSlide);
 
+			timerService = new TestTimeServiceProvider();
 			testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
 			testHarness.setup();

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index cb10c5c..ce634f0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -115,7 +115,7 @@ public class StreamTaskTestHarness<OUT> {
 
 	public long getCurrentProcessingTime() {
 		if (!(task instanceof StreamTask)) {
-			System.currentTimeMillis();
+			throw new UnsupportedOperationException("getCurrentProcessingTime() only supported on StreamTasks.");
 		}
 		return ((StreamTask) task).getCurrentProcessingTime();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 15074a7..6c637bf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -37,8 +37,10 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -87,7 +89,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	 */
 	private boolean setupCalled = false;
 
-
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
 		this(operator, new ExecutionConfig());
 	}
@@ -95,27 +96,35 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	public OneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
 			ExecutionConfig executionConfig) {
-		this(operator, executionConfig, DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
+		this(operator, executionConfig, null);
+	}
+
+	public OneInputStreamOperatorTestHarness(
+			OneInputStreamOperator<IN, OUT> operator,
+			ExecutionConfig executionConfig,
+			TimeServiceProvider testTimeProvider) {
+		this(operator, executionConfig, new Object(), testTimeProvider);
 	}
 
 	public OneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
 			ExecutionConfig executionConfig,
+			Object checkpointLock,
 			TimeServiceProvider testTimeProvider) {
+
 		this.operator = operator;
 		this.outputList = new ConcurrentLinkedQueue<Object>();
 		Configuration underlyingConfig = new Configuration();
 		this.config = new StreamConfig(underlyingConfig);
 		this.config.setCheckpointingEnabled(true);
 		this.executionConfig = executionConfig;
-		this.checkpointLock = new Object();
+		this.checkpointLock = checkpointLock;
 
 		final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0);
 		mockTask = mock(StreamTask.class);
-		timeServiceProvider = testTimeProvider;
 
 		when(mockTask.getName()).thenReturn("Mock Task");
-		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
+		when(mockTask.getCheckpointLock()).thenReturn(this.checkpointLock);
 		when(mockTask.getConfiguration()).thenReturn(config);
 		when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
 		when(mockTask.getEnvironment()).thenReturn(env);
@@ -125,21 +134,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		doAnswer(new Answer<Void>() {
 			@Override
 			public Void answer(InvocationOnMock invocation) throws Throwable {
-				final long execTime = (Long) invocation.getArguments()[0];
-				final Triggerable target = (Triggerable) invocation.getArguments()[1];
-
-				timeServiceProvider.registerTimer(
-						execTime, new TriggerTask(checkpointLock, target, execTime));
+				// do nothing
 				return null;
 			}
-		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
-
-		doAnswer(new Answer<Long>() {
-			@Override
-			public Long answer(InvocationOnMock invocation) throws Throwable {
-				return timeServiceProvider.getCurrentProcessingTime();
-			}
-		}).when(mockTask).getCurrentProcessingTime();
+		}).when(mockTask).registerAsyncException(any(String.class), any(AsynchronousException.class));
 
 		try {
 			doAnswer(new Answer<CheckpointStreamFactory>() {
@@ -154,6 +152,26 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 			throw new RuntimeException(e.getMessage(), e);
 		}
 
+		timeServiceProvider = testTimeProvider != null ? testTimeProvider :
+			DefaultTimeServiceProvider.create(mockTask, Executors.newSingleThreadScheduledExecutor(), this.checkpointLock);
+
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				final long execTime = (Long) invocation.getArguments()[0];
+				final Triggerable target = (Triggerable) invocation.getArguments()[1];
+
+				timeServiceProvider.registerTimer(execTime, target);
+				return null;
+			}
+		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
+
+		doAnswer(new Answer<Long>() {
+			@Override
+			public Long answer(InvocationOnMock invocation) throws Throwable {
+				return timeServiceProvider.getCurrentProcessingTime();
+			}
+		}).when(mockTask).getCurrentProcessingTime();
 	}
 
 	public void setStateBackend(AbstractStateBackend stateBackend) {
@@ -216,7 +234,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		operator.notifyOfCompletedCheckpoint(checkpointId);
 	}
 
-
 	/**
 	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)} ()}
 	 */
@@ -275,32 +292,4 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 			// ignore
 		}
 	}
-
-	private static final class TriggerTask implements Runnable {
-
-		private final Object lock;
-		private final Triggerable target;
-		private final long timestamp;
-
-		TriggerTask(final Object lock, Triggerable target, long timestamp) {
-			this.lock = lock;
-			this.target = target;
-			this.timestamp = timestamp;
-		}
-
-		@Override
-		public void run() {
-			synchronized (lock) {
-				try {
-					target.trigger(timestamp);
-				} catch (Throwable t) {
-					try {
-						throw t;
-					} catch (Exception e) {
-						e.printStackTrace();
-					}
-				}
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index af1f3fa..d47136c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;


[3/3] flink git commit: [hotfix] Replace registerTimer/getTime by TimeServiceProvider in Context

Posted by al...@apache.org.
[hotfix] Replace registerTimer/getTime by TimeServiceProvider in Context


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51a5048b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51a5048b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51a5048b

Branch: refs/heads/master
Commit: 51a5048b24ffe7655e2197c04aa844239bf1af83
Parents: ffff299
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Sep 23 10:40:16 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Sep 23 15:01:07 2016 +0200

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java  | 23 +++++++------
 .../kafka/internals/AbstractFetcher.java        | 19 ++++++-----
 .../kafka/testutils/MockRuntimeContext.java     | 11 ++----
 .../api/operators/StreamingRuntimeContext.java  |  7 ++--
 .../runtime/tasks/AsyncExceptionHandler.java    |  8 ++---
 .../tasks/DefaultTimeServiceProvider.java       | 35 +++++++++-----------
 .../streaming/runtime/tasks/StreamTask.java     |  6 ++--
 .../runtime/operators/TimeProviderTest.java     | 13 ++++++--
 .../util/OneInputStreamOperatorTestHarness.java |  2 +-
 9 files changed, 64 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 1e05c0d..5a5cade 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.connectors.fs.Writer;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -284,6 +285,8 @@ public class BucketingSink<T>
 
 	private transient Clock clock;
 
+	private transient TimeServiceProvider processingTimeService;
+
 	/**
 	 * Creates a new {@code BucketingSink} that writes files to the given base directory.
 	 *
@@ -320,18 +323,19 @@ public class BucketingSink<T>
 		FileSystem fs = baseDirectory.getFileSystem(hadoopConf);
 		refTruncate = reflectTruncate(fs);
 
-		long currentProcessingTime =
-				((StreamingRuntimeContext) getRuntimeContext()).getCurrentProcessingTime();
+		processingTimeService =
+				((StreamingRuntimeContext) getRuntimeContext()).getTimeServiceProvider();
+
+		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
 
 		checkForInactiveBuckets(currentProcessingTime);
 
-		((StreamingRuntimeContext) getRuntimeContext()).registerTimer(
-				currentProcessingTime + inactiveBucketCheckInterval, this);
+		processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
 
 		this.clock = new Clock() {
 			@Override
 			public long currentTimeMillis() {
-				return ((StreamingRuntimeContext) getRuntimeContext()).getCurrentProcessingTime();
+				return processingTimeService.getCurrentProcessingTime();
 			}
 		};
 
@@ -376,8 +380,7 @@ public class BucketingSink<T>
 	public void invoke(T value) throws Exception {
 		Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value);
 
-		long currentProcessingTime =
-				((StreamingRuntimeContext) getRuntimeContext()).getCurrentProcessingTime();
+		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
 
 		if (!state.hasBucketState(bucketPath)) {
 			state.addBucketState(bucketPath, new BucketState<T>(currentProcessingTime));
@@ -420,13 +423,11 @@ public class BucketingSink<T>
 
 	@Override
 	public void trigger(long timestamp) throws Exception {
-		long currentProcessingTime =
-				((StreamingRuntimeContext) getRuntimeContext()).getCurrentProcessingTime();
+		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
 
 		checkForInactiveBuckets(currentProcessingTime);
 
-		((StreamingRuntimeContext) getRuntimeContext()).registerTimer(
-				currentProcessingTime + inactiveBucketCheckInterval, this);
+		processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 8ec26cc..9255445 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
@@ -80,7 +81,8 @@ public abstract class AbstractFetcher<T, KPH> {
 			List<KafkaTopicPartition> assignedPartitions,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			StreamingRuntimeContext runtimeContext, boolean useMetrics) throws Exception
+			StreamingRuntimeContext runtimeContext,
+			boolean useMetrics) throws Exception
 	{
 		this.sourceContext = checkNotNull(sourceContext);
 		this.checkpointLock = sourceContext.getCheckpointLock();
@@ -116,7 +118,7 @@ public abstract class AbstractFetcher<T, KPH> {
 					(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
 			
 			PeriodicWatermarkEmitter periodicEmitter = 
-					new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext);
+					new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext.getTimeServiceProvider(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval());
 			periodicEmitter.start();
 		}
 	}
@@ -458,7 +460,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		
 		private final SourceContext<?> emitter;
 		
-		private final StreamingRuntimeContext triggerContext;
+		private final TimeServiceProvider timerService;
 
 		private final long interval;
 		
@@ -469,19 +471,20 @@ public abstract class AbstractFetcher<T, KPH> {
 		PeriodicWatermarkEmitter(
 				KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
 				SourceContext<?> emitter,
-				StreamingRuntimeContext runtimeContext)
+				TimeServiceProvider timerService,
+				long autoWatermarkInterval)
 		{
 			this.allPartitions = checkNotNull(allPartitions);
 			this.emitter = checkNotNull(emitter);
-			this.triggerContext = checkNotNull(runtimeContext);
-			this.interval = runtimeContext.getExecutionConfig().getAutoWatermarkInterval();
+			this.timerService = checkNotNull(timerService);
+			this.interval = autoWatermarkInterval;
 			this.lastWatermarkTimestamp = Long.MIN_VALUE;
 		}
 
 		//-------------------------------------------------
 		
 		public void start() {
-			triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + interval, this);
+			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
 		}
 		
 		@Override
@@ -510,7 +513,7 @@ public abstract class AbstractFetcher<T, KPH> {
 			}
 			
 			// schedule the next watermark
-			triggerContext.registerTimer(triggerContext.getCurrentProcessingTime() + interval, this);
+			timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 7a50569..da2c652 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -198,15 +198,8 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 	}
 
 	@Override
-	public long getCurrentProcessingTime() {
-		Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
-		return timerService.getCurrentProcessingTime();
-	}
-
-	@Override
-	public ScheduledFuture<?> registerTimer(final long time, final Triggerable target) {
-		Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
-		return timerService.registerTimer(time, target);
+	public TimeServiceProvider getTimeServiceProvider() {
+		return timerService;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index 961bd9d..4f85e3a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -35,11 +35,10 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ScheduledFuture;
 
 import static java.util.Objects.requireNonNull;
 
@@ -83,6 +82,10 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 		return taskEnvironment.getInputSplitProvider();
 	}
 
+	public TimeServiceProvider getTimeServiceProvider() {
+		return operator.getTimerService();
+	}
+
 	// ------------------------------------------------------------------------
 	//  broadcast variables
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
index 85a4115..c7ec2ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
@@ -18,14 +18,12 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 /**
- * An interface marking a task as capable to register exceptions thrown by different
- * threads, other than the one executing the taks itself.
+ * Interface for reporting exceptions that are thrown in (possibly) a different thread.
  */
 public interface AsyncExceptionHandler {
 
 	/**
-	 * Registers to the main thread an exception that was thrown by another thread
-	 * (e.g. a TriggerTask), other than the one executing the main task.
+	 * Registers the given exception.
 	 */
-	void registerAsyncException(String message, AsynchronousException exception);
+	void registerAsyncException(AsynchronousException exception);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index c7339b3..ea2b07f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -39,10 +39,11 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 	/** The executor service that schedules and calls the triggers of this task*/
 	private final ScheduledExecutorService timerService;
 
-	public static DefaultTimeServiceProvider create(AsyncExceptionHandler task,
-													ScheduledExecutorService executor,
-													Object checkpointLock) {
-		return new DefaultTimeServiceProvider(task, executor, checkpointLock);
+	public static DefaultTimeServiceProvider create(
+			AsyncExceptionHandler exceptionHandler,
+			ScheduledExecutorService executor,
+			Object checkpointLock) {
+		return new DefaultTimeServiceProvider(exceptionHandler, executor, checkpointLock);
 	}
 
 	private DefaultTimeServiceProvider(AsyncExceptionHandler task,
@@ -82,10 +83,10 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 		private final Object lock;
 		private final Triggerable target;
 		private final long timestamp;
-		private final AsyncExceptionHandler task;
+		private final AsyncExceptionHandler exceptionHandler;
 
-		TriggerTask(AsyncExceptionHandler task, final Object lock, Triggerable target, long timestamp) {
-			this.task = task;
+		TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, Triggerable target, long timestamp) {
+			this.exceptionHandler = exceptionHandler;
 			this.lock = lock;
 			this.target = target;
 			this.timestamp = timestamp;
@@ -97,17 +98,8 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 				try {
 					target.trigger(timestamp);
 				} catch (Throwable t) {
-
-					if (task != null) {
-						// registers the exception with the calling task
-						// so that it can be logged and (later) detected
-						TimerException asyncException = new TimerException(t);
-						task.registerAsyncException("Caught exception while processing timer.", asyncException);
-					} else {
-						// this is for when we are in testing mode and we
-						// want to have real processing time.
-						t.printStackTrace();
-					}
+					TimerException asyncException = new TimerException(t);
+					exceptionHandler.registerAsyncException(asyncException);
 				}
 			}
 		}
@@ -115,6 +107,11 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 
 	@VisibleForTesting
 	public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
-		return new DefaultTimeServiceProvider(null, executor, checkpointLock);
+		return new DefaultTimeServiceProvider(new AsyncExceptionHandler() {
+			@Override
+			public void registerAsyncException(AsynchronousException exception) {
+				exception.printStackTrace();
+			}
+		}, executor, checkpointLock);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index faa9672..ff074b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -838,9 +838,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	}
 
 	@Override
-	public void registerAsyncException(String message, AsynchronousException exception) {
+	public void registerAsyncException(AsynchronousException exception) {
 		if (isRunning) {
-			LOG.error(message, exception);
+			LOG.error("Asynchronous exception registered.", exception);
 		}
 
 		if (this.asyncException == null) {
@@ -940,7 +940,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 				// registers the exception and tries to fail the whole task
 				AsynchronousException asyncException = new AsynchronousException(e);
-				owner.registerAsyncException("Caught exception while materializing asynchronous checkpoints.", asyncException);
+				owner.registerAsyncException(asyncException);
 			}
 			finally {
 				synchronized (cancelables) {

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
index 140e9e2..60850d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
+import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
@@ -50,8 +52,15 @@ public class TimeProviderTest {
 		final OneShotLatch latch = new OneShotLatch();
 
 		final Object lock = new Object();
-		TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider
-			.createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
+		TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider.create(
+				new AsyncExceptionHandler() {
+					@Override
+					public void registerAsyncException(AsynchronousException exception) {
+						exception.printStackTrace();
+					}
+				},
+				Executors.newSingleThreadScheduledExecutor(),
+				lock);
 
 		final List<Long> timestamps = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/51a5048b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 9cdc783..acf046a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -135,7 +135,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 				// do nothing
 				return null;
 			}
-		}).when(mockTask).registerAsyncException(any(String.class), any(AsynchronousException.class));
+		}).when(mockTask).registerAsyncException(any(AsynchronousException.class));
 
 		try {
 			doAnswer(new Answer<CheckpointStreamFactory>() {