Posted to commits@flink.apache.org by mx...@apache.org on 2016/10/05 15:57:12 UTC

flink git commit: [FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling

Repository: flink
Updated Branches:
  refs/heads/release-1.1 fddd89bcd -> bab59dfa7


[FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling

This closes #2593.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bab59dfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bab59dfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bab59dfa

Branch: refs/heads/release-1.1
Commit: bab59dfa7cf94f4c392c3205ee180e72e1ad7814
Parents: fddd89b
Author: kl0u <kk...@gmail.com>
Authored: Tue Oct 4 15:27:59 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Oct 5 17:52:26 2016 +0200

----------------------------------------------------------------------
 .../ContinuousFileMonitoringFunctionITCase.java |  15 +-
 .../hdfstests/ContinuousFileMonitoringTest.java | 240 +++++++++++++++----
 .../source/ContinuousFileReaderOperator.java    | 108 +++++----
 .../api/operators/AsyncExceptionChecker.java    |  27 +++
 .../streaming/api/operators/StreamSource.java   |  53 ++--
 .../util/OneInputStreamOperatorTestHarness.java |   9 +
 6 files changed, 335 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
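
In short: before this change the ContinuousFileReaderOperator used a plain
TimestampedCollector and simply forwarded upstream watermarks, so the file
source did not take part in timestamp/watermark handling. It now selects a
StreamSource-style source context according to the job's TimeCharacteristic,
emits its own watermarks, and emits the final Long.MAX_VALUE watermark on a
graceful close. A minimal wiring sketch, modeled on the ITCase in this diff
(hdfsURI and monitoringFunction are assumed to be set up elsewhere; the
monitoring function's constructor does not appear in this diff):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    env.getConfig().setAutoWatermarkInterval(10); // emit a watermark every 10 ms

    TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
    TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
    ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);

    // the monitoring function (parallelism 1) emits the split descriptors;
    // the reader operator (parallelism >= 1) reads them and, with this fix,
    // also emits ingestion-time watermarks itself
    DataStream<FileInputSplit> splits = env.addSource(monitoringFunction);
    splits.transform("FileSplitReader", typeInfo, reader).print();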


http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
index e6cd5d9..dd4dc5a 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
@@ -130,7 +130,7 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
 
 			TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
 			ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
-			TestingSinkFunction sink = new TestingSinkFunction(monitoringFunction);
+			TestingSinkFunction sink = new TestingSinkFunction();
 
 			DataStream<FileInputSplit> splits = env.addSource(monitoringFunction);
 			splits.transform("FileSplitReader", typeInfo, reader).addSink(sink).setParallelism(1);
@@ -161,16 +161,10 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
 
 	private static class TestingSinkFunction extends RichSinkFunction<String> {
 
-		private final ContinuousFileMonitoringFunction src;
-
 		private int elementCounter = 0;
 		private Map<Integer, Integer> elementCounters = new HashMap<>();
 		private Map<Integer, List<String>> collectedContent = new HashMap<>();
 
-		TestingSinkFunction(ContinuousFileMonitoringFunction monitoringFunction) {
-			this.src = monitoringFunction;
-		}
-
 		@Override
 		public void open(Configuration parameters) throws Exception {
 			// this sink can only work with DOP 1
@@ -200,13 +194,6 @@ public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTest
 				Assert.assertEquals(cntntStr.toString(), expectedContents.get(fileIdx));
 			}
 			expectedContents.clear();
-
-			src.cancel();
-			try {
-				src.close();
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
 		}
 
 		private int getLineNo(String line) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index def9378..9358ba9 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -27,12 +27,14 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
@@ -51,8 +53,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class ContinuousFileMonitoringTest {
 
@@ -106,10 +108,162 @@ public class ContinuousFileMonitoringTest {
 	//						TESTS
 
 	@Test
+	public void testFileReadingOperatorWithIngestionTime() throws Exception {
+		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+		Map<Integer, String> expectedFileContents = new HashMap<>();
+
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+			expectedFileContents.put(i, file.f1);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		final long watermarkInterval = 10;
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+		reader.setOutputType(typeInfo, executionConfig);
+
+		final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+		final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
+			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
+		tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		tester.open();
+
+		Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
+
+		// test that watermarks are correctly emitted
+
+		ConcurrentLinkedQueue<Object> output = tester.getOutput();
+
+		timeServiceProvider.setCurrentTime(201);
+		Assert.assertTrue(output.peek() instanceof Watermark);
+		Assert.assertEquals(200, ((Watermark) output.poll()).getTimestamp());
+
+		timeServiceProvider.setCurrentTime(301);
+		Assert.assertTrue(output.peek() instanceof Watermark);
+		Assert.assertEquals(300, ((Watermark) output.poll()).getTimestamp());
+
+		timeServiceProvider.setCurrentTime(401);
+		Assert.assertTrue(output.peek() instanceof Watermark);
+		Assert.assertEquals(400, ((Watermark) output.poll()).getTimestamp());
+
+		timeServiceProvider.setCurrentTime(501);
+		Assert.assertTrue(output.peek() instanceof Watermark);
+		Assert.assertEquals(500, ((Watermark) output.poll()).getTimestamp());
+
+		Assert.assertTrue(output.isEmpty());
+
+		// create the necessary splits for the test
+		FileInputSplit[] splits = format.createInputSplits(
+			reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+		// and feed them to the operator
+		Map<Integer, List<String>> actualFileContents = new HashMap<>();
+
+		long lastSeenWatermark = Long.MIN_VALUE;
+		int lineCounter = 0;	// counter for the lines read from the splits
+		int watermarkCounter = 0;
+
+		for (FileInputSplit split: splits) {
+
+			// set the next "current processing time".
+			long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+			timeServiceProvider.setCurrentTime(nextTimestamp);
+
+			// send the next split to be read and wait until it is fully read; the +1 in the check below is for the watermark.
+			tester.processElement(new StreamRecord<>(split));
+
+			// NOTE: the following check works because each file fits in exactly one split;
+			// otherwise the loop below would wait forever. This assumption
+			// holds only for this test.
+			while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+				Thread.sleep(10);
+			}
+
+			// verify that the results are the expected ones
+			for (Object line: tester.getOutput()) {
+				if (line instanceof StreamRecord) {
+					StreamRecord<String> element = (StreamRecord<String>) line;
+					lineCounter++;
+
+					Assert.assertEquals(nextTimestamp, element.getTimestamp());
+
+					int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+					List<String> content = actualFileContents.get(fileIdx);
+					if (content == null) {
+						content = new ArrayList<>();
+						actualFileContents.put(fileIdx, content);
+					}
+					content.add(element.getValue() + "\n");
+				} else if (line instanceof Watermark) {
+					long watermark = ((Watermark) line).getTimestamp();
+
+					Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark);
+					Assert.assertTrue(watermark > lastSeenWatermark);
+					watermarkCounter++;
+
+					lastSeenWatermark = watermark;
+				} else {
+					Assert.fail("Unknown element in the list.");
+				}
+			}
+
+			// clean the output to be ready for the next split
+			tester.getOutput().clear();
+		}
+
+		// now we are processing one split after the other,
+		// so all the elements must be here by now.
+		Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
+
+		// because we expect one watermark per split.
+		Assert.assertEquals(splits.length, watermarkCounter);
+
+		// then close the reader gracefully so that the Long.MAX watermark is emitted
+		synchronized (tester.getCheckpointLock()) {
+			tester.close();
+		}
+
+		for (org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+
+		// check if the last element is the LongMax watermark (by now this must be the only element)
+		Assert.assertEquals(1, tester.getOutput().size());
+		Assert.assertTrue(tester.getOutput().peek() instanceof Watermark);
+		Assert.assertEquals(Long.MAX_VALUE, ((Watermark) tester.getOutput().poll()).getTimestamp());
+
+		// check if the elements are the expected ones.
+		Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
+		for (Integer fileIdx: expectedFileContents.keySet()) {
+			Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
+
+			List<String> cntnt = actualFileContents.get(fileIdx);
+			Collections.sort(cntnt, new Comparator<String>() {
+				@Override
+				public int compare(String o1, String o2) {
+					return getLineNo(o1) - getLineNo(o2);
+				}
+			});
+
+			StringBuilder cntntStr = new StringBuilder();
+			for (String line: cntnt) {
+				cntntStr.append(line);
+			}
+			Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
+		}
+	}
+
+	@Test
 	public void testFileReadingOperator() throws Exception {
 		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
 		Map<Integer, String> expectedFileContents = new HashMap<>();
-		for(int i = 0; i < NO_OF_FILES; i++) {
+		for (int i = 0; i < NO_OF_FILES; i++) {
 			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
 			filesCreated.add(file.f0);
 			expectedFileContents.put(i, file.f1);
@@ -119,10 +273,11 @@ public class ContinuousFileMonitoringTest {
 		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
 
 		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+		reader.setOutputType(typeInfo, new ExecutionConfig());
+
 		OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
 			new OneInputStreamOperatorTestHarness<>(reader);
-
-		reader.setOutputType(typeInfo, new ExecutionConfig());
+		tester.setTimeCharacteristic(TimeCharacteristic.EventTime);
 		tester.open();
 
 		// create the necessary splits for the test
@@ -130,42 +285,42 @@ public class ContinuousFileMonitoringTest {
 			reader.getRuntimeContext().getNumberOfParallelSubtasks());
 
 		// and feed them to the operator
-		for(FileInputSplit split: splits) {
+		for (FileInputSplit split: splits) {
 			tester.processElement(new StreamRecord<>(split));
 		}
 
-		// then close the reader gracefully
+		// then close the reader gracefully (and wait to finish reading)
 		synchronized (tester.getCheckpointLock()) {
 			tester.close();
 		}
 
-		/*
-		* Given that the reader is multithreaded, the test finishes before the reader thread finishes
-		* reading. This results in files being deleted by the test before being read, thus throwing an exception.
-		* In addition, even if file deletion happens at the end, the results are not ready for testing.
-		* To face this, we wait until all the output is collected or until the waiting time exceeds 1000 ms, or 1s.
-		*/
+		// the lines received must be the elements in the files, +1 for the Long.MAX_VALUE watermark.
+		// we are in event time, where the operator emits no automatic watermarks, so the final
+		// watermark marks the end of the input stream.
 
-		long start = System.currentTimeMillis();
-		Queue<Object> output;
-		do {
-			output = tester.getOutput();
-			Thread.sleep(50);
-		} while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);
+		Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE + 1, tester.getOutput().size());
 
 		Map<Integer, List<String>> actualFileContents = new HashMap<>();
-		for(Object line: tester.getOutput()) {
-			StreamRecord<String> element = (StreamRecord<String>) line;
-
-			int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
-			List<String> content = actualFileContents.get(fileIdx);
-			if(content == null) {
-				content = new ArrayList<>();
-				actualFileContents.put(fileIdx, content);
+		Object lastElement = null;
+		for (Object line: tester.getOutput()) {
+			lastElement = line;
+			if (line instanceof StreamRecord) {
+				StreamRecord<String> element = (StreamRecord<String>) line;
+
+				int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+				List<String> content = actualFileContents.get(fileIdx);
+				if (content == null) {
+					content = new ArrayList<>();
+					actualFileContents.put(fileIdx, content);
+				}
+				content.add(element.getValue() + "\n");
 			}
-			content.add(element.getValue() +"\n");
 		}
 
+		// check if the last element is the LongMax watermark
+		Assert.assertTrue(lastElement instanceof Watermark);
+		Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp());
+
 		Assert.assertEquals(expectedFileContents.size(), actualFileContents.size());
 		for (Integer fileIdx: expectedFileContents.keySet()) {
 			Assert.assertTrue("file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx));
@@ -185,7 +340,7 @@ public class ContinuousFileMonitoringTest {
 			Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString());
 		}
 
-		for(org.apache.hadoop.fs.Path file: filesCreated) {
+		for (org.apache.hadoop.fs.Path file: filesCreated) {
 			hdfs.delete(file, false);
 		}
 	}
@@ -223,13 +378,13 @@ public class ContinuousFileMonitoringTest {
 		monitoringFunction.open(new Configuration());
 		monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
 
-		Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
-		for(int i = 0; i < NO_OF_FILES; i++) {
+		Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
+		for (int i = 0; i < NO_OF_FILES; i++) {
 			org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
 			Assert.assertTrue(uniqFilesFound.contains(file.toString()));
 		}
 
-		for(org.apache.hadoop.fs.Path file: filesCreated) {
+		for (org.apache.hadoop.fs.Path file: filesCreated) {
 			hdfs.delete(file, false);
 		}
 	}
@@ -255,9 +410,10 @@ public class ContinuousFileMonitoringTest {
 				uniqFilesFound.wait(7 * INTERVAL);
 			}
 		}
+		fc.join();
 
-		Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES);
-		Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+		Assert.assertEquals(NO_OF_FILES, fc.getFilesCreated().size());
+		Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
 
 		Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
 		Set<String> fileNamesCreated = new HashSet<>();
@@ -265,11 +421,11 @@ public class ContinuousFileMonitoringTest {
 			fileNamesCreated.add(path.toString());
 		}
 
-		for(String file: uniqFilesFound) {
+		for (String file: uniqFilesFound) {
 			Assert.assertTrue(fileNamesCreated.contains(file));
 		}
 
-		for(org.apache.hadoop.fs.Path file: filesCreated) {
+		for (org.apache.hadoop.fs.Path file: filesCreated) {
 			hdfs.delete(file, false);
 		}
 	}
@@ -301,7 +457,7 @@ public class ContinuousFileMonitoringTest {
 		// wait until all the files are created
 		fc.join();
 
-		Assert.assertTrue(filesCreated.size() == NO_OF_FILES);
+		Assert.assertEquals(NO_OF_FILES, filesCreated.size());
 
 		Set<String> fileNamesCreated = new HashSet<>();
 		for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
@@ -309,11 +465,11 @@ public class ContinuousFileMonitoringTest {
 		}
 
 		Assert.assertTrue(uniqFilesFound.size() >= 1 && uniqFilesFound.size() < fileNamesCreated.size());
-		for(String file: uniqFilesFound) {
+		for (String file: uniqFilesFound) {
 			Assert.assertTrue(fileNamesCreated.contains(file));
 		}
 
-		for(org.apache.hadoop.fs.Path file: filesCreated) {
+		for (org.apache.hadoop.fs.Path file: filesCreated) {
 			hdfs.delete(file, false);
 		}
 	}
@@ -322,7 +478,7 @@ public class ContinuousFileMonitoringTest {
 
 	private int getLineNo(String line) {
 		String[] tkns = line.split("\\s");
-		Assert.assertTrue(tkns.length == 6);
+		Assert.assertEquals(6, tkns.length);
 		return Integer.parseInt(tkns[tkns.length - 1]);
 	}
 
@@ -345,7 +501,7 @@ public class ContinuousFileMonitoringTest {
 
 		public void run() {
 			try {
-				for(int i = 0; i < NO_OF_FILES; i++) {
+				for (int i = 0; i < NO_OF_FILES; i++) {
 					Tuple2<org.apache.hadoop.fs.Path, String> file =
 						fillWithData(hdfsURI, "file", i, "This is test line.");
 
@@ -427,12 +583,12 @@ public class ContinuousFileMonitoringTest {
 		assert (hdfs != null);
 
 		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
-		Assert.assertTrue (!hdfs.exists(file));
+		Assert.assertFalse(hdfs.exists(file));
 
 		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
 		FSDataOutputStream stream = hdfs.create(tmp);
 		StringBuilder str = new StringBuilder();
-		for(int i = 0; i < LINES_PER_FILE; i++) {
+		for (int i = 0; i < LINES_PER_FILE; i++) {
 			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
 			str.append(line);
 			stream.write(line.getBytes());
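
A quick sanity check of the watermark values asserted in the new
ingestion-time test: watermark timestamps are truncated down to a multiple
of the watermark interval, so with an interval of 10 ms (plain Java,
illustrative values only):

    long watermarkInterval = 10;
    for (long t : new long[] {201, 301, 401, 501}) {
        long watermarkTime = t - (t % watermarkInterval);
        // prints 201 -> 200, 301 -> 300, 401 -> 400, 501 -> 500,
        // matching the Assert.assertEquals(...) calls above
        System.out.println(t + " -> " + watermarkTime);
    }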

http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index fda5efd..923943f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -29,15 +29,15 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AsyncExceptionChecker;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,39 +46,44 @@ import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * This is the operator that reads the {@link FileInputSplit FileInputSplits} received from
- * the preceding {@link ContinuousFileMonitoringFunction}. This operator will receive just the split descriptors
- * and then read and emit records. This may lead to increased backpressure. To avoid this, we have another
- * thread ({@link SplitReader}) actually reading the splits and emitting the elements, which is separate from
- * the thread forwarding the checkpoint barriers. The two threads sync on the {@link StreamTask#getCheckpointLock()}
- * so that the checkpoints reflect the current state.
+ * the preceding {@link ContinuousFileMonitoringFunction}. This operator can have a parallelism
+ * greater than 1, in contrast to the {@link ContinuousFileMonitoringFunction}, which has
+ * a parallelism of 1.
+ * <p/>
+ * This operator receives the split descriptors, puts them in a queue, and has another
+ * thread read the actual data from the splits. This architecture separates the reading
+ * thread from the one forwarding the checkpoint barriers, thus removing any potential
+ * back-pressure.
  */
 @Internal
 public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
-	implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
+	implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT>, AsyncExceptionChecker {
 
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
 
+	/** A value that serves as a poison pill to stop the reading thread when no more splits remain. */
 	private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null);
 
-	private transient SplitReader<S, OUT> reader;
-	private transient TimestampedCollector<OUT> collector;
-
 	private FileInputFormat<OUT> format;
 	private TypeSerializer<OUT> serializer;
 
 	private transient Object checkpointLock;
 
+	private transient SplitReader<S, OUT> reader;
+	private transient SourceFunction.SourceContext<OUT> readerContext;
 	private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
 
 	public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
@@ -94,25 +99,34 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 	public void open() throws Exception {
 		super.open();
 
-		if (this.serializer == null) {
-			throw new IllegalStateException("The serializer has not been set. " +
-				"Probably the setOutputType() was not called and this should not have happened. " +
-				"Please report it.");
-		}
+		checkState(this.reader == null, "The reader is already initialized.");
+		checkState(this.serializer != null, "The serializer has not been set. " +
+			"Probably the setOutputType() was not called. Please report it.");
 
 		this.format.setRuntimeContext(getRuntimeContext());
 		this.format.configure(new Configuration());
-
-		this.collector = new TimestampedCollector<>(output);
 		this.checkpointLock = getContainingTask().getCheckpointLock();
 
-		Preconditions.checkState(reader == null, "The reader is already initialized.");
-
-		this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
+		// set the reader context based on the time characteristic
+		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
+
+		switch (timeCharacteristic) {
+			case EventTime:
+				this.readerContext = new StreamSource.ManualWatermarkContext<>(this, this.checkpointLock, this.output);
+				break;
+			case IngestionTime:
+				final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
+				this.readerContext = new StreamSource.AutomaticWatermarkContext<>(this, this.checkpointLock, this.output, watermarkInterval);
+				break;
+			case ProcessingTime:
+				this.readerContext = new StreamSource.NonTimestampContext<>(this, this.checkpointLock, this.output);
+				break;
+			default:
+				throw new Exception("Invalid time characteristic: " + timeCharacteristic);
+		}
 
-		// the readerState is needed for the initialization of the reader
-		// when recovering from a failure. So after the initialization,
-		// we can set it to null.
+		// and initialize the split reading thread
+		this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, readerState);
 		this.readerState = null;
 		this.reader.start();
 	}
@@ -124,7 +138,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
+		// we do nothing because we emit our own watermarks if needed.
 	}
 
 	@Override
@@ -158,7 +172,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 			}
 		}
 		reader = null;
-		collector = null;
+		readerContext = null;
+		readerState = null;
 		format = null;
 		serializer = null;
 	}
@@ -179,7 +194,21 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 			// called by the StreamTask while having it.
 			checkpointLock.wait();
 		}
-		collector.close();
+
+		// finally, if we were closed normally and are operating on
+		// event or ingestion time, emit the max watermark indicating
+		// the end of the stream, as a normal source would.
+
+		if (readerContext != null) {
+			readerContext.emitWatermark(Watermark.MAX_WATERMARK);
+			readerContext.close();
+		}
+		output.close();
+	}
+
+	@Override
+	public void checkAsyncException() {
+		// do nothing
 	}
 
 	private class SplitReader<S extends Serializable, OT> extends Thread {
@@ -190,7 +219,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		private final TypeSerializer<OT> serializer;
 
 		private final Object checkpointLock;
-		private final TimestampedCollector<OT> collector;
+		private final SourceFunction.SourceContext<OT> readerContext;
 
 		private final Queue<FileInputSplit> pendingSplits;
 
@@ -202,16 +231,16 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
 		private SplitReader(FileInputFormat<OT> format,
 					TypeSerializer<OT> serializer,
-					TimestampedCollector<OT> collector,
+					SourceFunction.SourceContext<OT> readerContext,
 					Object checkpointLock,
 					Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
 
 			this.format = checkNotNull(format, "Unspecified FileInputFormat.");
 			this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
+			this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context.");
+			this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock.");
 
-			this.pendingSplits = new LinkedList<>();
-			this.collector = collector;
-			this.checkpointLock = checkpointLock;
+			this.pendingSplits = new ArrayDeque<>();
 			this.isRunning = true;
 
 			// this is the case where a task recovers from a previous failed attempt
@@ -221,7 +250,6 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 				S formatState = restoredState.f2;
 
 				for (FileInputSplit split : pending) {
-					Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
 					pendingSplits.add(split);
 				}
 
@@ -231,9 +259,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		}
 
 		private void addSplit(FileInputSplit split) {
-			Preconditions.checkNotNull(split);
+			checkNotNull(split, "Cannot insert a null value in the pending splits queue.");
 			synchronized (checkpointLock) {
-				Preconditions.checkArgument(!pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
 				this.pendingSplits.add(split);
 			}
 		}
@@ -269,7 +296,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 								checkpointableFormat.reopen(currentSplit, restoredFormatState);
 							} else {
 								// this is the case of a non-checkpointable input format that will reprocess the last split.
-								LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
+								LOG.info("Format " + this.format.getClass().getName() + " does not support checkpointing.");
 								format.open(currentSplit);
 							}
 							// reset the restored state to null for the next iteration
@@ -301,7 +328,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 							synchronized (checkpointLock) {
 								nextElement = format.nextRecord(nextElement);
 								if (nextElement != null) {
-									collector.collect(nextElement);
+									readerContext.collect(nextElement);
 								} else {
 									break;
 								}
@@ -360,7 +387,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 							restoredFormatState;
 					return new Tuple3<>(snapshot, currentSplit, formatState);
 				} else {
-					LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
+					LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery.");
 					return new Tuple3<>(snapshot, currentSplit, null);
 				}
 			} else {
@@ -431,8 +458,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		S formatState = (S) ois.readObject();
 
 		// set the whole reader state for the open() to find.
-		Preconditions.checkState(this.readerState == null,
-			"The reader state has already been initialized.");
+		checkState(this.readerState == null, "The reader state has already been initialized.");
 
 		this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState);
 		div.close();
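
The queue/poison-pill hand-off described in the operator's Javadoc boils down
to the pattern below. This is a generic sketch under simplified assumptions,
not the operator's exact code: "lock" stands in for the checkpoint lock and
the sentinel plays the role of the EOS split.

    import java.util.ArrayDeque;
    import java.util.Queue;

    class SplitQueueSketch<T> {
        private final Queue<T> pending = new ArrayDeque<>();
        private final Object lock;   // shared with the checkpointing thread
        private final T poisonPill;  // sentinel, like the operator's EOS split

        SplitQueueSketch(Object lock, T poisonPill) {
            this.lock = lock;
            this.poisonPill = poisonPill;
        }

        // operator thread: enqueue under the lock so that snapshots
        // always see a consistent set of pending splits
        void add(T element) {
            synchronized (lock) {
                pending.add(element);
            }
        }

        // reader thread: drain until the poison pill arrives
        void drain() {
            boolean running = true;
            while (running) {
                T next;
                synchronized (lock) {
                    next = pending.poll();
                }
                if (next == poisonPill) {
                    running = false;       // no more splits will ever arrive
                } else if (next != null) {
                    // ... open the split and emit its records under the lock ...
                }
            }
        }
    }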

http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java
new file mode 100644
index 0000000..12018a7
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AsyncExceptionChecker.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+/**
+ * An interface for operators whose asynchronous threads may throw
+ * exceptions, such as the {@link StreamSource}.
+ */
+public interface AsyncExceptionChecker {
+
+	/** Checks whether an asynchronous exception was thrown and, if so, rethrows it. */
+	void checkAsyncException();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 38c948b..e914240 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ScheduledFuture;
  */
 @Internal
 public class StreamSource<OUT, SRC extends SourceFunction<OUT>> 
-		extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
+		extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT>, AsyncExceptionChecker {
 
 	private static final long serialVersionUID = 1L;
 	
@@ -70,7 +70,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 				ctx = new NonTimestampContext<>(this, lockingObject, collector);
 				break;
 			default:
-				throw new Exception(String.valueOf(timeCharacteristic));
+				throw new Exception("Invalid time characteristic: " + String.valueOf(timeCharacteristic));
 		}
 
 		// copy to a field to give the 'cancel()' method access
@@ -127,7 +127,8 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 	 * has caused an exception. If one of these threads caused an exception, this method will
 	 * throw that exception.
 	 */
-	void checkAsyncException() {
+	@Override
+	public void checkAsyncException() {
 		getContainingTask().checkTimerException();
 	}
 
@@ -141,12 +142,12 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 	 */
 	public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {
 
-		private final StreamSource<?, ?> owner;
+		private final AsyncExceptionChecker owner;
 		private final Object lockingObject;
 		private final Output<StreamRecord<T>> output;
 		private final StreamRecord<T> reuse;
 
-		public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
+		public NonTimestampContext(AsyncExceptionChecker owner, Object lockingObject, Output<StreamRecord<T>> output) {
 			this.owner = owner;
 			this.lockingObject = lockingObject;
 			this.output = output;
@@ -188,18 +189,19 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 	 */
 	public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
 
-		private final StreamSource<?, ?> owner;
+		private final AbstractStreamOperator<T> owner;
 		private final Object lockingObject;
 		private final Output<StreamRecord<T>> output;
 		private final StreamRecord<T> reuse;
-		
+		private final AsyncExceptionChecker source;
+
 		private final ScheduledFuture<?> watermarkTimer;
 		private final long watermarkInterval;
 
 		private volatile long nextWatermarkTime;
 
 		public AutomaticWatermarkContext(
-				final StreamSource<?, ?> owner,
+				final AbstractStreamOperator<T> owner,
 				final Object lockingObjectParam,
 				final Output<StreamRecord<T>> outputParam,
 				final long watermarkInterval) {
@@ -214,6 +216,16 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 			this.watermarkInterval = watermarkInterval;
 			this.reuse = new StreamRecord<T>(null);
 
+			// the owner must also implement the AsyncExceptionChecker interface;
+			// we cast and cache it here so that we do not have to do it in every
+			// collect(), collectWithTimestamp() and emitWatermark() call
+
+			if (!(owner instanceof AsyncExceptionChecker)) {
+				throw new IllegalStateException("The AutomaticWatermarkContext can only be used " +
+					"with sources that implement the AsyncExceptionChecker interface.");
+			}
+			this.source = (AsyncExceptionChecker) owner;
+
 			long now = owner.getCurrentProcessingTime();
 			this.watermarkTimer = owner.registerTimer(now + watermarkInterval,
 				new WatermarkEmittingTask(owner, lockingObjectParam, outputParam));
@@ -221,7 +233,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 
 		@Override
 		public void collect(T element) {
-			owner.checkAsyncException();
+			source.checkAsyncException();
 			
 			synchronized (lockingObject) {
 				final long currentTime = owner.getCurrentProcessingTime();
@@ -250,7 +262,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 
 		@Override
 		public void emitWatermark(Watermark mark) {
-			owner.checkAsyncException();
+			source.checkAsyncException();
 			
 			if (mark.getTimestamp() == Long.MAX_VALUE) {
 				// allow it since this is the special end-watermark that for example the Kafka source emits
@@ -260,7 +272,9 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 				}
 
 				// we can shutdown the timer now, no watermarks will be needed any more
-				watermarkTimer.cancel(true);
+				if (watermarkTimer != null) {
+					watermarkTimer.cancel(true);
+				}
 			}
 		}
 
@@ -271,16 +285,18 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 
 		@Override
 		public void close() {
-			watermarkTimer.cancel(true);
+			if (watermarkTimer != null) {
+				watermarkTimer.cancel(true);
+			}
 		}
 
 		private class WatermarkEmittingTask implements Triggerable {
 
-			private final StreamSource<?, ?> owner;
+			private final AbstractStreamOperator<T> owner;
 			private final Object lockingObject;
 			private final Output<StreamRecord<T>> output;
 
-			private WatermarkEmittingTask(StreamSource<?, ?> src, Object lock, Output<StreamRecord<T>> output) {
+			private WatermarkEmittingTask(AbstractStreamOperator<T> src, Object lock, Output<StreamRecord<T>> output) {
 				this.owner = src;
 				this.lockingObject = lock;
 				this.output = output;
@@ -299,7 +315,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 					synchronized (lockingObject) {
 						if (currentTime > nextWatermarkTime) {
 							output.emitWatermark(new Watermark(watermarkTime));
-							nextWatermarkTime += watermarkInterval;
+							nextWatermarkTime = watermarkTime + watermarkInterval;
 						}
 					}
 				}
@@ -320,12 +336,12 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 	 */
 	public static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
 
-		private final StreamSource<?, ?> owner;
+		private final AsyncExceptionChecker owner;
 		private final Object lockingObject;
 		private final Output<StreamRecord<T>> output;
 		private final StreamRecord<T> reuse;
 
-		public ManualWatermarkContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
+		public ManualWatermarkContext(AsyncExceptionChecker owner, Object lockingObject, Output<StreamRecord<T>> output) {
 			this.owner = owner;
 			this.lockingObject = lockingObject;
 			this.output = output;
@@ -335,7 +351,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 		@Override
 		public void collect(T element) {
 			owner.checkAsyncException();
-			
 			synchronized (lockingObject) {
 				output.collect(reuse.replace(element));
 			}
@@ -344,7 +359,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 		@Override
 		public void collectWithTimestamp(T element, long timestamp) {
 			owner.checkAsyncException();
-			
 			synchronized (lockingObject) {
 				output.collect(reuse.replace(element, timestamp));
 			}
@@ -353,7 +367,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 		@Override
 		public void emitWatermark(Watermark mark) {
 			owner.checkAsyncException();
-			
 			synchronized (lockingObject) {
 				output.emitWatermark(mark);
 			}
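
One subtle fix in the hunk above is in WatermarkEmittingTask: advancing with
"nextWatermarkTime += watermarkInterval" lets the schedule fall behind when a
timer fires late, so subsequent firings keep re-emitting the same watermark
until the counter catches up. Realigning to the watermark actually emitted
avoids that. A worked example with illustrative values:

    long watermarkInterval = 10;
    long nextWatermarkTime = 0;

    long currentTime = 201; // the timer fired late
    if (currentTime > nextWatermarkTime) {
        long watermarkTime = currentTime - (currentTime % watermarkInterval); // 200
        // old: nextWatermarkTime += watermarkInterval; -> 10, still in the past
        // new: realign to the watermark just emitted
        nextWatermarkTime = watermarkTime + watermarkInterval;                // 210
    }
    System.out.println(nextWatermarkTime); // prints 210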

http://git-wip-us.apache.org/repos/asf/flink/blob/bab59dfa/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 40086c5..5708639 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
 import org.apache.flink.runtime.state.AsynchronousStateHandle;
 import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -156,6 +157,14 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		}).when(mockTask).getCurrentProcessingTime();
 	}
 
+	public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+		this.config.setTimeCharacteristic(timeCharacteristic);
+	}
+
+	public TimeCharacteristic getTimeCharacteristic() {
+		return this.config.getTimeCharacteristic();
+	}
+
 	public void setStateBackend(AbstractStateBackend stateBackend) {
 		this.stateBackend = stateBackend;
 	}