Posted to commits@flink.apache.org by se...@apache.org on 2016/10/05 22:16:56 UTC

[06/17] flink git commit: [FLINK-4329] [streaming api] Fix Streaming File Source Timestamps/Watermarks Handling

[FLINK-4329] [streaming api] Fix Streaming File Source Timestamps/Watermarks Handling

This closes #2546


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ff451be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ff451be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ff451be

Branch: refs/heads/master
Commit: 8ff451bec58e9f5800eb77c74c1d7457b776cc94
Parents: c62776f
Author: kl0u <kk...@gmail.com>
Authored: Thu Aug 25 17:38:49 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Oct 5 19:36:13 2016 +0200

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |  11 +-
 .../ContinuousFileMonitoringFunctionITCase.java |  17 +-
 .../hdfstests/ContinuousFileMonitoringTest.java | 209 ++++++++++++--
 .../fs/bucketing/BucketingSinkTest.java         |   4 +-
 .../source/ContinuousFileReaderOperator.java    |  96 ++++---
 .../streaming/api/operators/StreamSource.java   | 275 +-----------------
 .../api/operators/StreamSourceContexts.java     | 284 +++++++++++++++++++
 .../runtime/tasks/AsyncExceptionHandler.java    |   8 +-
 .../tasks/DefaultTimeServiceProvider.java       |  11 +-
 .../runtime/tasks/OneInputStreamTask.java       |   2 +-
 .../streaming/runtime/tasks/StreamTask.java     |  54 +---
 .../runtime/tasks/TwoInputStreamTask.java       |   2 +-
 .../operators/StreamSourceOperatorTest.java     |  17 +-
 .../runtime/operators/TimeProviderTest.java     |  79 ++++--
 ...AlignedProcessingTimeWindowOperatorTest.java |  34 ++-
 ...AlignedProcessingTimeWindowOperatorTest.java |  36 ++-
 .../runtime/tasks/StreamMockEnvironment.java    |   8 +-
 .../KeyedOneInputStreamOperatorTestHarness.java |   4 +-
 .../util/OneInputStreamOperatorTestHarness.java |  23 +-
 19 files changed, 694 insertions(+), 480 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index bccbabc..2ebd84a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -86,7 +86,7 @@ public class RocksDBAsyncSnapshotTest {
 	}
 
 	/**
-	 * This ensures that asynchronous state handles are actually materialized asynchonously.
+	 * This ensures that asynchronous state handles are actually materialized asynchronously.
 	 *
 	 * <p>We use latches to block at various stages and see if the code still continues through
 	 * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
@@ -168,7 +168,6 @@ public class RocksDBAsyncSnapshotTest {
 				while (!field.getBoolean(task)) {
 					Thread.sleep(10);
 				}
-
 			}
 		}
 
@@ -189,7 +188,9 @@ public class RocksDBAsyncSnapshotTest {
 		Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
 
 		testHarness.waitForTaskCompletion();
-		task.checkTimerException();
+		if (mockEnv.wasFailedExternally()) {
+			Assert.fail("Unexpected exception during execution.");
+		}
 	}
 
 	/**
@@ -261,8 +262,10 @@ public class RocksDBAsyncSnapshotTest {
 			threadPool.shutdown();
 			Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
 			testHarness.waitForTaskCompletion();
-			task.checkTimerException();
 
+			if (mockEnv.wasFailedExternally()) {
+				throw new AsynchronousException(new InterruptedException("Exception was thrown as expected."));
+			}
 			Assert.fail("Operation completed. Cancel failed.");
 		} catch (Exception expected) {
 			AsynchronousException asynchronousException = null;

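The test now inspects the mock environment instead of re-throwing timer exceptions
through the task. A minimal sketch of how an environment can record an external
failure for later assertion (hypothetical class name; the actual change lives in
StreamMockEnvironment, listed in the diffstat above):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class RecordingEnvironment {
        private final AtomicBoolean failedExternally = new AtomicBoolean(false);

        // called from any thread that hits an unrecoverable error
        public void failExternally(Throwable cause) {
            failedExternally.set(true);
        }

        // polled by the test after the task has finished
        public boolean wasFailedExternally() {
            return failedExternally.get();
        }
    }
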
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
index 663345c..079bf04 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
@@ -120,7 +120,7 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
 
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(1);
+			env.setParallelism(4);
 
 			format.setFilesFilter(FilePathFilter.createDefaultFilter());
 			ContinuousFileMonitoringFunction<String> monitoringFunction =
@@ -130,7 +130,7 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
 
 			TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
 			ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
-			TestingSinkFunction sink = new TestingSinkFunction(monitoringFunction);
+			TestingSinkFunction sink = new TestingSinkFunction();
 
 			DataStream<FileInputSplit> splits = env.addSource(monitoringFunction);
 			splits.transform("FileSplitReader", typeInfo, reader).addSink(sink).setParallelism(1);
@@ -161,16 +161,10 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
 
 	private static class TestingSinkFunction extends RichSinkFunction<String> {
 
-		private final ContinuousFileMonitoringFunction src;
-
 		private int elementCounter = 0;
 		private Map<Integer, Integer> elementCounters = new HashMap<>();
 		private Map<Integer, List<String>> collectedContent = new HashMap<>();
 
-		TestingSinkFunction(ContinuousFileMonitoringFunction monitoringFunction) {
-			this.src = monitoringFunction;
-		}
-
 		@Override
 		public void open(Configuration parameters) throws Exception {
 			// this sink can only work with DOP 1
@@ -200,13 +194,6 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
 				Assert.assertEquals(cntntStr.toString(), expectedContents.get(fileIdx));
 			}
 			expectedContents.clear();
-
-			src.cancel();
-			try {
-				src.close();
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
 		}
 
 		private int getLineNo(String line) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 8a700f5..36b5c5e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -27,12 +27,14 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
@@ -51,7 +53,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 
 public class ContinuousFileMonitoringTest {
@@ -106,6 +107,155 @@ public class ContinuousFileMonitoringTest {
 	//						TESTS
 
 	@Test
+	public void testFileReadingOperatorWithIngestionTime() throws Exception {
+		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+		Map<Integer, String> expectedFileContents = new HashMap<>();
+
+		for(int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+			expectedFileContents.put(i, file.f1);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		final long watermarkInterval = 10;
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+		reader.setOutputType(typeInfo, executionConfig);
+
+		final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+		final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
+			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
+		tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		tester.open();
+
+		Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
+
+		// test that watermarks are correctly emitted
+
+		timeServiceProvider.setCurrentTime(201);
+		timeServiceProvider.setCurrentTime(301);
+		timeServiceProvider.setCurrentTime(401);
+		timeServiceProvider.setCurrentTime(501);
+
+		int i = 0;
+		for(Object line: tester.getOutput()) {
+			if (!(line instanceof Watermark)) {
+				Assert.fail("Only watermarks are expected here ");
+			}
+			Watermark w = (Watermark) line;
+			Assert.assertEquals(200 + (i * 100), w.getTimestamp());
+			i++;
+		}
+
+		// clear the output so that from now on we only see the elements and the final watermark
+		tester.getOutput().clear();
+		Assert.assertEquals(0, tester.getOutput().size());
+
+		// create the necessary splits for the test
+		FileInputSplit[] splits = format.createInputSplits(
+			reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+		// and feed them to the operator
+		Map<Integer, List<String>> actualFileContents = new HashMap<>();
+
+		long lastSeenWatermark = Long.MIN_VALUE;
+		int lineCounter = 0;	// counter for the lines read from the splits
+		int watermarkCounter = 0;
+
+		for(FileInputSplit split: splits) {
+
+			// set the next "current processing time".
+			long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+			timeServiceProvider.setCurrentTime(nextTimestamp);
+
+			// send the next split to be read and wait until it is fully read.
+			tester.processElement(new StreamRecord<>(split));
+			synchronized (tester.getCheckpointLock()) {
+				while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+					tester.getCheckpointLock().wait(10);
+				}
+			}
+
+			// verify that the results are the expected ones
+			for(Object line: tester.getOutput()) {
+				if (line instanceof StreamRecord) {
+					StreamRecord<String> element = (StreamRecord<String>) line;
+					lineCounter++;
+
+					Assert.assertEquals(nextTimestamp, element.getTimestamp());
+
+					int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+					List<String> content = actualFileContents.get(fileIdx);
+					if (content == null) {
+						content = new ArrayList<>();
+						actualFileContents.put(fileIdx, content);
+					}
+					content.add(element.getValue() + "\n");
+				} else if (line instanceof Watermark) {
+					long watermark = ((Watermark) line).getTimestamp();
+
+					Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark);
+					Assert.assertTrue(watermark > lastSeenWatermark);
+					watermarkCounter++;
+
+					lastSeenWatermark = watermark;
+				} else {
+					Assert.fail("Unknown element in the list.");
+				}
+			}
+
+			// clean the output to be ready for the next split
+			tester.getOutput().clear();
+		}
+
+		// now we are processing one split after the other,
+		// so all the elements must be here by now.
+		Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
+
+		// and we expect one watermark per split.
+		Assert.assertEquals(NO_OF_FILES, watermarkCounter);
+
+		// then close the reader gracefully so that the Long.MAX watermark is emitted
+		synchronized (tester.getCheckpointLock()) {
+			tester.close();
+		}
+
+		for(org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+
+		// check if the last element is the LongMax watermark (by now this must be the only element)
+		Assert.assertEquals(1, tester.getOutput().size());
+		Assert.assertTrue(tester.getOutput().peek() instanceof Watermark);
+		Assert.assertEquals(Long.MAX_VALUE, ((Watermark) tester.getOutput().poll()).getTimestamp());
+
+		// check if the elements are the expected ones.
+		Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
+		for (Integer fileIdx: expectedFileContents.keySet()) {
+			Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
+
+			List<String> cntnt = actualFileContents.get(fileIdx);
+			Collections.sort(cntnt, new Comparator<String>() {
+				@Override
+				public int compare(String o1, String o2) {
+					return getLineNo(o1) - getLineNo(o2);
+				}
+			});
+
+			StringBuilder cntntStr = new StringBuilder();
+			for (String line: cntnt) {
+				cntntStr.append(line);
+			}
+			Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
+		}
+	}
+
+	@Test
 	public void testFileReadingOperator() throws Exception {
 		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
 		Map<Integer, String> expectedFileContents = new HashMap<>();
@@ -119,10 +269,11 @@ public class ContinuousFileMonitoringTest {
 		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
 
 		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+		reader.setOutputType(typeInfo, new ExecutionConfig());
+
 		OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
 			new OneInputStreamOperatorTestHarness<>(reader);
-
-		reader.setOutputType(typeInfo, new ExecutionConfig());
+		tester.setTimeCharacteristic(TimeCharacteristic.EventTime);
 		tester.open();
 
 		// create the necessary splits for the test
@@ -134,38 +285,38 @@ public class ContinuousFileMonitoringTest {
 			tester.processElement(new StreamRecord<>(split));
 		}
 
-		// then close the reader gracefully
+		// then close the reader gracefully (and wait for it to finish reading)
 		synchronized (tester.getCheckpointLock()) {
 			tester.close();
 		}
 
-		/*
-		* Given that the reader is multithreaded, the test finishes before the reader thread finishes
-		* reading. This results in files being deleted by the test before being read, thus throwing an exception.
-		* In addition, even if file deletion happens at the end, the results are not ready for testing.
-		* To face this, we wait until all the output is collected or until the waiting time exceeds 1000 ms, or 1s.
-		*/
+		// the lines received must be the elements in the files, +1 for the Long.MAX watermark.
+		// since we are in event time, no automatic watermarks are emitted, so this final
+		// watermark marks the end of the input stream.
 
-		long start = System.currentTimeMillis();
-		Queue<Object> output;
-		do {
-			output = tester.getOutput();
-			Thread.sleep(50);
-		} while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);
+		Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE + 1, tester.getOutput().size());
 
 		Map<Integer, List<String>> actualFileContents = new HashMap<>();
+		Object lastElement = null;
 		for(Object line: tester.getOutput()) {
-			StreamRecord<String> element = (StreamRecord<String>) line;
-
-			int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
-			List<String> content = actualFileContents.get(fileIdx);
-			if(content == null) {
-				content = new ArrayList<>();
-				actualFileContents.put(fileIdx, content);
+			lastElement = line;
+			if (line instanceof StreamRecord) {
+				StreamRecord<String> element = (StreamRecord<String>) line;
+
+				int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+				List<String> content = actualFileContents.get(fileIdx);
+				if (content == null) {
+					content = new ArrayList<>();
+					actualFileContents.put(fileIdx, content);
+				}
+				content.add(element.getValue() + "\n");
 			}
-			content.add(element.getValue() +"\n");
 		}
 
+		// check if the last element is the LongMax watermark
+		Assert.assertTrue(lastElement instanceof Watermark);
+		Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp());
+
 		Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
 		for (Integer fileIdx: expectedFileContents.keySet()) {
 			Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
@@ -224,7 +375,7 @@ public class ContinuousFileMonitoringTest {
 		monitoringFunction.open(new Configuration());
 		monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
 
-		Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+		Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
 		for(int i = 0; i < NO_OF_FILES; i++) {
 			org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
 			Assert.assertTrue(uniqFilesFound.contains(file.toString()));
@@ -268,8 +419,8 @@ public class ContinuousFileMonitoringTest {
 		t.interrupt();
 		fc.join();
 
-		Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES);
-		Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+		Assert.assertEquals(NO_OF_FILES, fc.getFilesCreated().size());
+		Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
 
 		Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
 		Set<String> fileNamesCreated = new HashSet<>();
@@ -316,7 +467,7 @@ public class ContinuousFileMonitoringTest {
 		// wait until all the files are created
 		fc.join();
 
-		Assert.assertTrue(filesCreated.size() == NO_OF_FILES);
+		Assert.assertEquals(NO_OF_FILES, filesCreated.size());
 
 		Set<String> fileNamesCreated = new HashSet<>();
 		for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
@@ -337,7 +488,7 @@ public class ContinuousFileMonitoringTest {
 
 	private int getLineNo(String line) {
 		String[] tkns = line.split("\\s");
-		Assert.assertTrue(tkns.length == 6);
+		Assert.assertEquals(6, tkns.length);
 		return Integer.parseInt(tkns[tkns.length - 1]);
 	}
 

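The alignment rule the ingestion-time test asserts is the same one used by the
AutomaticWatermarkContext introduced further down in this commit: a watermark's
timestamp is the current processing time rounded down to a multiple of the
auto-watermark interval. A standalone illustration of the arithmetic, using the
test's values:

    public class WatermarkAlignment {
        public static void main(String[] args) {
            // with watermarkInterval = 10, the processing times the test sets
            // (201, 301, 401, 501) align to watermarks 200, 300, 400, 500
            final long watermarkInterval = 10;
            for (long now : new long[] {201, 301, 401, 501}) {
                long watermarkTime = now - (now % watermarkInterval);
                System.out.println(now + " -> watermark " + watermarkTime);
            }
        }
    }
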
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index e274fdd..ac1e3f0 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -70,7 +70,7 @@ public class BucketingSinkTest {
 	private static org.apache.hadoop.fs.FileSystem dfs;
 	private static String hdfsURI;
 
-	private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir, TimeServiceProvider clock) {
+	private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir, TestTimeServiceProvider clock) {
 		BucketingSink<String> sink = new BucketingSink<String>(dataDir.getAbsolutePath())
 			.setBucketer(new Bucketer<String>() {
 				private static final long serialVersionUID = 1L;
@@ -91,7 +91,7 @@ public class BucketingSinkTest {
 	}
 
 	private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(BucketingSink<T> sink,
-																			TimeServiceProvider clock) {
+																			TestTimeServiceProvider clock) {
 		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), new ExecutionConfig(), clock);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 838bee6..35e72a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -29,14 +29,13 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,20 +43,24 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This is the operator that reads the {@link FileInputSplit FileInputSplits} received from
- * the preceding {@link ContinuousFileMonitoringFunction}. This operator will receive just the split descriptors
- * and then read and emit records. This may lead to increased backpressure. To avoid this, we have another
- * thread ({@link SplitReader}) actually reading the splits and emitting the elements, which is separate from
- * the thread forwarding the checkpoint barriers. The two threads sync on the {@link StreamTask#getCheckpointLock()}
- * so that the checkpoints reflect the current state.
+ * the preceding {@link ContinuousFileMonitoringFunction}. This operator can have parallelism
+ * greater than 1, contrary to the {@link ContinuousFileMonitoringFunction} which has
+ * a parallelism of 1.
+ * <p/>
+ * This operator will receive the split descriptors, put them in a queue, and have another
+ * thread read the actual data from the split. This architecture allows the separation of the
+ * reading thread from the one emitting the checkpoint barriers, thus removing any potential
+ * back-pressure.
  */
 @Internal
 public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
@@ -67,16 +70,16 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
 	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
 
+	/** A value that serves as a poison pill to stop the reading thread when no more splits remain. */
 	private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null);
 
-	private transient SplitReader<S, OUT> reader;
-	private transient TimestampedCollector<OUT> collector;
-
 	private FileInputFormat<OUT> format;
 	private TypeSerializer<OUT> serializer;
 
 	private transient Object checkpointLock;
 
+	private transient SplitReader<S, OUT> reader;
+	private transient SourceFunction.SourceContext<OUT> readerContext;
 	private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
 
 	public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
@@ -92,25 +95,22 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 	public void open() throws Exception {
 		super.open();
 
-		if (this.serializer == null) {
-			throw new IllegalStateException("The serializer has not been set. " +
-				"Probably the setOutputType() was not called and this should not have happened. " +
-				"Please report it.");
-		}
+		checkState(this.reader == null, "The reader is already initialized.");
+		checkState(this.serializer != null, "The serializer has not been set. " +
+			"Probably the setOutputType() was not called. Please report it.");
 
 		this.format.setRuntimeContext(getRuntimeContext());
 		this.format.configure(new Configuration());
-
-		this.collector = new TimestampedCollector<>(output);
 		this.checkpointLock = getContainingTask().getCheckpointLock();
 
-		Preconditions.checkState(reader == null, "The reader is already initialized.");
+		// set the reader context based on the time characteristic
+		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
+		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
+		this.readerContext = StreamSourceContexts.getSourceContext(
+			timeCharacteristic, getTimerService(), checkpointLock, output, watermarkInterval);
 
-		this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
-
-		// the readerState is needed for the initialization of the reader
-		// when recovering from a failure. So after the initialization,
-		// we can set it to null.
+		// and initialize the split reading thread
+		this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, readerState);
 		this.readerState = null;
 		this.reader.start();
 	}
@@ -122,7 +122,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
+		// we do nothing because we emit our own watermarks if needed.
 	}
 
 	@Override
@@ -156,7 +156,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 			}
 		}
 		reader = null;
-		collector = null;
+		readerContext = null;
+		readerState = null;
 		format = null;
 		serializer = null;
 	}
@@ -177,7 +178,16 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 			// called by the StreamTask while having it.
 			checkpointLock.wait();
 		}
-		collector.close();
+
+		// finally if we are closed normally and we are operating on
+		// event or ingestion time, emit the max watermark indicating
+		// the end of the stream, like a normal source would do.
+
+		if (readerContext != null) {
+			readerContext.emitWatermark(Watermark.MAX_WATERMARK);
+			readerContext.close();
+		}
+		output.close();
 	}
 
 	private class SplitReader<S extends Serializable, OT> extends Thread {
@@ -188,7 +198,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		private final TypeSerializer<OT> serializer;
 
 		private final Object checkpointLock;
-		private final TimestampedCollector<OT> collector;
+		private final SourceFunction.SourceContext<OT> readerContext;
 
 		private final Queue<FileInputSplit> pendingSplits;
 
@@ -200,16 +210,16 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
 		private SplitReader(FileInputFormat<OT> format,
 					TypeSerializer<OT> serializer,
-					TimestampedCollector<OT> collector,
+					SourceFunction.SourceContext<OT> readerContext,
 					Object checkpointLock,
 					Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
 
 			this.format = checkNotNull(format, "Unspecified FileInputFormat.");
 			this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
+			this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context.");
+			this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock.");
 
-			this.pendingSplits = new LinkedList<>();
-			this.collector = collector;
-			this.checkpointLock = checkpointLock;
+			this.pendingSplits = new ArrayDeque<>();
 			this.isRunning = true;
 
 			// this is the case where a task recovers from a previous failed attempt
@@ -219,7 +229,6 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 				S formatState = restoredState.f2;
 
 				for (FileInputSplit split : pending) {
-					Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
 					pendingSplits.add(split);
 				}
 
@@ -229,9 +238,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		}
 
 		private void addSplit(FileInputSplit split) {
-			Preconditions.checkNotNull(split);
+			checkNotNull(split, "Cannot insert a null value in the pending splits queue.");
 			synchronized (checkpointLock) {
-				Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
 				this.pendingSplits.add(split);
 			}
 		}
@@ -267,7 +275,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 								checkpointableFormat.reopen(currentSplit, restoredFormatState);
 							} else {
 								// this is the case of a non-checkpointable input format that will reprocess the last split.
-								LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
+								LOG.info("Format " + this.format.getClass().getName() + " does not support checkpointing.");
 								format.open(currentSplit);
 							}
 							// reset the restored state to null for the next iteration
@@ -299,7 +307,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 							synchronized (checkpointLock) {
 								nextElement = format.nextRecord(nextElement);
 								if (nextElement != null) {
-									collector.collect(nextElement);
+									readerContext.collect(nextElement);
 								} else {
 									break;
 								}
@@ -318,10 +326,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 				}
 
 			} catch (Throwable e) {
-				if (isRunning) {
-					LOG.error("Caught exception processing split: ", currentSplit);
-				}
-				getContainingTask().failExternally(e);
+				getContainingTask().handleAsyncException("Caught exception when processing split: " + currentSplit, e);
 			} finally {
 				synchronized (checkpointLock) {
 					LOG.info("Reader terminated, and exiting...");
@@ -358,7 +363,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 							restoredFormatState;
 					return new Tuple3<>(snapshot, currentSplit, formatState);
 				} else {
-					LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
+					LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery.");
 					return new Tuple3<>(snapshot, currentSplit, null);
 				}
 			} else {
@@ -404,7 +409,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		FileInputSplit currSplit = (FileInputSplit) ois.readObject();
 
 		// read the pending splits list
-		List<FileInputSplit> pendingSplits = new LinkedList<>();
+		List<FileInputSplit> pendingSplits = new ArrayList<>();
 		int noOfSplits = ois.readInt();
 		for (int i = 0; i < noOfSplits; i++) {
 			FileInputSplit split = (FileInputSplit) ois.readObject();
@@ -416,8 +421,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		S formatState = (S) ois.readObject();
 
 		// set the whole reader state for the open() to find.
-		Preconditions.checkState(this.readerState == null,
-			"The reader state has already been initialized.");
+		checkState(this.readerState == null, "The reader state has already been initialized.");
 
 		this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState);
 	}

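The class-level Javadoc above describes a hand-over between the task thread, which
only enqueues split descriptors, and a dedicated reader thread, with a sentinel
split (EOS) ending the read loop. A simplified, self-contained sketch of that
pattern, with hypothetical names standing in for the operator's internals:

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class SplitQueueSketch {
        // sentinel, playing the role of the operator's EOS split
        private static final String EOS = "<end-of-splits>";

        public static void main(String[] args) throws Exception {
            final Object checkpointLock = new Object();
            final Queue<String> pendingSplits = new ArrayDeque<>();

            Thread reader = new Thread(() -> {
                try {
                    while (true) {
                        String split;
                        synchronized (checkpointLock) {
                            while ((split = pendingSplits.poll()) == null) {
                                checkpointLock.wait(); // nothing queued yet
                            }
                        }
                        if (EOS.equals(split)) {
                            break; // no more splits will arrive
                        }
                        // the real reader opens the split and emits its records
                        // under the checkpoint lock so snapshots stay consistent
                        System.out.println("reading " + split);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            reader.start();

            // the task thread forwards split descriptors without reading them
            for (String split : new String[] {"split-0", "split-1", EOS}) {
                synchronized (checkpointLock) {
                    pendingSplits.add(split);
                    checkpointLock.notifyAll();
                }
            }
            reader.join();
        }
    }
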
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 22987ab..1409ae4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -21,11 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-
-import java.util.concurrent.ScheduledFuture;
 
 /**
  * {@link StreamOperator} for streaming sources.
@@ -57,26 +53,11 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 	
 	public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
 		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
-		final SourceFunction.SourceContext<OUT> ctx;
-		
-		switch (timeCharacteristic) {
-			case EventTime:
-				ctx = new ManualWatermarkContext<>(this, lockingObject, collector);
-				break;
-			case IngestionTime:
-				ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector,
-						getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
-				break;
-			case ProcessingTime:
-				ctx = new NonTimestampContext<>(this, lockingObject, collector);
-				break;
-			default:
-				throw new Exception(String.valueOf(timeCharacteristic));
-		}
+		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
+
+		this.ctx = StreamSourceContexts.getSourceContext(
+			timeCharacteristic, getTimerService(), lockingObject, collector, watermarkInterval);
 
-		// copy to a field to give the 'cancel()' method access
-		this.ctx = ctx;
-		
 		try {
 			userFunction.run(ctx);
 
@@ -122,252 +103,4 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 	protected boolean isCanceledOrStopped() {
 		return canceledOrStopped;
 	}
-
-	/**
-	 * Checks whether any asynchronous thread (checkpoint trigger, timer, watermark generator, ...)
-	 * has caused an exception. If one of these threads caused an exception, this method will
-	 * throw that exception.
-	 */
-	void checkAsyncException() {
-		getContainingTask().checkTimerException();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Source contexts for various stream time characteristics
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * A source context that attached {@code -1} as a timestamp to all records, and that
-	 * does not forward watermarks.
-	 */
-	public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {
-
-		private final StreamSource<?, ?> owner;
-		private final Object lockingObject;
-		private final Output<StreamRecord<T>> output;
-		private final StreamRecord<T> reuse;
-
-		public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
-			this.owner = owner;
-			this.lockingObject = lockingObject;
-			this.output = output;
-			this.reuse = new StreamRecord<T>(null);
-		}
-
-		@Override
-		public void collect(T element) {
-			owner.checkAsyncException();
-			synchronized (lockingObject) {
-				output.collect(reuse.replace(element));
-			}
-		}
-
-		@Override
-		public void collectWithTimestamp(T element, long timestamp) {
-			// ignore the timestamp
-			collect(element);
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			owner.checkAsyncException();
-			// do nothing else
-		}
-
-		@Override
-		public Object getCheckpointLock() {
-			return lockingObject;
-		}
-
-		@Override
-		public void close() {}
-	}
-	
-	/**
-	 * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
-	 * and watermark emission.
-	 */
-	public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
-
-		private final StreamSource<?, ?> owner;
-		private final TimeServiceProvider timeService;
-		private final Object lockingObject;
-		private final Output<StreamRecord<T>> output;
-		private final StreamRecord<T> reuse;
-		
-		private final ScheduledFuture<?> watermarkTimer;
-		private final long watermarkInterval;
-
-		private volatile long nextWatermarkTime;
-
-		public AutomaticWatermarkContext(
-				final StreamSource<?, ?> owner,
-				final Object lockingObjectParam,
-				final Output<StreamRecord<T>> outputParam,
-				final long watermarkInterval) {
-			
-			if (watermarkInterval < 1L) {
-				throw new IllegalArgumentException("The watermark interval cannot be smaller than one.");
-			}
-
-			this.owner = owner;
-			this.timeService = owner.getTimerService();
-			this.lockingObject = lockingObjectParam;
-			this.output = outputParam;
-			this.watermarkInterval = watermarkInterval;
-			this.reuse = new StreamRecord<T>(null);
-
-			long now = this.timeService.getCurrentProcessingTime();
-			this.watermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
-				new WatermarkEmittingTask(this.timeService, lockingObjectParam, outputParam));
-		}
-
-		@Override
-		public void collect(T element) {
-			owner.checkAsyncException();
-			
-			synchronized (lockingObject) {
-				final long currentTime = this.timeService.getCurrentProcessingTime();
-				output.collect(reuse.replace(element, currentTime));
-
-				// this is to avoid lock contention in the lockingObject by
-				// sending the watermark before the firing of the watermark
-				// emission task.
-
-				if (currentTime > nextWatermarkTime) {
-					// in case we jumped some watermarks, recompute the next watermark time
-					final long watermarkTime = currentTime - (currentTime % watermarkInterval);
-					nextWatermarkTime = watermarkTime + watermarkInterval;
-					output.emitWatermark(new Watermark(watermarkTime));
-
-					// we do not need to register another timer here
-					// because the emitting task will do so.
-				}
-			}
-		}
-
-		@Override
-		public void collectWithTimestamp(T element, long timestamp) {
-			collect(element);
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			owner.checkAsyncException();
-			
-			if (mark.getTimestamp() == Long.MAX_VALUE) {
-				// allow it since this is the special end-watermark that for example the Kafka source emits
-				synchronized (lockingObject) {
-					nextWatermarkTime = Long.MAX_VALUE;
-					output.emitWatermark(mark);
-				}
-
-				// we can shutdown the timer now, no watermarks will be needed any more
-				watermarkTimer.cancel(true);
-			}
-		}
-
-		@Override
-		public Object getCheckpointLock() {
-			return lockingObject;
-		}
-
-		@Override
-		public void close() {
-			watermarkTimer.cancel(true);
-		}
-
-		private class WatermarkEmittingTask implements Triggerable {
-
-			private final TimeServiceProvider timeService;
-			private final Object lockingObject;
-			private final Output<StreamRecord<T>> output;
-
-			private WatermarkEmittingTask(TimeServiceProvider timeService, Object lock, Output<StreamRecord<T>> output) {
-				this.timeService = timeService;
-				this.lockingObject = lock;
-				this.output = output;
-			}
-
-			@Override
-			public void trigger(long timestamp) {
-				final long currentTime = this.timeService.getCurrentProcessingTime();
-
-				if (currentTime > nextWatermarkTime) {
-					// align the watermarks across all machines. this will ensure that we
-					// don't have watermarks that creep along at different intervals because
-					// the machine clocks are out of sync
-					final long watermarkTime = currentTime - (currentTime % watermarkInterval);
-
-					synchronized (lockingObject) {
-						if (currentTime > nextWatermarkTime) {
-							output.emitWatermark(new Watermark(watermarkTime));
-							nextWatermarkTime += watermarkInterval;
-						}
-					}
-				}
-
-				this.timeService.registerTimer(this.timeService.getCurrentProcessingTime() + watermarkInterval,
-					new WatermarkEmittingTask(this.timeService, lockingObject, output));
-			}
-		}
-	}
-
-	/**
-	 * A SourceContext for event time. Sources may directly attach timestamps and generate
-	 * watermarks, but if records are emitted without timestamps, no timestamps are automatically
-	 * generated and attached. The records will simply have no timestamp in that case.
-	 * 
-	 * Streaming topologies can use timestamp assigner functions to override the timestamps
-	 * assigned here.
-	 */
-	public static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
-
-		private final StreamSource<?, ?> owner;
-		private final Object lockingObject;
-		private final Output<StreamRecord<T>> output;
-		private final StreamRecord<T> reuse;
-
-		public ManualWatermarkContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
-			this.owner = owner;
-			this.lockingObject = lockingObject;
-			this.output = output;
-			this.reuse = new StreamRecord<T>(null);
-		}
-
-		@Override
-		public void collect(T element) {
-			owner.checkAsyncException();
-			
-			synchronized (lockingObject) {
-				output.collect(reuse.replace(element));
-			}
-		}
-
-		@Override
-		public void collectWithTimestamp(T element, long timestamp) {
-			owner.checkAsyncException();
-			
-			synchronized (lockingObject) {
-				output.collect(reuse.replace(element, timestamp));
-			}
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			owner.checkAsyncException();
-			
-			synchronized (lockingObject) {
-				output.emitWatermark(mark);
-			}
-		}
-
-		@Override
-		public Object getCheckpointLock() {
-			return lockingObject;
-		}
-
-		@Override
-		public void close() {}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
new file mode 100644
index 0000000..abaf4e7
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Source contexts for various stream time characteristics.
+ */
+public class StreamSourceContexts {
+
+	/**
+	 * Depending on the {@link TimeCharacteristic}, this method will return the appropriate
+	 * {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}. That is:
+	 * <ul>
+	 * <li> {@link TimeCharacteristic#IngestionTime} = {@link AutomaticWatermarkContext}
+	 * <li> {@link TimeCharacteristic#ProcessingTime} = {@link NonTimestampContext}
+	 * <li> {@link TimeCharacteristic#EventTime} = {@link ManualWatermarkContext}
+	 * </ul>
+	 * */
+	public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
+			TimeCharacteristic timeCharacteristic, TimeServiceProvider timeService,
+			Object checkpointLock, Output<StreamRecord<OUT>> output, long watermarkInterval) {
+
+		final SourceFunction.SourceContext<OUT> ctx;
+		switch (timeCharacteristic) {
+			case EventTime:
+				ctx = new ManualWatermarkContext<>(checkpointLock, output);
+				break;
+			case IngestionTime:
+				ctx = new AutomaticWatermarkContext<>(timeService, checkpointLock, output, watermarkInterval);
+				break;
+			case ProcessingTime:
+				ctx = new NonTimestampContext<>(checkpointLock, output);
+				break;
+			default:
+				throw new IllegalArgumentException(String.valueOf(timeCharacteristic));
+		}
+		return ctx;
+	}
+
+	/**
+	 * A source context that attaches {@code -1} as a timestamp to all records, and that
+	 * does not forward watermarks.
+	 */
+	private static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {
+
+		private final Object lock;
+		private final Output<StreamRecord<T>> output;
+		private final StreamRecord<T> reuse;
+
+		private NonTimestampContext(Object checkpointLock, Output<StreamRecord<T>> output) {
+			this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+			this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
+			this.reuse = new StreamRecord<>(null);
+		}
+
+		@Override
+		public void collect(T element) {
+			synchronized (lock) {
+				output.collect(reuse.replace(element));
+			}
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			// ignore the timestamp
+			collect(element);
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			// do nothing
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lock;
+		}
+
+		@Override
+		public void close() {}
+	}
+
+	/**
+	 * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
+	 * and watermark emission.
+	 */
+	private static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+
+		private final TimeServiceProvider timeService;
+		private final Object lock;
+		private final Output<StreamRecord<T>> output;
+		private final StreamRecord<T> reuse;
+
+		private final ScheduledFuture<?> watermarkTimer;
+		private final long watermarkInterval;
+
+		private volatile long nextWatermarkTime;
+
+		private AutomaticWatermarkContext(
+			final TimeServiceProvider timeService,
+			final Object checkpointLock,
+			final Output<StreamRecord<T>> output,
+			final long watermarkInterval) {
+
+			this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
+			this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+			this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
+
+			Preconditions.checkArgument(watermarkInterval >= 1L, "The watermark interval cannot be smaller than 1 ms.");
+			this.watermarkInterval = watermarkInterval;
+
+			this.reuse = new StreamRecord<>(null);
+
+			long now = this.timeService.getCurrentProcessingTime();
+			this.watermarkTimer = this.timeService.registerTimer(now + watermarkInterval,
+				new WatermarkEmittingTask(this.timeService, lock, output));
+		}
+
+		@Override
+		public void collect(T element) {
+			synchronized (lock) {
+				final long currentTime = this.timeService.getCurrentProcessingTime();
+				output.collect(reuse.replace(element, currentTime));
+
+				// emit the watermark here, while the checkpoint lock is already
+				// held, to avoid extra lock contention when the periodic
+				// watermark emission task fires.
+
+				if (currentTime > nextWatermarkTime) {
+					// in case we jumped some watermarks, recompute the next watermark time
+					final long watermarkTime = currentTime - (currentTime % watermarkInterval);
+					nextWatermarkTime = watermarkTime + watermarkInterval;
+					output.emitWatermark(new Watermark(watermarkTime));
+
+					// we do not need to register another timer here
+					// because the emitting task will do so.
+				}
+			}
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			collect(element);
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+
+			if (mark.getTimestamp() == Long.MAX_VALUE) {
+				// allow it since this is the special end-watermark that for example the Kafka source emits
+				synchronized (lock) {
+					nextWatermarkTime = Long.MAX_VALUE;
+					output.emitWatermark(mark);
+				}
+
+				// we can shut down the timer now; no more watermarks will be needed
+				if (watermarkTimer != null) {
+					watermarkTimer.cancel(true);
+				}
+			}
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lock;
+		}
+
+		@Override
+		public void close() {
+			if (watermarkTimer != null) {
+				watermarkTimer.cancel(true);
+			}
+		}
+
+		private class WatermarkEmittingTask implements Triggerable {
+
+			private final TimeServiceProvider timeService;
+			private final Object lock;
+			private final Output<StreamRecord<T>> output;
+
+			private WatermarkEmittingTask(TimeServiceProvider timeService, Object checkpointLock, Output<StreamRecord<T>> output) {
+				this.timeService = Preconditions.checkNotNull(timeService, "Time Service cannot be null.");
+				this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+				this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
+			}
+
+			@Override
+			public void trigger(long timestamp) {
+				final long currentTime = timeService.getCurrentProcessingTime();
+
+				if (currentTime > nextWatermarkTime) {
+					// align the watermarks across all machines. this will ensure that we
+					// don't have watermarks that creep along at different intervals because
+					// the machine clocks are out of sync
+					final long watermarkTime = currentTime - (currentTime % watermarkInterval);
+
+					synchronized (lock) {
+						if (currentTime > nextWatermarkTime) {
+							output.emitWatermark(new Watermark(watermarkTime));
+							nextWatermarkTime = watermarkTime + watermarkInterval;
+						}
+					}
+				}
+
+				long nextWatermark = currentTime + watermarkInterval;
+				this.timeService.registerTimer(nextWatermark, new WatermarkEmittingTask(this.timeService, lock, output));
+			}
+		}
+	}
+
+	/**
+	 * A SourceContext for event time. Sources may directly attach timestamps and generate
+	 * watermarks, but if records are emitted without timestamps, no timestamps are automatically
+	 * generated and attached. The records will simply have no timestamp in that case.
+	 *
+	 * Streaming topologies can use timestamp assigner functions to override the timestamps
+	 * assigned here.
+	 */
+	private static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+
+		private final Object lock;
+		private final Output<StreamRecord<T>> output;
+		private final StreamRecord<T> reuse;
+
+		private ManualWatermarkContext(Object checkpointLock, Output<StreamRecord<T>> output) {
+			this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+			this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
+			this.reuse = new StreamRecord<>(null);
+		}
+
+		@Override
+		public void collect(T element) {
+			synchronized (lock) {
+				output.collect(reuse.replace(element));
+			}
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			synchronized (lock) {
+				output.collect(reuse.replace(element, timestamp));
+			}
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			synchronized (lock) {
+				output.emitWatermark(mark);
+			}
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lock;
+		}
+
+		@Override
+		public void close() {}
+	}
+}

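For reference, the two call sites this commit introduces obtain their context
identically; condensed from the StreamSource and ContinuousFileReaderOperator
hunks above (an excerpt in operator context, not standalone code):

    final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
    final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

    SourceFunction.SourceContext<OUT> ctx = StreamSourceContexts.getSourceContext(
        timeCharacteristic, getTimerService(), checkpointLock, output, watermarkInterval);

    // EventTime      -> ManualWatermarkContext    (caller emits watermarks itself)
    // IngestionTime  -> AutomaticWatermarkContext (periodic, clock-aligned watermarks)
    // ProcessingTime -> NonTimestampContext       (timestamps ignored, watermarks dropped)
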
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
index c7ec2ed..4c55055 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
@@ -18,12 +18,14 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 /**
- * Interface for reporting exceptions that are thrown in (possibly) a different thread.
+ * An interface marking a task as capable of handling exceptions thrown
+ * by different threads, other than the one executing the task itself.
  */
 public interface AsyncExceptionHandler {
 
 	/**
-	 * Registers the given exception.
+	 * Handles an exception thrown by another thread (e.g. a TriggerTask),
+	 * other than the one executing the main task.
 	 */
-	void registerAsyncException(AsynchronousException exception);
+	void handleAsyncException(String message, Throwable exception);
 }

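A minimal implementation sketch of the reworked interface (hypothetical class
name, and it assumes flink-streaming-java on the classpath); the testing handler
created in DefaultTimeServiceProvider.createForTesting in the next hunk behaves
the same way:

    public class PrintingAsyncExceptionHandler implements AsyncExceptionHandler {
        @Override
        public void handleAsyncException(String message, Throwable exception) {
            // a real task would fail itself here; this sketch just reports
            System.err.println(message);
            exception.printStackTrace();
        }
    }
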
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index ea2b07f..9534b3c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -49,9 +50,9 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 	private DefaultTimeServiceProvider(AsyncExceptionHandler task,
 									ScheduledExecutorService threadPoolExecutor,
 									Object checkpointLock) {
-		this.task = task;
-		this.timerService = threadPoolExecutor;
-		this.checkpointLock = checkpointLock;
+		this.task = Preconditions.checkNotNull(task);
+		this.timerService = Preconditions.checkNotNull(threadPoolExecutor);
+		this.checkpointLock = Preconditions.checkNotNull(checkpointLock);
 	}
 
 	@Override
@@ -99,7 +100,7 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 					target.trigger(timestamp);
 				} catch (Throwable t) {
 					TimerException asyncException = new TimerException(t);
-					exceptionHandler.registerAsyncException(asyncException);
+					exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
 				}
 			}
 		}
@@ -109,7 +110,7 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
 	public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
 		return new DefaultTimeServiceProvider(new AsyncExceptionHandler() {
 			@Override
-			public void registerAsyncException(AsynchronousException exception) {
+			public void handleAsyncException(String message, Throwable exception) {
 				exception.printStackTrace();
 			}
 		}, executor, checkpointLock);

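Putting the pieces together, a sketch of the timer/exception flow using only names visible in this patch (the delay and sleep values are arbitrary): any Throwable escaping Triggerable.trigger() is wrapped in a TimerException and forwarded to the handler, which for createForTesting simply prints the stack trace.

    import java.util.concurrent.Executors;

    import org.apache.flink.streaming.runtime.operators.Triggerable;
    import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
    import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;

    public class TimerExceptionFlowSketch {

        public static void main(String[] args) throws Exception {
            final Object lock = new Object();
            TimeServiceProvider provider = DefaultTimeServiceProvider
                .createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);

            provider.registerTimer(provider.getCurrentProcessingTime() + 100, new Triggerable() {
                @Override
                public void trigger(long timestamp) throws Exception {
                    // ends up in the testing handler, wrapped as a TimerException
                    throw new Exception("Exception in Timer");
                }
            });

            Thread.sleep(500); // give the timer thread a chance to fire
            provider.shutdownService();
        }
    }
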
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index d6d2fb5..cf8853e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -64,7 +64,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 		final Object lock = getCheckpointLock();
 		
 		while (running && inputProcessor.processInput(operator, lock)) {
-			checkTimerException();
+			// all the work happens inside processInput()
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 9802a16..33317fa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -53,6 +53,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -158,11 +159,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	private List<Collection<OperatorStateHandle>> lazyRestoreOperatorState;
 
-	/**
-	 * This field is used to forward an exception that is caught in the timer thread or other
-	 * asynchronous Threads. Subclasses must ensure that exceptions stored here get thrown on the
-	 * actual execution Thread. */
-	private volatile AsynchronousException asyncException;
 
 	/** The currently active background materialization threads */
 	private final ClosableRegistry cancelables = new ClosableRegistry();
@@ -301,9 +297,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// still let the computation fail
 			tryDisposeAllOperators();
 			disposed = true;
-
-			// Don't forget to check and throw exceptions that happened in async thread one last time
-			checkTimerException();
 		}
 		finally {
 			// clean up everything we initialized
@@ -354,19 +347,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
-	/**
-	 * Marks task execution failed for an external reason (a reason other than the task code itself
-	 * throwing an exception). If the task is already in a terminal state
-	 * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
-	 * Otherwise it sets the state to FAILED, and, if the invokable code is running,
-	 * starts an asynchronous thread that aborts that code.
-	 *
-	 * <p>This method never blocks.</p>
-	 */
-	public void failExternally(Throwable cause) {
-		getEnvironment().failExternally(cause);
-	}
-
 	@Override
 	public final void cancel() throws Exception {
 		isRunning = false;
@@ -898,27 +878,21 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	/**
-	 * Check whether an exception was thrown in a Thread other than the main Thread. (For example
-	 * in the processing-time trigger Thread). This will rethrow that exception in case on
-	 * occurred.
+	 * Handles an exception thrown by a thread other than the one executing
+	 * the main task (for example, a TriggerTask), by failing the task entirely.
+	 *
+	 * In more detail, it marks task execution failed for an external reason
+	 * (a reason other than the task code itself throwing an exception). If the task
+	 * is already in a terminal state (such as FINISHED, CANCELED, FAILED), or if the
+	 * task is already canceling, this does nothing. Otherwise it sets the state to
+	 * FAILED and, if the invokable code is running, starts an asynchronous thread
+	 * that aborts that code.
 	 *
-	 * <p>This must be called in the main loop of {@code StreamTask} subclasses to ensure
-	 * that we propagate failures.
+	 * <p>This method never blocks.</p>
 	 */
-	public void checkTimerException() throws AsynchronousException {
-		if (asyncException != null) {
-			throw asyncException;
-		}
-	}
-
 	@Override
-	public void registerAsyncException(AsynchronousException exception) {
-		if (isRunning) {
-			LOG.error("Asynchronous exception registered.", exception);
-		}
-		if (this.asyncException == null) {
-			this.asyncException = exception;
-		}
+	public void handleAsyncException(String message, Throwable exception) {
+		getEnvironment().failExternally(exception);
 	}
 
 	// ------------------------------------------------------------------------
@@ -1030,7 +1004,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			catch (Exception e) {
 				// registers the exception and tries to fail the whole task
 				AsynchronousException asyncException = new AsynchronousException(e);
-				owner.registerAsyncException(asyncException);
+				owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
 			}
 			finally {
 				cancelables.unregisterClosable(this);

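The net effect of the StreamTask change, sketched as a fragment (the task variable is hypothetical, standing for any StreamTask): the reporting thread no longer parks the exception for the main loop to poll; the task fails immediately through its Environment.

    // a helper thread hits a problem ...
    Throwable t = new Exception("Exception in Timer");

    // ... and reports it; the task now forwards it straight to
    // getEnvironment().failExternally(exception) instead of storing it
    task.handleAsyncException("Caught exception while processing timer.", new TimerException(t));
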
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 9252063..0197c53 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -88,7 +88,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 		final Object lock = getCheckpointLock();
 		
 		while (running && inputProcessor.processInput(operator, lock)) {
-			checkTimerException();
+			// all the work happens inside processInput()
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index e8663f5..10b30d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -187,12 +188,11 @@ public class StreamSourceOperatorTest {
 
 		final List<StreamElement> output = new ArrayList<>();
 
-		StreamSource.AutomaticWatermarkContext<String> ctx =
-			new StreamSource.AutomaticWatermarkContext<>(
-				operator,
-				operator.getContainingTask().getCheckpointLock(),
-				new CollectorOutput<String>(output),
-				operator.getExecutionConfig().getAutoWatermarkInterval());
+		StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime,
+			operator.getContainingTask().getTimerService(),
+			operator.getContainingTask().getCheckpointLock(),
+			new CollectorOutput<String>(output),
+			operator.getExecutionConfig().getAutoWatermarkInterval());
 
 		// periodically emit the watermarks
-		// even though we start from 1 the watermark are still
+		// even though we start from 1 the watermarks are still
@@ -218,7 +218,7 @@ public class StreamSourceOperatorTest {
 	private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
 												TimeCharacteristic timeChar,
 												long watermarkInterval,
-												final TimeServiceProvider timeProvider) {
+												final TestTimeServiceProvider timeProvider) {
 
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.setAutoWatermarkInterval(watermarkInterval);
@@ -241,9 +241,6 @@ public class StreamSourceOperatorTest {
 		doAnswer(new Answer<TimeServiceProvider>() {
 			@Override
 			public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable {
-				if (timeProvider == null) {
-					throw new RuntimeException("The time provider is null.");
-				}
 				return timeProvider;
 			}
 		}).when(mockTask).getTimerService();

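The StreamSourceContexts factory used above replaces direct construction of the watermark contexts. A hedged sketch of requesting an event-time context in the same test setting (assuming the factory returns a SourceFunction.SourceContext, which this excerpt does not show):

    SourceFunction.SourceContext<String> ctx = StreamSourceContexts.getSourceContext(
        TimeCharacteristic.EventTime,
        operator.getContainingTask().getTimerService(),
        operator.getContainingTask().getCheckpointLock(),
        new CollectorOutput<String>(output),
        operator.getExecutionConfig().getAutoWatermarkInterval());

    ctx.collectWithTimestamp("hello", 42L);   // record with an explicit timestamp
    ctx.emitWatermark(new Watermark(42L));    // manually advance event time
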
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
index 60850d8..0351978 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
-import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
@@ -39,6 +38,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 
@@ -52,28 +52,24 @@ public class TimeProviderTest {
 		final OneShotLatch latch = new OneShotLatch();
 
 		final Object lock = new Object();
-		TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider.create(
-				new AsyncExceptionHandler() {
-					@Override
-					public void registerAsyncException(AsynchronousException exception) {
-						exception.printStackTrace();
-					}
-				},
-				Executors.newSingleThreadScheduledExecutor(),
-				lock);
+		TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider
+			.createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
 
 		final List<Long> timestamps = new ArrayList<>();
 
-		long start = System.currentTimeMillis();
 		long interval = 50L;
-
 		final long noOfTimers = 20;
 
-		// we add 2 timers per iteration minus the first that would have a negative timestamp
-		final long expectedNoOfTimers = 2 * noOfTimers - 1;
+		// we add 2 timers per iteration; with the start delay added below, all of them fire
+		final long expectedNoOfTimers = 2 * noOfTimers;
 
 		for (int i = 0; i < noOfTimers; i++) {
-			double nextTimer = start + i * interval;
+
+			// we add a delay (100ms) so that both timers of an iteration are inserted before
+			// the first one is processed. Otherwise, since timers are added out of order,
+			// a timer could fire before the one with the smaller timestamp is even added.
+
+			double nextTimer = timeServiceProvider.getCurrentProcessingTime() + 100 + i * interval;
 
 			timeServiceProvider.registerTimer((long) nextTimer, new Triggerable() {
 				@Override
@@ -88,17 +84,15 @@ public class TimeProviderTest {
 			// add also out-of-order tasks to verify that eventually
 			// they will be executed in the correct order.
 
-			if (i > 0) {
-				timeServiceProvider.registerTimer((long) (nextTimer - 10), new Triggerable() {
-					@Override
-					public void trigger(long timestamp) throws Exception {
-						timestamps.add(timestamp);
-						if (timestamps.size() == expectedNoOfTimers) {
-							latch.trigger();
-						}
+			timeServiceProvider.registerTimer((long) (nextTimer - 10L), new Triggerable() {
+				@Override
+				public void trigger(long timestamp) throws Exception {
+					timestamps.add(timestamp);
+					if (timestamps.size() == expectedNoOfTimers) {
+						latch.trigger();
 					}
-				});
-			}
+				}
+			});
 		}
 
 		if (!latch.isTriggered()) {
@@ -114,15 +108,46 @@ public class TimeProviderTest {
 		long lastTs = Long.MIN_VALUE;
 		for (long timestamp: timestamps) {
 			Assert.assertTrue(timestamp >= lastTs);
+			if (lastTs != Long.MIN_VALUE && counter % 2 == 1) {
+				Assert.assertEquals(10, timestamp - lastTs);
+			}
 			lastTs = timestamp;
-
-			long expectedTs = start + (counter/2) * interval;
-			Assert.assertEquals(timestamp, (expectedTs + ((counter % 2 == 0) ? 0 : 40)));
 			counter++;
 		}
 	}
 
 	@Test
+	public void testDefaultTimeProviderExceptionHandling() throws InterruptedException {
+		final OneShotLatch latch = new OneShotLatch();
+
+		final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
+
+		final Object lock = new Object();
+
+		TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider
+			.create(new AsyncExceptionHandler() {
+				@Override
+				public void handleAsyncException(String message, Throwable exception) {
+					exceptionWasThrown.compareAndSet(false, true);
+					latch.trigger();
+				}
+			}, Executors.newSingleThreadScheduledExecutor(), lock);
+
+		long now = System.currentTimeMillis();
+		timeServiceProvider.registerTimer(now, new Triggerable() {
+			@Override
+			public void trigger(long timestamp) throws Exception {
+				throw new Exception("Exception in Timer");
+			}
+		});
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+		Assert.assertTrue(exceptionWasThrown.get());
+	}
+
+	@Test
 	public void testTimerSorting() throws Exception {
 
 		final List<Long> result = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index f33da89..30f38e3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -183,13 +183,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testWindowTriggerTimeAlignment() throws Exception {
 		final Object lock = new Object();
-		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+		TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
 			Executors.newSingleThreadScheduledExecutor(), lock);
 
 		try {
 			@SuppressWarnings("unchecked")
 			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+			StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			AccumulatingProcessingTimeWindowOperator<String, String, String> op;
 
@@ -201,6 +201,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
+			timerService.shutdownService();
+			timerService = DefaultTimeServiceProvider.createForTesting(
+				Executors.newSingleThreadScheduledExecutor(), lock);
+			mockTask = createMockTaskWithTimer(timerService, lock);
+
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
 			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -209,6 +214,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
+			timerService.shutdownService();
+			timerService = DefaultTimeServiceProvider.createForTesting(
+				Executors.newSingleThreadScheduledExecutor(), lock);
+			mockTask = createMockTaskWithTimer(timerService, lock);
+
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
 			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -217,6 +227,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
+			timerService.shutdownService();
+			timerService = DefaultTimeServiceProvider.createForTesting(
+				Executors.newSingleThreadScheduledExecutor(), lock);
+			mockTask = createMockTaskWithTimer(timerService, lock);
+
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
 			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -243,7 +258,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		try {
 			final int windowSize = 50;
 			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -297,7 +312,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 			
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -359,7 +374,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -416,7 +431,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 		try {
 			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// tumbling window that triggers every 20 milliseconds
 			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -743,7 +758,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 
-	private static StreamTask<?, ?> createMockTask() {
+	private static StreamTask<?, ?> createMockTask(Object lock) {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
 
@@ -751,6 +766,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
 		when(task.getName()).thenReturn("Test task name");
 		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(task.getCheckpointLock()).thenReturn(lock);
 
 		final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
 		when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
@@ -765,9 +781,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	}
 
 	private static StreamTask<?, ?> createMockTaskWithTimer(
-		final TimeServiceProvider timerService)
+		final TimeServiceProvider timerService, final Object lock)
 	{
-		StreamTask<?, ?> mockTask = createMockTask();
+		StreamTask<?, ?> mockTask = createMockTask(lock);
 		when(mockTask.getTimerService()).thenReturn(timerService);
 		return mockTask;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 826b230..7539c2d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -191,13 +191,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testWindowTriggerTimeAlignment() throws Exception {
 		final Object lock = new Object();
-		final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
+		TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
 			Executors.newSingleThreadScheduledExecutor(), lock);
 
 		try {
 			@SuppressWarnings("unchecked")
 			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+			StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 			
 			AggregatingProcessingTimeWindowOperator<String, String> op;
 
@@ -209,6 +209,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
+			timerService.shutdownService();
+			timerService = DefaultTimeServiceProvider.createForTesting(
+				Executors.newSingleThreadScheduledExecutor(), lock);
+			mockTask = createMockTaskWithTimer(timerService, lock);
+
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
 			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -217,6 +222,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
+			timerService.shutdownService();
+			timerService = DefaultTimeServiceProvider.createForTesting(
+				Executors.newSingleThreadScheduledExecutor(), lock);
+			mockTask = createMockTaskWithTimer(timerService, lock);
+
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
 			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -225,6 +235,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
 
+			timerService.shutdownService();
+			timerService = DefaultTimeServiceProvider.createForTesting(
+				Executors.newSingleThreadScheduledExecutor(), lock);
+			mockTask = createMockTaskWithTimer(timerService, lock);
+
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
 					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
 			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -257,7 +272,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 							IntSerializer.INSTANCE, tupleSerializer,
 							windowSize, windowSize);
 
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
 			op.open();
@@ -309,7 +324,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			final int windowSize = 50;
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
 
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
@@ -377,7 +392,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		try {
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
 
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// tumbling window that triggers every 20 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -448,7 +463,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 		try {
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// tumbling window that triggers every 20 milliseconds
 			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -508,7 +523,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 		try {
 			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			ReduceFunction<Tuple2<Integer, Integer>> failingFunction = new FailingFunction(100);
 
@@ -929,7 +944,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 	
-	private static StreamTask<?, ?> createMockTask() {
+	private static StreamTask<?, ?> createMockTask(Object lock) {
 		Configuration configuration = new Configuration();
 		configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
 
@@ -937,6 +952,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
 		when(task.getName()).thenReturn("Test task name");
 		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
+		when(task.getCheckpointLock()).thenReturn(lock);
 
 		final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
 		when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
@@ -947,9 +963,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		return task;
 	}
 
-	private static StreamTask<?, ?> createMockTaskWithTimer(final TimeServiceProvider timerService)
+	private static StreamTask<?, ?> createMockTaskWithTimer(final TimeServiceProvider timerService, final Object lock)
 	{
-		StreamTask<?, ?> mockTask = createMockTask();
+		StreamTask<?, ?> mockTask = createMockTask(lock);
 		when(mockTask.getTimerService()).thenReturn(timerService);
 		return mockTask;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index f638ddd..9b773d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -102,6 +102,8 @@ public class StreamMockEnvironment implements Environment {
 
 	private final ExecutionConfig executionConfig;
 
+	private volatile boolean wasFailedExternally = false;
+
 	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, ExecutionConfig executionConfig,
 									long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this.taskInfo = new TaskInfo("", 1, 0, 1, 0);
@@ -325,7 +327,11 @@ public class StreamMockEnvironment implements Environment {
 
 	@Override
 	public void failExternally(Throwable cause) {
-		throw new UnsupportedOperationException("StreamMockEnvironment does not support external task failure.");
+		this.wasFailedExternally = true;
+	}
+
+	public boolean wasFailedExternally() {
+		return wasFailedExternally;
 	}
 
 	@Override