You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/20 16:49:15 UTC

[10/10] flink git commit: [FLINK-4973] Let LatencyMarksEmitter use StreamTask's ProcessingTimeService

[FLINK-4973] Let LatencyMarksEmitter use StreamTask's ProcessingTimeService

The LatencyMarksEmitter class now uses the StreamTask's ProcessingTimeService to schedule
latency mark emission. For that purpose, the ProcessingTimeService was extended with the method
scheduleAtFixedRate, which schedules repeated tasks. The latency mark emission is such a repeated
task.

This closes #3008.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab2125b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab2125b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab2125b8

Branch: refs/heads/master
Commit: ab2125b82ed10389dafecf1712efc1f8fb977c11
Parents: a26b0f0
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 14 14:53:11 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 20 17:03:01 2016 +0100

----------------------------------------------------------------------
 .../streaming/api/operators/StreamSource.java   |  48 +++--
 .../runtime/tasks/ProcessingTimeService.java    |  10 +
 .../tasks/SystemProcessingTimeService.java      |  68 +++++++
 .../tasks/TestProcessingTimeService.java        | 156 +++++++++------
 .../operators/HeapInternalTimerServiceTest.java |  36 ++--
 .../operators/StreamSourceOperatorTest.java     |  85 ++++++---
 .../TestProcessingTimeServiceTest.java          |   6 +-
 .../tasks/SystemProcessingTimeServiceTest.java  | 188 ++++++++++++++++++-
 8 files changed, 474 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 5a16db0..84330b6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -23,11 +23,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 /**
  * {@link StreamOperator} for streaming sources.
@@ -62,8 +61,12 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 
 		LatencyMarksEmitter latencyEmitter = null;
 		if(getExecutionConfig().isLatencyTrackingEnabled()) {
-			latencyEmitter = new LatencyMarksEmitter<>(lockingObject, collector, getExecutionConfig().getLatencyTrackingInterval(),
-					getOperatorConfig().getVertexID(), getRuntimeContext().getIndexOfThisSubtask());
+			latencyEmitter = new LatencyMarksEmitter<>(
+				getProcessingTimeService(),
+				collector,
+				getExecutionConfig().getLatencyTrackingInterval(),
+				getOperatorConfig().getVertexID(),
+				getRuntimeContext().getIndexOfThisSubtask());
 		}
 		
 		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
@@ -121,28 +124,35 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 	}
 
 	private static class LatencyMarksEmitter<OUT> {
-		private final ScheduledExecutorService scheduleExecutor;
 		private final ScheduledFuture<?> latencyMarkTimer;
 
-		public LatencyMarksEmitter(final Object lockingObject, final Output<StreamRecord<OUT>> output, long latencyTrackingInterval, final int vertexID, final int subtaskIndex) {
-			this.scheduleExecutor = Executors.newScheduledThreadPool(1);
-			this.latencyMarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
-				@Override
-				public void run() {
-					try {
-						synchronized (lockingObject) {
-							output.emitLatencyMarker(new LatencyMarker(System.currentTimeMillis(), vertexID, subtaskIndex));
+		public LatencyMarksEmitter(
+				final ProcessingTimeService processingTimeService,
+				final Output<StreamRecord<OUT>> output,
+				long latencyTrackingInterval,
+				final int vertexID,
+				final int subtaskIndex) {
+
+			latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
+				new ProcessingTimeCallback() {
+					@Override
+					public void onProcessingTime(long timestamp) throws Exception {
+						try {
+							// ProcessingTimeService callbacks are executed under the checkpointing lock
+							output.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex));
+						} catch (Throwable t) {
+							// we catch the Throwables here so that we don't trigger the processing
+							// timer services async exception handler
+							LOG.warn("Error while emitting latency marker.", t);
 						}
-					} catch (Throwable t) {
-						LOG.warn("Error while emitting latency marker", t);
 					}
-				}
-			}, 0, latencyTrackingInterval, TimeUnit.MILLISECONDS);
+				},
+				0L,
+				latencyTrackingInterval);
 		}
 
 		public void close() {
 			latencyMarkTimer.cancel(true);
-			scheduleExecutor.shutdownNow();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index f64bead..240aba8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -56,6 +56,16 @@ public abstract class ProcessingTimeService {
 	public abstract ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target);
 
 	/**
+	 * Registers a task to be executed repeatedly at a fixed rate.
+	 *
+	 * @param callback to be executed after the initial delay and then after each period
+	 * @param initialDelay initial delay to start executing callback
+	 * @param period after the initial delay after which the callback is executed
+	 * @return Scheduled future representing the task to be executed repeatedly
+	 */
+	public abstract ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period);
+
+	/**
 	 * Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise.
 	 */
 	public abstract boolean isTerminated();

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index 071dbce..abcb19b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 import java.util.concurrent.BlockingQueue;
@@ -124,6 +125,33 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
 	}
 
 	@Override
+	public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
+		long nextTimestamp = getCurrentProcessingTime() + initialDelay;
+
+		// we directly try to register the timer and only react to the status on exception
+		// that way we save unnecessary volatile accesses for each timer
+		try {
+			return timerService.scheduleAtFixedRate(
+				new RepeatedTriggerTask(task, checkpointLock, callback, nextTimestamp, period),
+				initialDelay,
+				period,
+				TimeUnit.MILLISECONDS);
+		} catch (RejectedExecutionException e) {
+			final int status = this.status.get();
+			if (status == STATUS_QUIESCED) {
+				return new NeverCompleteFuture(initialDelay);
+			}
+			else if (status == STATUS_SHUTDOWN) {
+				throw new IllegalStateException("Timer service is shut down");
+			}
+			else {
+				// something else happened, so propagate the exception
+				throw e;
+			}
+		}
+	}
+
+	@Override
 	public boolean isTerminated() {
 		return status.get() == STATUS_SHUTDOWN;
 	}
@@ -196,6 +224,46 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
 		}
 	}
 
+	/**
+	 * Internal task which is repeatedly called by the processing time service.
+	 */
+	private static final class RepeatedTriggerTask implements Runnable {
+		private final Object lock;
+		private final ProcessingTimeCallback target;
+		private final long period;
+		private final AsyncExceptionHandler exceptionHandler;
+
+		private long nextTimestamp;
+
+		private RepeatedTriggerTask(
+				AsyncExceptionHandler exceptionHandler,
+				Object lock,
+				ProcessingTimeCallback target,
+				long nextTimestamp,
+				long period) {
+			this.lock = Preconditions.checkNotNull(lock);
+			this.target = Preconditions.checkNotNull(target);
+			this.period = period;
+			this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);
+
+			this.nextTimestamp = nextTimestamp;
+		}
+
+		@Override
+		public void run() {
+			try {
+				synchronized (lock) {
+					target.onProcessingTime(nextTimestamp);
+				}
+
+				nextTimestamp += period;
+			} catch (Throwable t) {
+				TimerException asyncException = new TimerException(t);
+				exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class NeverCompleteFuture implements ScheduledFuture<Object> {

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 2ca287a..3c33ad3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -17,18 +17,19 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.util.ArrayList;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * This is a {@link ProcessingTimeService} used <b>strictly for testing</b> the
@@ -36,38 +37,38 @@ import java.util.concurrent.TimeoutException;
  * */
 public class TestProcessingTimeService extends ProcessingTimeService {
 
-	private volatile long currentTime = 0;
+	private volatile long currentTime = 0L;
 
 	private volatile boolean isTerminated;
 	private volatile boolean isQuiesced;
 
 	// sorts the timers by timestamp so that they are processed in the correct order.
-	private final Map<Long, List<ScheduledTimerFuture>> registeredTasks = new TreeMap<>();
+	private final PriorityQueue<Tuple2<Long, CallbackTask>> priorityQueue;
 
+	public TestProcessingTimeService() {
+		this.priorityQueue = new PriorityQueue<>(16, new Comparator<Tuple2<Long, CallbackTask>>() {
+			@Override
+			public int compare(Tuple2<Long, CallbackTask> o1, Tuple2<Long, CallbackTask> o2) {
+				return Long.compare(o1.f0, o2.f0);
+			}
+		});
+	}
 	
 	public void setCurrentTime(long timestamp) throws Exception {
 		this.currentTime = timestamp;
 
 		if (!isQuiesced) {
-			// decide which timers to fire and put them in a list
-			// we do not fire them here to be able to accommodate timers
-			// that register other timers.
-	
-			Iterator<Map.Entry<Long, List<ScheduledTimerFuture>>> it = registeredTasks.entrySet().iterator();
-			List<Map.Entry<Long, List<ScheduledTimerFuture>>> toRun = new ArrayList<>();
-			while (it.hasNext()) {
-				Map.Entry<Long, List<ScheduledTimerFuture>> t = it.next();
-				if (t.getKey() <= this.currentTime) {
-					toRun.add(t);
-					it.remove();
-				}
-			}
-	
-			// now do the actual firing.
-			for (Map.Entry<Long, List<ScheduledTimerFuture>> tasks: toRun) {
-				long now = tasks.getKey();
-				for (ScheduledTimerFuture task: tasks.getValue()) {
-					task.getProcessingTimeCallback().onProcessingTime(now);
+			while (!priorityQueue.isEmpty() && currentTime >= priorityQueue.peek().f0) {
+				Tuple2<Long, CallbackTask> entry = priorityQueue.poll();
+
+				CallbackTask callbackTask = entry.f1;
+
+				if (!callbackTask.isDone()) {
+					callbackTask.onProcessingTime(entry.f0);
+
+					if (callbackTask instanceof PeriodicCallbackTask) {
+						priorityQueue.offer(Tuple2.of(((PeriodicCallbackTask)callbackTask).nextTimestamp(entry.f0), callbackTask));
+					}
 				}
 			}
 		}
@@ -84,27 +85,38 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 			throw new IllegalStateException("terminated");
 		}
 		if (isQuiesced) {
-			return new ScheduledTimerFuture(null, -1);
+			return new CallbackTask(null);
 		}
 
+		CallbackTask callbackTask = new CallbackTask(target);
+
 		if (timestamp <= currentTime) {
 			try {
-				target.onProcessingTime(timestamp);
+				callbackTask.onProcessingTime(timestamp);
 			} catch (Exception e) {
 				throw new RuntimeException(e);
 			}
+		} else {
+			priorityQueue.offer(Tuple2.of(timestamp, callbackTask));
 		}
 
-		ScheduledTimerFuture result = new ScheduledTimerFuture(target, timestamp);
+		return callbackTask;
+	}
 
-		List<ScheduledTimerFuture> tasks = registeredTasks.get(timestamp);
-		if (tasks == null) {
-			tasks = new ArrayList<>();
-			registeredTasks.put(timestamp, tasks);
+	@Override
+	public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
+		if (isTerminated) {
+			throw new IllegalStateException("terminated");
 		}
-		tasks.add(result);
+		if (isQuiesced) {
+			return new CallbackTask(null);
+		}
+
+		PeriodicCallbackTask periodicCallbackTask = new PeriodicCallbackTask(callback, period);
 
-		return result;
+		priorityQueue.offer(Tuple2.<Long, CallbackTask>of(currentTime + initialDelay, periodicCallbackTask));
+
+		return periodicCallbackTask;
 	}
 
 	@Override
@@ -116,7 +128,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 	public void quiesceAndAwaitPending() {
 		if (!isTerminated) {
 			isQuiesced = true;
-			registeredTasks.clear();
+			priorityQueue.clear();
 		}
 	}
 
@@ -125,35 +137,46 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 		this.isTerminated = true;
 	}
 
-	public int getNumRegisteredTimers() {
+	public int getNumActiveTimers() {
 		int count = 0;
-		for (List<ScheduledTimerFuture> tasks: registeredTasks.values()) {
-			count += tasks.size();
+
+		for (Tuple2<Long, CallbackTask> entry : priorityQueue) {
+			if (!entry.f1.isDone()) {
+				count++;
+			}
 		}
+
 		return count;
 	}
 
-	public Set<Long> getRegisteredTimerTimestamps() {
+	public Set<Long> getActiveTimerTimestamps() {
 		Set<Long> actualTimestamps = new HashSet<>();
-		for (List<ScheduledTimerFuture> timerFutures : registeredTasks.values()) {
-			for (ScheduledTimerFuture timer : timerFutures) {
-				actualTimestamps.add(timer.getTimestamp());
+
+		for (Tuple2<Long, CallbackTask> entry : priorityQueue) {
+			if (!entry.f1.isDone()) {
+				actualTimestamps.add(entry.f0);
 			}
 		}
+
 		return actualTimestamps;
 	}
 
 	// ------------------------------------------------------------------------
 
-	private class ScheduledTimerFuture implements ScheduledFuture<Object> {
+	private static class CallbackTask implements ScheduledFuture<Object> {
 
-		private final ProcessingTimeCallback processingTimeCallback;
+		protected final ProcessingTimeCallback processingTimeCallback;
 
-		private final long timestamp;
+		private AtomicReference<CallbackTaskState> state = new AtomicReference<>(CallbackTaskState.CREATED);
 
-		public ScheduledTimerFuture(ProcessingTimeCallback processingTimeCallback, long timestamp) {
+		private CallbackTask(ProcessingTimeCallback processingTimeCallback) {
 			this.processingTimeCallback = processingTimeCallback;
-			this.timestamp = timestamp;
+		}
+
+		public void onProcessingTime(long timestamp) throws Exception {
+			processingTimeCallback.onProcessingTime(timestamp);
+
+			state.compareAndSet(CallbackTaskState.CREATED, CallbackTaskState.DONE);
 		}
 
 		@Override
@@ -168,21 +191,17 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 
 		@Override
 		public boolean cancel(boolean mayInterruptIfRunning) {
-			List<ScheduledTimerFuture> scheduledTimerFutures = registeredTasks.get(timestamp);
-			if (scheduledTimerFutures != null) {
-				scheduledTimerFutures.remove(this);
-			}
-			return true;
+			return state.compareAndSet(CallbackTaskState.CREATED, CallbackTaskState.CANCELLED);
 		}
 
 		@Override
 		public boolean isCancelled() {
-			throw new UnsupportedOperationException();
+			return state.get() == CallbackTaskState.CANCELLED;
 		}
 
 		@Override
 		public boolean isDone() {
-			throw new UnsupportedOperationException();
+			return state.get() != CallbackTaskState.CREATED;
 		}
 
 		@Override
@@ -195,12 +214,31 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 			throw new UnsupportedOperationException();
 		}
 
-		public ProcessingTimeCallback getProcessingTimeCallback() {
-			return processingTimeCallback;
+		enum CallbackTaskState {
+			CREATED,
+			CANCELLED,
+			DONE
+		}
+	}
+
+	private static class PeriodicCallbackTask extends CallbackTask {
+
+		private final long period;
+
+		private PeriodicCallbackTask(ProcessingTimeCallback processingTimeCallback, long period) {
+			super(processingTimeCallback);
+			Preconditions.checkArgument(period > 0L, "The period must be greater than 0.");
+
+			this.period = period;
+		}
+
+		@Override
+		public void onProcessingTime(long timestamp) throws Exception {
+			processingTimeCallback.onProcessingTime(timestamp);
 		}
 
-		public long getTimestamp() {
-			return timestamp;
+		public long nextTimestamp(long currentTimestamp) {
+			return currentTimestamp + period;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
index d753e4e..680f2ac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -176,8 +176,8 @@ public class HeapInternalTimerServiceTest {
 		assertEquals(2, timerService.numProcessingTimeTimers("hello"));
 		assertEquals(3, timerService.numProcessingTimeTimers("ciao"));
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(10L));
 
 		processingTimeService.setCurrentTime(10);
 
@@ -185,8 +185,8 @@ public class HeapInternalTimerServiceTest {
 		assertEquals(1, timerService.numProcessingTimeTimers("hello"));
 		assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(20L));
 
 		processingTimeService.setCurrentTime(20);
 
@@ -194,18 +194,18 @@ public class HeapInternalTimerServiceTest {
 		assertEquals(0, timerService.numProcessingTimeTimers("hello"));
 		assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(30L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(30L));
 
 		processingTimeService.setCurrentTime(30);
 
 		assertEquals(0, timerService.numProcessingTimeTimers());
 
-		assertEquals(0, processingTimeService.getNumRegisteredTimers());
+		assertEquals(0, processingTimeService.getNumActiveTimers());
 
 		timerService.registerProcessingTimeTimer("ciao", 40);
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
+		assertEquals(1, processingTimeService.getNumActiveTimers());
 	}
 
 	/**
@@ -233,15 +233,15 @@ public class HeapInternalTimerServiceTest {
 
 		assertEquals(1, timerService.numProcessingTimeTimers());
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(20L));
 
 		timerService.registerProcessingTimeTimer("ciao", 10);
 
 		assertEquals(2, timerService.numProcessingTimeTimers());
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(10L));
 	}
 
 	/**
@@ -266,8 +266,8 @@ public class HeapInternalTimerServiceTest {
 
 		assertEquals(1, timerService.numProcessingTimeTimers());
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(10L));
 
 		doAnswer(new Answer<Object>() {
 			@Override
@@ -279,8 +279,8 @@ public class HeapInternalTimerServiceTest {
 
 		processingTimeService.setCurrentTime(10);
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(20L));
 
 		doAnswer(new Answer<Object>() {
 			@Override
@@ -294,8 +294,8 @@ public class HeapInternalTimerServiceTest {
 
 		assertEquals(1, timerService.numProcessingTimeTimers());
 
-		assertEquals(1, processingTimeService.getNumRegisteredTimers());
-		assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(30L));
+		assertEquals(1, processingTimeService.getNumActiveTimers());
+		assertThat(processingTimeService.getActiveTimerTimestamps(), containsInAnyOrder(30L));
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index e600420..b153de9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -47,6 +46,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -181,42 +181,52 @@ public class StreamSourceOperatorTest {
 	 */
 	@Test
 	public void testLatencyMarkEmission() throws Exception {
-		final long now = System.currentTimeMillis();
-
 		final List<StreamElement> output = new ArrayList<>();
 
+		final long maxProcessingTime = 100L;
+		final long latencyMarkInterval = 10L;
+
+		final TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
+		testProcessingTimeService.setCurrentTime(0L);
+		final List<Long> processingTimes = Arrays.asList(1L, 10L, 11L, 21L, maxProcessingTime);
+
 		// regular stream source operator
-		final StoppableStreamSource<String, InfiniteSource<String>> operator =
-				new StoppableStreamSource<>(new InfiniteSource<String>());
+		final StreamSource<Long, ProcessingTimeServiceSource> operator =
+				new StreamSource<>(new ProcessingTimeServiceSource(testProcessingTimeService, processingTimes));
 
 		// emit latency marks every 10 milliseconds.
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 10);
-
-		// trigger an async cancel in a bit
-		new Thread("canceler") {
-			@Override
-			public void run() {
-				try {
-					Thread.sleep(200);
-				} catch (InterruptedException ignored) {}
-				operator.stop();
-			}
-		}.start();
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, latencyMarkInterval, testProcessingTimeService);
 
 		// run and wait to be stopped
-		operator.run(new Object(), new CollectorOutput<String>(output));
+		operator.run(new Object(), new CollectorOutput<Long>(output));
+
+		int numberLatencyMarkers = (int) (maxProcessingTime / latencyMarkInterval) + 1;
+
+		assertEquals(
+			numberLatencyMarkers + 1, // + 1 is the final watermark element
+			output.size());
 
-		// ensure that there has been some output
-		assertTrue(output.size() > 0);
-		// and that its only latency markers
-		for(StreamElement se: output) {
+		long timestamp = 0L;
+
+		int i = 0;
+		// and that its only latency markers + a final watermark
+		for (; i < output.size() - 1; i++) {
+			StreamElement se = output.get(i);
 			Assert.assertTrue(se.isLatencyMarker());
 			Assert.assertEquals(-1, se.asLatencyMarker().getVertexID());
 			Assert.assertEquals(0, se.asLatencyMarker().getSubtaskIndex());
-			Assert.assertTrue(se.asLatencyMarker().getMarkedTime() >= now);
+			Assert.assertTrue(se.asLatencyMarker().getMarkedTime() == timestamp);
+
+			timestamp += latencyMarkInterval;
 		}
+
+		Assert.assertTrue(output.get(i).isWatermark());
 	}
 
+	@Test
+	public void testLatencyMarksEmitterLifecycleIntegration() {
+
+	}
 
 	@Test
 	public void testAutomaticWatermarkContext() throws Exception {
@@ -341,4 +351,33 @@ public class StreamSourceOperatorTest {
 			running = false;
 		}
 	}
+
+	private static final class ProcessingTimeServiceSource implements SourceFunction<Long> {
+
+		private final TestProcessingTimeService processingTimeService;
+		private final List<Long> processingTimes;
+
+		private boolean cancelled = false;
+
+		private ProcessingTimeServiceSource(TestProcessingTimeService processingTimeService, List<Long> processingTimes) {
+			this.processingTimeService = processingTimeService;
+			this.processingTimes = processingTimes;
+		}
+
+		@Override
+		public void run(SourceContext<Long> ctx) throws Exception {
+			for (Long processingTime : processingTimes) {
+				if (cancelled) {
+					break;
+				}
+
+				processingTimeService.setCurrentTime(processingTime);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			cancelled = true;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index cd1f253..2903758 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -76,13 +76,13 @@ public class TestProcessingTimeServiceTest {
 			}
 		});
 
-		assertEquals(2, tp.getNumRegisteredTimers());
+		assertEquals(2, tp.getNumActiveTimers());
 
 		tp.setCurrentTime(35);
-		assertEquals(1, tp.getNumRegisteredTimers());
+		assertEquals(1, tp.getNumActiveTimers());
 
 		tp.setCurrentTime(40);
-		assertEquals(0, tp.getNumRegisteredTimers());
+		assertEquals(0, tp.getNumActiveTimers());
 
 		tp.shutdownService();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2125b8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 766b313..50e438c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -21,8 +21,10 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -34,7 +36,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class SystemProcessingTimeServiceTest {
+public class SystemProcessingTimeServiceTest extends TestLogger {
 
 	@Test
 	public void testTriggerHoldsLock() throws Exception {
@@ -70,6 +72,134 @@ public class SystemProcessingTimeServiceTest {
 		}
 	}
 
+	/**
+	 * Tests that a callback scheduled at a fixed rate is invoked while holding the given lock.
+	 */
+	@Test
+	public void testScheduleAtFixedRateHoldsLock() throws Exception {
+
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+		final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+			new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		final OneShotLatch awaitCallback = new OneShotLatch();
+
+		try {
+			assertEquals(0, timer.getNumTasksScheduled());
+
+			// schedule something
+			ScheduledFuture<?> future = timer.scheduleAtFixedRate(
+				new ProcessingTimeCallback() {
+					@Override
+					public void onProcessingTime(long timestamp) {
+						assertTrue(Thread.holdsLock(lock));
+
+						awaitCallback.trigger();
+					}
+				},
+				0L,
+				100L);
+
+			// wait until the first execution is active
+			awaitCallback.await();
+
+			// cancel periodic callback
+			future.cancel(true);
+
+			// check that no asynchronous error was reported
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
+		}
+		finally {
+			timer.shutdownService();
+		}
+	}
+
+	/**
+	 * Tests that SystemProcessingTimeService#scheduleAtFixedRate is actually triggered multiple
+	 * times.
+	 */
+	@Test(timeout=10000)
+	public void testScheduleAtFixedRate() throws Exception {
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+		final long period = 10L;
+		final int countDown = 3;
+
+		final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+			new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		final CountDownLatch countDownLatch = new CountDownLatch(countDown);
+
+		try {
+			timer.scheduleAtFixedRate(new ProcessingTimeCallback() {
+				@Override
+				public void onProcessingTime(long timestamp) throws Exception {
+					countDownLatch.countDown();
+				}
+			}, 0L, period);
+
+			countDownLatch.await();
+
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
+
+		} finally {
+			timer.shutdownService();
+		}
+	}
+
+	/**
+	 * Tests that quiescing the SystemProcessingTimeService will also cancel the future scheduled
+	 * at a fixed rate.
+	 */
+	@Test
+	public void testQuiesceAndAwaitingCancelsScheduledAtFixRateFuture() throws Exception {
+		final Object lock = new Object();
+		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+		final long period = 10L;
+
+		final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+			new ReferenceSettingExceptionHandler(errorRef), lock);
+
+		try {
+			ScheduledFuture<?> scheduledFuture = timer.scheduleAtFixedRate(new ProcessingTimeCallback() {
+				@Override
+				public void onProcessingTime(long timestamp) throws Exception {
+				}
+			}, 0L, period);
+
+			assertFalse(scheduledFuture.isDone());
+
+			// this should cancel our future
+			timer.quiesceAndAwaitPending();
+
+			assertTrue(scheduledFuture.isCancelled());
+
+			scheduledFuture = timer.scheduleAtFixedRate(new ProcessingTimeCallback() {
+				@Override
+				public void onProcessingTime(long timestamp) throws Exception {
+					throw new Exception("Test exception.");
+				}
+			}, 0L, 100L);
+
+			assertNotNull(scheduledFuture);
+
+			assertEquals(0, timer.getNumTasksScheduled());
+
+			if (errorRef.get() != null) {
+				throw new Exception(errorRef.get());
+			}
+
+		} finally {
+			timer.shutdownService();
+		}
+	}
+
 	@Test
 	public void testImmediateShutdown() throws Exception {
 
@@ -114,6 +244,21 @@ public class SystemProcessingTimeServiceTest {
 				// expected
 			}
 
+			try {
+				timer.scheduleAtFixedRate(
+					new ProcessingTimeCallback() {
+						@Override
+						public void onProcessingTime(long timestamp) {}
+					},
+					0L,
+					100L);
+
+				fail("should result in an exception");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+
 			// obviously, we have an asynchronous interrupted exception
 			assertNotNull(errorRef.get());
 			assertTrue(errorRef.get().getCause() instanceof InterruptedException);
@@ -206,6 +351,18 @@ public class SystemProcessingTimeServiceTest {
 
 			assertEquals(0, timer.getNumTasksScheduled());
 
+			future = timer.scheduleAtFixedRate(
+				new ProcessingTimeCallback() {
+					@Override
+					public void onProcessingTime(long timestamp) throws Exception {}
+				}, 10000000000L, 50L);
+
+			assertEquals(1, timer.getNumTasksScheduled());
+
+			future.cancel(false);
+
+			assertEquals(0, timer.getNumTasksScheduled());
+
 			// check that no asynchronous error was reported
 			if (errorRef.get() != null) {
 				throw new Exception(errorRef.get());
@@ -241,4 +398,33 @@ public class SystemProcessingTimeServiceTest {
 		latch.await();
 		assertTrue(exceptionWasThrown.get());
 	}
+
+	@Test
+	public void testExceptionReportingScheduleAtFixedRate() throws InterruptedException {
+		final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
+		final OneShotLatch latch = new OneShotLatch();
+		final Object lock = new Object();
+
+		ProcessingTimeService timeServiceProvider = new SystemProcessingTimeService(
+			new AsyncExceptionHandler() {
+				@Override
+				public void handleAsyncException(String message, Throwable exception) {
+					exceptionWasThrown.set(true);
+					latch.trigger();
+				}
+			}, lock);
+
+		timeServiceProvider.scheduleAtFixedRate(
+			new ProcessingTimeCallback() {
+			@Override
+			public void onProcessingTime(long timestamp) throws Exception {
+				throw new Exception("Exception in Timer");
+			}
+		},
+			0L,
+			100L	);
+
+		latch.await();
+		assertTrue(exceptionWasThrown.get());
+	}
 }