Posted to commits@flink.apache.org by al...@apache.org on 2016/06/14 16:12:35 UTC

[1/4] flink git commit: [FLINK-2314] Make Streaming File Sources Persistent

Repository: flink
Updated Branches:
  refs/heads/master fdf436099 -> d353895ba


http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 530bae9..67c05e5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -23,8 +23,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
+import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -89,7 +91,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 	 * followed by the checks in {@link #postSubmit}.
 	 */
 	@Test
-	public void runCheckpointedProgram() {
+	public void runCheckpointedProgram() throws Exception {
 		try {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
 					"localhost", cluster.getLeaderRPCPort());
@@ -99,13 +101,13 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 
 			testProgram(env);
 
-			env.execute();
+			TestUtils.tryExecute(env, "Fault Tolerance Test");
 
 			postSubmit();
 		}
 		catch (Exception e) {
 			e.printStackTrace();
-			fail(e.getMessage());
+			Assert.fail(e.getMessage());
 		}
 	}
 


[3/4] flink git commit: [FLINK-2314] Make Streaming File Sources Persistent

Posted by al...@apache.org.
[FLINK-2314] Make Streaming File Sources Persistent

This commit is a combination of several commits/changes. It combines
changes to the file input formats and the streaming file read operator
and integrates them into the API.

These are the messages of the other two commits:

[FLINK-3717] Make FileInputFormat checkpointable

This adds a new interface called CheckpointableInputFormat
which describes input formats whose state is queryable,
i.e. getCurrentState() returns the position of the reader
in the underlying source, and which can resume reading
from a user-specified position.

This functionality is not yet leveraged by current readers.

[FLINK-3889] Refactor File Monitoring Source

This is meant to replace the different file
reading sources in Flink streaming. Now there is
one monitoring source with DOP 1 that monitors a
directory and assigns input splits to downstream
readers.

In addition, it makes the new features added by
FLINK-3717 work together with the aforementioned entities
(the monitor and the readers) in order to have
fault-tolerant file sources and exactly-once guarantees.

This does not replace the old API calls. This
will be done in a future commit.
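
For context, the monitor/reader wiring described above looks roughly like the
sketch below. It mirrors the integration test added in this patch
(ContinuousFileMonitoringFunctionITCase); the class name, directory path and
interval value are placeholders and not part of the patch itself.

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
    import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
    import org.apache.flink.streaming.api.functions.source.FilePathFilter;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    public class ContinuousFileSourceSketch {

        public static void main(String[] args) throws Exception {
            final String dir = "hdfs:///some/dir"; // placeholder: directory to monitor
            final long interval = 100;             // placeholder: monitoring interval in ms

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            TextInputFormat format = new TextInputFormat(new Path(dir));
            format.setFilePath(dir);

            // the single (DOP 1) source that monitors the directory and emits input splits
            ContinuousFileMonitoringFunction<String> monitor =
                new ContinuousFileMonitoringFunction<>(format, dir,
                    FilePathFilter.createDefaultFilter(),
                    FileProcessingMode.PROCESS_CONTINUOUSLY,
                    env.getParallelism(), interval);

            // the reader operator that turns the splits assigned to it into records
            TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
            ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);

            DataStream<FileInputSplit> splits = env.addSource(monitor);
            splits.transform("FileSplitReader", typeInfo, reader).print();

            env.execute("Continuous file source sketch");
        }
    }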


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d353895b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d353895b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d353895b

Branch: refs/heads/master
Commit: d353895ba512a5e30fb08a25643fd93f085e8456
Parents: bc19486
Author: kl0u <kk...@gmail.com>
Authored: Sun Apr 10 16:56:42 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 14 18:11:22 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/AvroInputFormat.java      |  85 +++-
 .../io/avro/AvroSplittableInputFormatTest.java  |  95 +++-
 .../flink/api/common/io/BinaryInputFormat.java  | 121 +++--
 .../apache/flink/api/common/io/BlockInfo.java   |   5 +
 .../common/io/CheckpointableInputFormat.java    |  61 +++
 .../api/common/io/DelimitedInputFormat.java     | 114 +++--
 .../api/common/io/EnumerateNestedFilesTest.java |   2 +-
 .../api/common/io/FileInputFormatTest.java      |   9 +-
 .../api/common/io/SequentialFormatTestBase.java |  15 +-
 flink-fs-tests/pom.xml                          |  11 +-
 .../ContinuousFileMonitoringFunctionITCase.java | 300 +++++++++++++
 .../hdfstests/ContinuousFileMonitoringTest.java | 447 +++++++++++++++++++
 .../flink/api/java/io/CsvInputFormatTest.java   | 134 +++++-
 .../runtime/taskmanager/RuntimeEnvironment.java |   4 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../api/datastream/DataStreamSource.java        |   5 +
 .../environment/StreamExecutionEnvironment.java | 256 +++++++++--
 .../ContinuousFileMonitoringFunction.java       | 328 ++++++++++++++
 .../source/ContinuousFileReaderOperator.java    | 390 ++++++++++++++++
 .../source/FileMonitoringFunction.java          |   3 +-
 .../api/functions/source/FilePathFilter.java    |  66 +++
 .../functions/source/FileProcessingMode.java    |  31 ++
 .../api/functions/source/FileReadFunction.java  |   3 +-
 .../functions/source/FileSourceFunction.java    | 148 ------
 .../api/functions/source/InputFormatSource.java | 148 ++++++
 .../api/graph/StreamGraphGenerator.java         |   6 +-
 .../api/operators/OutputTypeConfigurable.java   |   2 +-
 .../streaming/api/operators/StreamOperator.java |   2 +-
 .../util/OneInputStreamOperatorTestHarness.java |   4 +
 .../api/scala/StreamExecutionEnvironment.scala  |  74 ++-
 ...ontinuousFileProcessingCheckpointITCase.java | 327 ++++++++++++++
 .../StreamFaultToleranceTestBase.java           |   8 +-
 32 files changed, 2889 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index 605ce69..a920275 100644
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -22,13 +22,15 @@ package org.apache.flink.api.java.io;
 import java.io.IOException;
 
 import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
 import org.apache.avro.file.SeekableInput;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.avro.FSDataInputStreamWrapper;
@@ -42,15 +44,16 @@ import org.apache.flink.util.InstantiationUtil;
 
 /**
  * Provides a {@link FileInputFormat} for Avro records.
- * 
+ *
  * @param <E>
  *            the type of the result Avro record. If you specify
  *            {@link GenericRecord} then the result will be returned as a
  *            {@link GenericRecord}, so you do not have to know the schema ahead
  *            of time.
  */
-public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {
-	
+public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>,
+	CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
@@ -59,16 +62,19 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 	
 	private boolean reuseAvroValue = true;
 
-	private transient FileReader<E> dataFileReader;
+	private transient DataFileReader<E> dataFileReader;
 
 	private transient long end;
-	
+
+	private transient long recordsReadSinceLastSync;
+
+	private transient long lastSync = -1L;
+
 	public AvroInputFormat(Path filePath, Class<E> type) {
 		super(filePath);
 		this.avroValueType = type;
 	}
-	
-	
+
 	/**
 	 * Sets the flag whether to reuse the Avro value instance for all records.
 	 * By default, the input format reuses the Avro value.
@@ -102,30 +108,34 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 	@Override
 	public void open(FileInputSplit split) throws IOException {
 		super.open(split);
+		dataFileReader = initReader(split);
+		dataFileReader.sync(split.getStart());
+		lastSync = dataFileReader.previousSync();
+	}
 
+	private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
 		DatumReader<E> datumReader;
-		
+
 		if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
 			datumReader = new GenericDatumReader<E>();
 		} else {
 			datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
-					? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
+				? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
 		}
-
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Opening split {}", split);
 		}
 
 		SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
+		DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);
 
-		dataFileReader = DataFileReader.openReader(in, datumReader);
-		
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
 		}
-		
-		dataFileReader.sync(split.getStart());
-		this.end = split.getStart() + split.getLength();
+
+		end = split.getStart() + split.getLength();
+		recordsReadSinceLastSync = 0;
+		return dataFileReader;
 	}
 
 	@Override
@@ -133,11 +143,24 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 		return !dataFileReader.hasNext() || dataFileReader.pastSync(end);
 	}
 
+	public long getRecordsReadFromBlock() {
+		return this.recordsReadSinceLastSync;
+	}
+
 	@Override
 	public E nextRecord(E reuseValue) throws IOException {
 		if (reachedEnd()) {
 			return null;
 		}
+
+		// if we start a new block, register the event and
+		// reset the counter.
+		if (dataFileReader.previousSync() != lastSync) {
+			lastSync = dataFileReader.previousSync();
+			recordsReadSinceLastSync = 0;
+		}
+		recordsReadSinceLastSync++;
+
 		if (reuseAvroValue) {
 			return dataFileReader.next(reuseValue);
 		} else {
@@ -148,4 +171,34 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 			}
 		}
 	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Checkpointing
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public Tuple2<Long, Long> getCurrentState() throws IOException {
+		return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
+	}
+
+	@Override
+	public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
+		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
+		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+
+		this.open(split);
+		if (state.f0 != -1) {
+
+			// go back to the block where we stopped
+			lastSync = state.f0;
+			dataFileReader.seek(lastSync);
+
+			// read and discard records until we reach the one we were at before the checkpoint
+			long recordsToDiscard = state.f1;
+			for (int i = 0; i < recordsToDiscard; i++) {
+				dataFileReader.next(null);
+				recordsReadSinceLastSync++;
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
index 898b8fd..37a83d1 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.io.avro.generated.Colors;
 import org.apache.flink.api.io.avro.generated.Fixed16;
 import org.apache.flink.api.io.avro.generated.User;
 import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -166,7 +167,7 @@ public class AvroSplittableInputFormatTest {
 		Configuration parameters = new Configuration();
 		
 		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
-		
+
 		format.configure(parameters);
 		FileInputSplit[] splits = format.createInputSplits(4);
 		assertEquals(splits.length, 4);
@@ -191,6 +192,98 @@ public class AvroSplittableInputFormatTest {
 		format.close();
 	}
 
+	@Test
+	public void testAvroRecoveryWithFailureAtStart() throws Exception {
+		final int recordsUntilCheckpoint = 132;
+
+		Configuration parameters = new Configuration();
+
+		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		format.configure(parameters);
+
+		FileInputSplit[] splits = format.createInputSplits(4);
+		assertEquals(splits.length, 4);
+
+		int elements = 0;
+		int elementsPerSplit[] = new int[4];
+		for(int i = 0; i < splits.length; i++) {
+			format.reopen(splits[i], format.getCurrentState());
+			while(!format.reachedEnd()) {
+				User u = format.nextRecord(null);
+				Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+				elements++;
+
+				if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
+
+					// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
+					Tuple2<Long, Long> state = format.getCurrentState();
+
+					// this is to make sure that nothing stays from the previous format
+					// (as it is going to be in the normal case)
+					format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
+
+					format.reopen(splits[i], state);
+					assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
+				}
+				elementsPerSplit[i]++;
+			}
+			format.close();
+		}
+
+		Assert.assertEquals(1539, elementsPerSplit[0]);
+		Assert.assertEquals(1026, elementsPerSplit[1]);
+		Assert.assertEquals(1539, elementsPerSplit[2]);
+		Assert.assertEquals(896, elementsPerSplit[3]);
+		Assert.assertEquals(NUM_RECORDS, elements);
+		format.close();
+	}
+
+	@Test
+	public void testAvroRecovery() throws Exception {
+		final int recordsUntilCheckpoint = 132;
+
+		Configuration parameters = new Configuration();
+
+		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		format.configure(parameters);
+
+		FileInputSplit[] splits = format.createInputSplits(4);
+		assertEquals(splits.length, 4);
+
+		int elements = 0;
+		int elementsPerSplit[] = new int[4];
+		for(int i = 0; i < splits.length; i++) {
+			format.open(splits[i]);
+			while(!format.reachedEnd()) {
+				User u = format.nextRecord(null);
+				Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+				elements++;
+
+				if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
+
+					// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
+					Tuple2<Long, Long> state = format.getCurrentState();
+
+					// this is to make sure that nothing stays from the previous format
+					// (as it is going to be in the normal case)
+					format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
+
+					format.reopen(splits[i], state);
+					assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
+				}
+				elementsPerSplit[i]++;
+			}
+			format.close();
+		}
+
+		Assert.assertEquals(1539, elementsPerSplit[0]);
+		Assert.assertEquals(1026, elementsPerSplit[1]);
+		Assert.assertEquals(1539, elementsPerSplit[2]);
+		Assert.assertEquals(896, elementsPerSplit[3]);
+		Assert.assertEquals(NUM_RECORDS, elements);
+		format.close();
+	}
+
 	/*
 	This test gave the reference values for the test of Flink's IF.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index 3789544..eb83bda 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -30,6 +31,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,36 +42,44 @@ import java.util.Arrays;
 import java.util.List;
 
 /**
- * Base class for all input formats that use blocks of fixed size. The input splits are aligned to these blocks. Without
- * configuration, these block sizes equal the native block sizes of the HDFS.
+ * Base class for all input formats that use blocks of fixed size. The input splits are aligned to these blocks,
+ * meaning that each split will consist of one block. Without configuration, these block sizes equal the native
+ * block sizes of the HDFS.
+ *
+ * Each block contains a {@link BlockInfo} at its end. There, the reader can find some statistics
+ * about the split currently being read, which help to correctly parse the contents of the block.
  */
 @Public
-public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
+public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
+	implements CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * The log.
-	 */
+	/** The log. */
 	private static final Logger LOG = LoggerFactory.getLogger(BinaryInputFormat.class);
 
-	/**
-	 * The config parameter which defines the fixed length of a record.
-	 */
+	/** The config parameter which defines the block size. */
 	public static final String BLOCK_SIZE_PARAMETER_KEY = "input.block_size";
 
 	public static final long NATIVE_BLOCK_SIZE = Long.MIN_VALUE;
 
-	/**
-	 * The block size to use.
-	 */
+	/** The block size to use. */
 	private long blockSize = NATIVE_BLOCK_SIZE;
 
 	private transient DataInputViewStreamWrapper dataInputStream;
 
+	/** The BlockInfo for the Block corresponding to the split currently being read. */
 	private transient BlockInfo blockInfo;
 
-	private long readRecords;
+	/** A wrapper around the block currently being read. */
+	private transient BlockBasedInput blockBasedInput = null;
 
+	/**
+	 * The number of records already read from the block.
+	 * This is used to decide if the end of the block has been
+	 * reached.
+	 */
+	private long readRecords = 0;
 
 	@Override
 	public void configure(Configuration parameters) {
@@ -193,6 +203,20 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 		return new BlockInfo();
 	}
 
+	private BlockInfo createAndReadBlockInfo() throws IOException {
+		BlockInfo blockInfo = new BlockInfo();
+		if (this.splitLength > blockInfo.getInfoSize()) {
+			// First we read the block info, which contains the recordCount, the accumulatedRecordCount
+			// and the firstRecordStart offset of the current block. It is written at the end of the block
+			// and has a fixed size, currently 3 longs (24 bytes).
+
+			// TODO: seek not supported by compressed streams. Will throw exception
+			this.stream.seek(this.splitStart + this.splitLength - blockInfo.getInfoSize());
+			blockInfo.read(new DataInputViewStreamWrapper(this.stream));
+		}
+		return blockInfo;
+	}
+
 	/**
 	 * Fill in the statistics. The last modification time and the total input size are prefilled.
 	 *
@@ -207,8 +231,7 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 			return null;
 		}
 
-		BlockInfo blockInfo = this.createBlockInfo();
-
+		BlockInfo blockInfo = new BlockInfo();
 		long totalCount = 0;
 		for (FileStatus file : files) {
 			// invalid file
@@ -249,20 +272,16 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 	public void open(FileInputSplit split) throws IOException {
 		super.open(split);
 
-		final long blockSize = this.blockSize == NATIVE_BLOCK_SIZE ?
-			this.filePath.getFileSystem().getDefaultBlockSize() : this.blockSize;
-
-		this.blockInfo = this.createBlockInfo();
-		if (this.splitLength > this.blockInfo.getInfoSize()) {
-			// TODO: seek not supported by compressed streams. Will throw exception
-			this.stream.seek(this.splitStart + this.splitLength - this.blockInfo.getInfoSize());
-			this.blockInfo.read(new DataInputViewStreamWrapper(this.stream));
-		}
+		this.blockInfo = this.createAndReadBlockInfo();
 
+		// We set the size of the BlockBasedInput to splitLength as each split contains one block.
+		// After reading the block info, we seek in the file to the correct position.
+		
+		this.readRecords = 0;
 		this.stream.seek(this.splitStart + this.blockInfo.getFirstRecordStart());
-		BlockBasedInput blockBasedInput = new BlockBasedInput(this.stream, (int) blockSize);
+		this.blockBasedInput = new BlockBasedInput(this.stream,
+			(int) blockInfo.getFirstRecordStart(), this.splitLength);
 		this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
-		this.readRecords = 0;
 	}
 
 	@Override
@@ -275,7 +294,6 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 		if (this.reachedEnd()) {
 			return null;
 		}
-
 		record = this.deserialize(record, this.dataInputStream);
 		this.readRecords++;
 		return record;
@@ -284,8 +302,8 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 	protected abstract T deserialize(T reuse, DataInputView dataInput) throws IOException;
 
 	/**
-	 * Writes a block info at the end of the blocks.<br>
-	 * Current implementation uses only int and not long.
+	 * Reads the content of a block of data. The block contains its {@link BlockInfo}
+	 * at the end, and this method takes this into account when reading the data.
 	 */
 	protected class BlockBasedInput extends FilterInputStream {
 		private final int maxPayloadSize;
@@ -297,6 +315,12 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 			this.blockPos = (int) BinaryInputFormat.this.blockInfo.getFirstRecordStart();
 			this.maxPayloadSize = blockSize - BinaryInputFormat.this.blockInfo.getInfoSize();
 		}
+		
+		public BlockBasedInput(FSDataInputStream in, int startPos, long length) {
+			super(in);
+			this.blockPos = startPos;
+			this.maxPayloadSize = (int) (length - BinaryInputFormat.this.blockInfo.getInfoSize());
+		}
 
 		@Override
 		public int read() throws IOException {
@@ -306,9 +330,16 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 			return this.in.read();
 		}
 
+		private long getCurrBlockPos() {
+			return this.blockPos;
+		}
+
 		private void skipHeader() throws IOException {
 			byte[] dummy = new byte[BinaryInputFormat.this.blockInfo.getInfoSize()];
 			this.in.read(dummy, 0, dummy.length);
+
+			// blockPos is set to 0 for the case of remote reads;
+			// these are the cases where the last record of a block spills over into the next block
 			this.blockPos = 0;
 		}
 
@@ -337,4 +368,36 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 			return totalRead;
 		}
 	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Checkpointing
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public Tuple2<Long, Long> getCurrentState() throws IOException {
+		if (this.blockBasedInput == null) {
+			throw new RuntimeException("You must have forgotten to call open() on your input format.");
+		}
+
+		return  new Tuple2<>(
+			this.blockBasedInput.getCurrBlockPos(), 		// the last read index in the block
+			this.readRecords								// the number of records read
+		);
+	}
+
+	@Override
+	public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
+		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
+		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+
+		this.open(split);
+		this.blockInfo = this.createAndReadBlockInfo();
+
+		long blockPos = state.f0;
+		this.readRecords = state.f1;
+
+		this.stream.seek(this.splitStart + blockPos);
+		this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
+		this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java
index 0ac2e50..2cb18ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java
@@ -25,6 +25,11 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+/**
+ * A 24-byte block of metadata written at the <i>end</i> of a block in a binary file. It contains
+ * i) the number of records in the block, ii) the accumulated number of records, and
+ * iii) the offset of the first record in the block.
+ */
 @Public
 public class BlockInfo implements IOReadableWritable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
new file mode 100644
index 0000000..17b0625
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.InputSplit;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * An interface that describes {@link InputFormat}s that allow checkpointing/restoring their state.
+ *
+ * @param <S> The type of input split.
+ * @param <T> The type of the channel state to be checkpointed / included in the snapshot.
+ */
+@PublicEvolving
+public interface CheckpointableInputFormat<S extends InputSplit, T extends Serializable> {
+
+	/**
+	 * Returns the current state of the reading channel, i.e. how far into the current split the reader is.
+	 * This will be used to restore the state of the reading channel when recovering from a task failure.
+	 * In the case of a simple text file, the state can correspond to the last read offset in the split.
+	 *
+	 * @return The state of the channel.
+	 *
+	 * @throws IOException Thrown if the creation of the state object failed.
+	 */
+	T getCurrentState() throws IOException;
+
+	/**
+	 * Restores the state of a parallel instance reading from an {@link InputFormat}.
+	 * This is necessary when recovering from a task failure. When this method is called,
+	 * the input format is guaranteed to be configured.
+	 *
+	 * <p>
+	 * <b>NOTE: </b> The caller has to make sure that the provided split is the one to which
+	 * the state belongs.
+	 *
+	 * @param split The split to be opened.
+	 * @param state The state from which to start reading. This can contain the offset,
+	 *                 but also other data, depending on the input format.
+	 */
+	void reopen(S split, T state) throws IOException;
+}
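
To make the contract above concrete, here is a minimal sketch of the
checkpoint/reopen cycle, in the spirit of the recovery tests added in this
patch (e.g. AvroSplittableInputFormatTest#testAvroRecovery). It uses
TextInputFormat, whose DelimitedInputFormat base class implements
CheckpointableInputFormat<FileInputSplit, Long> later in this patch; the
class name, file path and record count are placeholders.

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;

    public class ReopenSketch {

        public static void main(String[] args) throws Exception {
            final String file = "/tmp/lines.txt"; // placeholder: an existing text file

            TextInputFormat format = new TextInputFormat(new Path(file));
            format.configure(new Configuration());

            FileInputSplit[] splits = format.createInputSplits(1);
            format.open(splits[0]);

            // read a few records, then pretend a checkpoint is taken
            int read = 0;
            while (!format.reachedEnd() && read < 3) {
                format.nextRecord(null);
                read++;
            }
            Long state = format.getCurrentState(); // for DelimitedInputFormat: the current offset
            format.close();

            // simulate a failure: restore a fresh, configured format from the saved state
            format = new TextInputFormat(new Path(file));
            format.configure(new Configuration());
            format.reopen(splits[0], state);       // resumes from the checkpointed offset

            while (!format.reachedEnd()) {
                String line = format.nextRecord(null);
                if (line != null) {
                    System.out.println(line);
                }
            }
            format.close();
        }
    }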

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 243e2a4..3a77200 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -19,6 +19,9 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -28,9 +31,6 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -43,7 +43,7 @@ import java.util.ArrayList;
  * <p>The default delimiter is the newline character {@code '\n'}.</p>
  */
 @Public
-public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
+public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements CheckpointableInputFormat<FileInputSplit, Long> {
 	
 	private static final long serialVersionUID = 1L;
 
@@ -56,7 +56,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 
 	/** The default charset  to convert strings to bytes */
 	private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
-	
+
 	/**
 	 * The default read buffer size = 1MB.
 	 */
@@ -81,7 +81,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	 * The maximum size of a sample record before sampling is aborted. To catch cases where a wrong delimiter is given.
 	 */
 	private static int MAX_SAMPLE_LEN;
-	
+
 	static { loadGlobalConfigParams(); }
 	
 	protected static void loadGlobalConfigParams() {
@@ -135,7 +135,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	private transient int readPos;
 
 	private transient int limit;
-	
+
 	private transient byte[] currBuffer;		// buffer in which current record byte sequence is found
 	private transient int currOffset;			// offset in above buffer
 	private transient int currLen;				// length of current byte sequence
@@ -143,8 +143,9 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	private transient boolean overLimit;
 
 	private transient boolean end;
-	
-	
+
+	private transient long offset = -1;
+
 	// --------------------------------------------------------------------------------------------
 	//  The configuration parameters. Configured on the instance and serialized to be shipped.
 	// --------------------------------------------------------------------------------------------
@@ -182,7 +183,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 		
 		this.delimiter = delimiter;
 	}
-	
+
 	public void setDelimiter(char delimiter) {
 		setDelimiter(String.valueOf(delimiter));
 	}
@@ -226,7 +227,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 		
 		this.numLineSamples = numLineSamples;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  User-defined behavior
 	// --------------------------------------------------------------------------------------------
@@ -404,17 +405,33 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	/**
 	 * Opens the given input split. This method opens the input stream to the specified file, allocates read buffers
 	 * and positions the stream at the correct position, making sure that any partial record at the beginning is skipped.
-	 * 
+	 *
 	 * @param split The input split to open.
-	 * 
+	 *
 	 * @see org.apache.flink.api.common.io.FileInputFormat#open(org.apache.flink.core.fs.FileInputSplit)
 	 */
 	@Override
 	public void open(FileInputSplit split) throws IOException {
 		super.open(split);
-		
+		initBuffers();
+
+		this.offset = splitStart;
+		if (this.splitStart != 0) {
+			this.stream.seek(offset);
+			readLine();
+			// if the first partial record already pushes the stream over
+			// the limit of our split, then no record starts within this split
+			if (this.overLimit) {
+				this.end = true;
+			}
+		} else {
+			fillBuffer();
+		}
+	}
+
+	private void initBuffers() {
 		this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;
-		
+
 		if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
 			this.readBuffer = new byte[this.bufferSize];
 		}
@@ -426,19 +443,6 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 		this.limit = 0;
 		this.overLimit = false;
 		this.end = false;
-
-		if (this.splitStart != 0) {
-			this.stream.seek(this.splitStart);
-			readLine();
-			
-			// if the first partial record already pushes the stream over the limit of our split, then no
-			// record starts within this split 
-			if (this.overLimit) {
-				this.end = true;
-			}
-		} else {
-			fillBuffer();
-		}
 	}
 
 	/**
@@ -489,6 +493,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 			if (this.readPos >= this.limit) {
 				if (!fillBuffer()) {
 					if (countInWrapBuffer > 0) {
+						this.offset += countInWrapBuffer;
 						setResult(this.wrapBuffer, 0, countInWrapBuffer);
 						return true;
 					} else {
@@ -506,13 +511,14 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 				} else {
 					i = 0;
 				}
-
 			}
 
 			// check why we dropped out
 			if (i == this.delimiter.length) {
 				// line end
-				count = this.readPos - startPos - this.delimiter.length;
+				int totalBytesRead = this.readPos - startPos;
+				this.offset += countInWrapBuffer + totalBytesRead;
+				count = totalBytesRead - this.delimiter.length;
 
 				// copy to byte array
 				if (countInWrapBuffer > 0) {
@@ -535,7 +541,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 				count = this.limit - startPos;
 				
 				// check against the maximum record length
-				if ( ((long) countInWrapBuffer) + count > this.lineLengthLimit) {
+				if (((long) countInWrapBuffer) + count > this.lineLengthLimit) {
 					throw new IOException("The record length exceeded the maximum record length (" + 
 							this.lineLengthLimit + ").");
 				}
@@ -604,13 +610,11 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 			return true;
 		}
 	}
-	
-	// ============================================================================================
-	//  Parametrization via configuration
-	// ============================================================================================
-	
-	// ------------------------------------- Config Keys ------------------------------------------
-	
+
+	// --------------------------------------------------------------------------------------------
+	// Config Keys for Parametrization via configuration
+	// --------------------------------------------------------------------------------------------
+
 	/**
 	 * The configuration key to set the record delimiter.
 	 */
@@ -620,4 +624,38 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	 * The configuration key to set the number of samples to take for the statistics.
 	 */
 	private static final String NUM_STATISTICS_SAMPLES = "delimited-format.numSamples";
+
+	// --------------------------------------------------------------------------------------------
+	//  Checkpointing
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public Long getCurrentState() throws IOException {
+		return this.offset;
+	}
+
+	@Override
+	public void reopen(FileInputSplit split, Long state) throws IOException {
+		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
+		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+
+		this.open(split);
+		this.offset = state;
+		if (state > this.splitStart + split.getLength()) {
+			this.end = true;
+		} else if (state > split.getStart()) {
+			initBuffers();
+
+			this.stream.seek(this.offset);
+			if (split.getLength() == -1) {
+				// this is the case for unsplittable files
+				fillBuffer();
+			} else {
+				this.splitLength = this.splitStart + split.getLength() - this.offset;
+				if (splitLength <= 0) {
+					this.end = true;
+				}
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
index d0caa22..68465a3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
@@ -394,4 +394,4 @@ public class EnumerateNestedFilesTest {
 			return null;
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 63cb966..ae8802b 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -377,7 +377,7 @@ public class FileInputFormatTest {
 		final int numBlocks = 3;
 		FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
 		for (int i = 0; i < blockSize * numBlocks; i++) {
-			fileOutputStream.write(new byte[]{1});
+			fileOutputStream.write(new byte[]{(byte) i});
 		}
 		fileOutputStream.close();
 
@@ -392,11 +392,12 @@ public class FileInputFormatTest {
 		FileInputSplit[] inputSplits = inputFormat.createInputSplits(3);
 
 		byte[] bytes = null;
+		byte prev = 0;
 		for (FileInputSplit inputSplit : inputSplits) {
 			inputFormat.open(inputSplit);
 			while (!inputFormat.reachedEnd()) {
 				if ((bytes = inputFormat.nextRecord(bytes)) != null) {
-					Assert.assertArrayEquals(new byte[]{(byte) 0xFE}, bytes);
+					Assert.assertArrayEquals(new byte[]{--prev}, bytes);
 				}
 			}
 		}
@@ -420,14 +421,13 @@ public class FileInputFormatTest {
 		}
 	}
 
-
 	private static final class MyDecoratedInputFormat extends FileInputFormat<byte[]> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
 		public boolean reachedEnd() throws IOException {
-			return this.splitLength <= this.stream.getPos();
+			return this.stream.getPos() >= this.splitStart + this.splitLength;
 		}
 
 		@Override
@@ -442,7 +442,6 @@ public class FileInputFormatTest {
 			inputStream = super.decorateInputStream(inputStream, fileSplit);
 			return new InputStreamFSInputWrapper(new InvertedInputStream(inputStream));
 		}
-
 	}
 
 	private static final class InvertedInputStream extends InputStream {

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
index 2ff6fab..b00ca95 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -127,17 +128,29 @@ public abstract class SequentialFormatTestBase<T> extends TestLogger {
 	 * Tests if the expected sequence and amount of data can be read
 	 */
 	@Test
-	public void checkRead() throws IOException {
+	public void checkRead() throws Exception {
 		BinaryInputFormat<T> input = this.createInputFormat();
 		FileInputSplit[] inputSplits = input.createInputSplits(0);
 		Arrays.sort(inputSplits, new InputSplitSorter());
+
 		int readCount = 0;
+
 		for (FileInputSplit inputSplit : inputSplits) {
 			input.open(inputSplit);
+			input.reopen(inputSplit, input.getCurrentState());
+
 			T record = createInstance();
+
 			while (!input.reachedEnd()) {
 				if (input.nextRecord(record) != null) {
 					this.checkEquals(this.getRecord(readCount), record);
+
+					if (!input.reachedEnd()) {
+						Tuple2<Long, Long> state = input.getCurrentState();
+
+						input = this.createInputFormat();
+						input.reopen(inputSplit, state);
+					}
 					readCount++;
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-fs-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-fs-tests/pom.xml b/flink-fs-tests/pom.xml
index 2c9500f..53cb503 100644
--- a/flink-fs-tests/pom.xml
+++ b/flink-fs-tests/pom.xml
@@ -78,7 +78,15 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
-		
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-hdfs</artifactId>
@@ -94,5 +102,6 @@ under the License.
 			<type>test-jar</type>
 			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
 		</dependency>
+
 	</dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
new file mode 100644
index 0000000..e6cd5d9
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTestBase {
+
+	private static final int NO_OF_FILES = 10;
+	private static final int LINES_PER_FILE = 10;
+
+	private static final long INTERVAL = 100;
+
+	private File baseDir;
+
+	private org.apache.hadoop.fs.FileSystem hdfs;
+	private String hdfsURI;
+	private MiniDFSCluster hdfsCluster;
+
+	private static Map<Integer, String> expectedContents = new HashMap<>();
+
+	//						PREPARING FOR THE TESTS
+
+	@Before
+	public void createHDFS() {
+		try {
+			baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+
+			org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
+
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+			hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+		} catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@After
+	public void destroyHDFS() {
+		try {
+			FileUtil.fullyDelete(baseDir);
+			hdfsCluster.shutdown();
+		} catch (Throwable t) {
+			throw new RuntimeException(t);
+		}
+	}
+
+	//						END OF PREPARATIONS
+
+	@Override
+	protected void testProgram() throws Exception {
+
+		/*
+		* This test checks the interplay between the monitor and the reader
+		* and also the failExternally() functionality. To test the latter we
+		* set the parallelism to 1 so that we have the chaining between the sink,
+		* which throws the SuccessException to signal the end of the test, and the
+		* reader.
+		* */
+
+		FileCreator fileCreator = new FileCreator(INTERVAL);
+		Thread t = new Thread(fileCreator);
+		t.start();
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		format.setFilePath(hdfsURI);
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(1);
+
+			ContinuousFileMonitoringFunction<String> monitoringFunction =
+				new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+					FilePathFilter.createDefaultFilter(),
+					FileProcessingMode.PROCESS_CONTINUOUSLY,
+					env.getParallelism(), INTERVAL);
+
+			TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+			ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+			TestingSinkFunction sink = new TestingSinkFunction(monitoringFunction);
+
+			DataStream<FileInputSplit> splits = env.addSource(monitoringFunction);
+			splits.transform("FileSplitReader", typeInfo, reader).addSink(sink).setParallelism(1);
+			env.execute();
+
+		} catch (Exception e) {
+			Throwable th = e;
+			int depth = 0;
+
+			for (; depth < 20; depth++) {
+				if (th instanceof SuccessException) {
+					try {
+						postSubmit();
+					} catch (Exception e1) {
+						e1.printStackTrace();
+					}
+					return;
+				} else if (th.getCause() != null) {
+					th = th.getCause();
+				} else {
+					break;
+				}
+			}
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	private static class TestingSinkFunction extends RichSinkFunction<String> {
+
+		private final ContinuousFileMonitoringFunction src;
+
+		private int elementCounter = 0;
+		private Map<Integer, Integer> elementCounters = new HashMap<>();
+		private Map<Integer, List<String>> collectedContent = new HashMap<>();
+
+		TestingSinkFunction(ContinuousFileMonitoringFunction monitoringFunction) {
+			this.src = monitoringFunction;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// this sink can only work with DOP 1
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		}
+
+		@Override
+		public void close() {
+			// check if the data that we collected are the ones they are supposed to be.
+
+			Assert.assertEquals(collectedContent.size(), expectedContents.size());
+			for (Integer fileIdx: expectedContents.keySet()) {
+				Assert.assertTrue(collectedContent.keySet().contains(fileIdx));
+
+				List<String> cntnt = collectedContent.get(fileIdx);
+				Collections.sort(cntnt, new Comparator<String>() {
+					@Override
+					public int compare(String o1, String o2) {
+						return getLineNo(o1) - getLineNo(o2);
+					}
+				});
+
+				StringBuilder cntntStr = new StringBuilder();
+				for (String line: cntnt) {
+					cntntStr.append(line);
+				}
+				Assert.assertEquals(cntntStr.toString(), expectedContents.get(fileIdx));
+			}
+			expectedContents.clear();
+
+			src.cancel();
+			try {
+				src.close();
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+
+		private int getLineNo(String line) {
+			String[] tkns = line.split("\\s");
+			Assert.assertTrue(tkns.length == 6);
+			return Integer.parseInt(tkns[tkns.length - 1]);
+		}
+
+		@Override
+		public void invoke(String value) throws Exception {
+			int fileIdx = Character.getNumericValue(value.charAt(0));
+
+			Integer counter = elementCounters.get(fileIdx);
+			if (counter == null) {
+				counter = 0;
+			} else if (counter == LINES_PER_FILE) {
+				// all lines of this file were already read, so this line must be a duplicate.
+				Assert.fail("Duplicate lines detected.");
+			}
+			elementCounters.put(fileIdx, ++counter);
+
+			List<String> content = collectedContent.get(fileIdx);
+			if (content == null) {
+				content = new ArrayList<>();
+				collectedContent.put(fileIdx, content);
+			}
+			content.add(value + "\n");
+
+			elementCounter++;
+			if (elementCounter == NO_OF_FILES * LINES_PER_FILE) {
+				throw new SuccessException();
+			}
+		}
+	}
+
+	/**
+	 * A separate thread creating {@link #NO_OF_FILES} files, one file every {@link #INTERVAL} milliseconds.
+	 * It is used to test the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}.
+	 * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method.
+	 * */
+	private class FileCreator implements Runnable {
+
+		private final long interval;
+
+		FileCreator(long interval) {
+			this.interval = interval;
+		}
+
+		public void run() {
+			try {
+				for (int i = 0; i < NO_OF_FILES; i++) {
+					fillWithData(hdfsURI, "file", i, "This is test line.");
+					Thread.sleep(interval);
+				}
+			} catch (IOException e) {
+				e.printStackTrace();
+			} catch (InterruptedException e) {
+				// we just close without any message.
+			}
+		}
+	}
+
+	/**
+	 * Fill the file with content.
+	 * */
+	private void fillWithData(String base, String fileName, int fileIdx, String sampleLine) throws IOException {
+		assert (hdfs != null);
+
+		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
+
+		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+		FSDataOutputStream stream = hdfs.create(tmp);
+		StringBuilder str = new StringBuilder();
+		for (int i = 0; i < LINES_PER_FILE; i++) {
+			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+			str.append(line);
+			stream.write(line.getBytes());
+		}
+		stream.close();
+
+		hdfs.rename(tmp, file);
+
+		expectedContents.put(fileIdx, str.toString());
+
+		Assert.assertTrue("No result file present", hdfs.exists(file));
+	}
+
+	public static class SuccessException extends Exception {
+		private static final long serialVersionUID = -7011865671593955887L;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
new file mode 100644
index 0000000..87567e3
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+public class ContinuousFileMonitoringTest {
+
+	private static final int NO_OF_FILES = 10;
+	private static final int LINES_PER_FILE = 10;
+
+	private static final long INTERVAL = 100;
+
+	private static File baseDir;
+
+	private static org.apache.hadoop.fs.FileSystem hdfs;
+	private static String hdfsURI;
+	private static MiniDFSCluster hdfsCluster;
+
+	//						PREPARING FOR THE TESTS
+
+	@BeforeClass
+	public static void createHDFS() {
+		try {
+			baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+
+			org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
+
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+			hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+		} catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		try {
+			FileUtil.fullyDelete(baseDir);
+			hdfsCluster.shutdown();
+		} catch (Throwable t) {
+			throw new RuntimeException(t);
+		}
+	}
+
+	//						END OF PREPARATIONS
+
+	//						TESTS
+
+	@Test
+	public void testFileReadingOperator() throws Exception {
+		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+		Map<Integer, String> expectedFileContents = new HashMap<>();
+		for(int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+			expectedFileContents.put(i, file.f1);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+		OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
+			new OneInputStreamOperatorTestHarness<>(reader);
+
+		reader.setOutputType(typeInfo, new ExecutionConfig());
+		tester.open();
+
+		// create the necessary splits for the test
+		FileInputSplit[] splits = format.createInputSplits(
+			reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+		// and feed them to the operator
+		for(FileInputSplit split: splits) {
+			tester.processElement(new StreamRecord<>(split));
+		}
+
+		// then close the reader gracefully
+		synchronized (tester.getCheckpointLock()) {
+			tester.close();
+		}
+
+		/*
+		* Since the reader is multithreaded, the test can finish before the reader thread has finished
+		* reading. This results in files being deleted by the test before they are read, which throws an exception.
+		* In addition, even if file deletion happens at the end, the results are not yet ready for checking.
+		* To cope with this, we wait until all the output is collected, or until the waiting time exceeds 1000 ms (1 s).
+		*/
+
+		long start = System.currentTimeMillis();
+		Queue<Object> output;
+		do {
+			output = tester.getOutput();
+			Thread.sleep(50);
+		} while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);
+
+		Map<Integer, List<String>> actualFileContents = new HashMap<>();
+		for(Object line: tester.getOutput()) {
+			StreamRecord<String> element = (StreamRecord<String>) line;
+
+			int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+			List<String> content = actualFileContents.get(fileIdx);
+			if(content == null) {
+				content = new ArrayList<>();
+				actualFileContents.put(fileIdx, content);
+			}
+			content.add(element.getValue() +"\n");
+		}
+
+		Assert.assertEquals(actualFileContents.size(), expectedFileContents.size());
+		for (Integer fileIdx: expectedFileContents.keySet()) {
+			Assert.assertTrue(actualFileContents.keySet().contains(fileIdx));
+
+			List<String> cntnt = actualFileContents.get(fileIdx);
+			Collections.sort(cntnt, new Comparator<String>() {
+				@Override
+				public int compare(String o1, String o2) {
+					return getLineNo(o1) - getLineNo(o2);
+				}
+			});
+
+			StringBuilder cntntStr = new StringBuilder();
+			for (String line: cntnt) {
+				cntntStr.append(line);
+			}
+			Assert.assertEquals(cntntStr.toString(), expectedFileContents.get(fileIdx));
+		}
+
+		for(org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+	}
+
+	private static class PathFilter extends FilePathFilter {
+
+		@Override
+		public boolean filterPath(Path filePath) {
+			return filePath.getName().startsWith("**");
+		}
+	}
+
+	@Test
+	public void testFilePathFiltering() throws Exception {
+		Set<String> uniqFilesFound = new HashSet<>();
+		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+
+		// create the files to be discarded
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "**file", i, "This is test line.");
+			filesCreated.add(file.f0);
+		}
+
+		// create the files to be kept
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI, new PathFilter(),
+				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+
+		monitoringFunction.open(new Configuration());
+		monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
+
+		Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+		for(int i = 0; i < NO_OF_FILES; i++) {
+			org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
+			Assert.assertTrue(uniqFilesFound.contains(file.toString()));
+		}
+
+		for(org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+	}
+
+	@Test
+	public void testFileSplitMonitoringReprocessWithAppended() throws Exception {
+		Set<String> uniqFilesFound = new HashSet<>();
+
+		FileCreator fc = new FileCreator(INTERVAL, NO_OF_FILES);
+		fc.start();
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI, FilePathFilter.createDefaultFilter(),
+				FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+		monitoringFunction.open(new Configuration());
+		monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
+
+		// wait until the sink also sees all the splits.
+		synchronized (uniqFilesFound) {
+			while (uniqFilesFound.size() < NO_OF_FILES) {
+				uniqFilesFound.wait(7 * INTERVAL);
+			}
+		}
+
+		Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES);
+		Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+
+		Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
+		Set<String> fileNamesCreated = new HashSet<>();
+		for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
+			fileNamesCreated.add(path.toString());
+		}
+
+		for(String file: uniqFilesFound) {
+			Assert.assertTrue(fileNamesCreated.contains(file));
+		}
+
+		for(org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+	}
+
+	@Test
+	public void testFileSplitMonitoringProcessOnce() throws Exception {
+		Set<String> uniqFilesFound = new HashSet<>();
+
+		FileCreator fc = new FileCreator(INTERVAL, 1);
+		fc.start();
+
+		// to make sure that at least one file is created
+		Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
+		synchronized (filesCreated) {
+			if (filesCreated.size() == 0) {
+				filesCreated.wait();
+			}
+		}
+		Assert.assertTrue(fc.getFilesCreated().size() >= 1);
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI, FilePathFilter.createDefaultFilter(),
+				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+
+		monitoringFunction.open(new Configuration());
+		monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
+
+		// wait until all the files are created
+		fc.join();
+
+		Assert.assertTrue(filesCreated.size() == NO_OF_FILES);
+
+		Set<String> fileNamesCreated = new HashSet<>();
+		for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
+			fileNamesCreated.add(path.toString());
+		}
+
+		Assert.assertTrue(uniqFilesFound.size() >= 1 && uniqFilesFound.size() < fileNamesCreated.size());
+		for(String file: uniqFilesFound) {
+			Assert.assertTrue(fileNamesCreated.contains(file));
+		}
+
+		for(org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+	}
+
+	// -------------		End of Tests
+
+	private int getLineNo(String line) {
+		String[] tkns = line.split("\\s");
+		Assert.assertTrue(tkns.length == 6);
+		return Integer.parseInt(tkns[tkns.length - 1]);
+	}
+
+	/**
+	 * A separate thread creating {@link #NO_OF_FILES} files, one file every {@link #INTERVAL} milliseconds.
+	 * It is used to test the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}.
+	 * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method.
+	 * */
+	private class FileCreator extends Thread {
+
+		private final long interval;
+		private final int noOfFilesBeforeNotifying;
+
+		private final Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+
+		FileCreator(long interval, int notificationLim) {
+			this.interval = interval;
+			this.noOfFilesBeforeNotifying = notificationLim;
+		}
+
+		public void run() {
+			try {
+				for(int i = 0; i < NO_OF_FILES; i++) {
+					Tuple2<org.apache.hadoop.fs.Path, String> file =
+						fillWithData(hdfsURI, "file", i, "This is test line.");
+
+					synchronized (filesCreated) {
+						filesCreated.add(file.f0);
+						if (filesCreated.size() == noOfFilesBeforeNotifying) {
+							filesCreated.notifyAll();
+						}
+					}
+					Thread.sleep(interval);
+				}
+			} catch (IOException | InterruptedException e) {
+				e.printStackTrace();
+			}
+		}
+
+		Set<org.apache.hadoop.fs.Path> getFilesCreated() {
+			return this.filesCreated;
+		}
+	}
+
+	private class TestingSourceContext implements SourceFunction.SourceContext<FileInputSplit> {
+
+		private final ContinuousFileMonitoringFunction src;
+		private final Set<String> filesFound;
+		private final Object lock = new Object();
+
+		TestingSourceContext(ContinuousFileMonitoringFunction monitoringFunction, Set<String> uniqFilesFound) {
+			this.filesFound = uniqFilesFound;
+			this.src = monitoringFunction;
+		}
+
+		@Override
+		public void collect(FileInputSplit element) {
+
+			String filePath = element.getPath().toString();
+			if (filesFound.contains(filePath)) {
+				// duplicate splits can occur if a file is still open the first time the monitor
+				// sees it and is closed later, so its modification time changes and it is reported again.
+				Assert.fail("Duplicate file: " + filePath);
+			}
+
+			filesFound.add(filePath);
+			try {
+				if (filesFound.size() == NO_OF_FILES) {
+					this.src.cancel();
+					this.src.close();
+					synchronized (filesFound) {
+						filesFound.notifyAll();
+					}
+				}
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+
+		@Override
+		public void collectWithTimestamp(FileInputSplit element, long timestamp) {
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lock;
+		}
+
+		@Override
+		public void close() {
+		}
+	}
+
+	/**
+	 * Fill the file with content.
+	 * */
+	private Tuple2<org.apache.hadoop.fs.Path, String> fillWithData(String base, String fileName, int fileIdx, String sampleLine) throws IOException {
+		assert (hdfs != null);
+
+		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
+		Assert.assertTrue (!hdfs.exists(file));
+
+		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+		FSDataOutputStream stream = hdfs.create(tmp);
+		StringBuilder str = new StringBuilder();
+		for(int i = 0; i < LINES_PER_FILE; i++) {
+			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+			str.append(line);
+			stream.write(line.getBytes());
+		}
+		stream.close();
+
+		hdfs.rename(tmp, file);
+
+		Assert.assertTrue("No result file present", hdfs.exists(file));
+		return new Tuple2<>(file, str.toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index f44fe9e..ecf55c3 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -20,7 +20,6 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.api.common.io.ParseException;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.tuple.*;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -30,6 +29,7 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.types.parser.StringParser;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.File;
@@ -58,23 +58,135 @@ public class CsvInputFormatTest {
 	private static final String SECOND_PART = "That is the second part";
 
 	@Test
-	public void ignoreInvalidLines() {
+	public void testSplitCsvInputStreamInLargeBuffer() throws Exception {
+		testSplitCsvInputStream(1024 * 1024, false);
+	}
+
+	@Test
+	public void testSplitCsvInputStreamInSmallBuffer() throws Exception {
+		testSplitCsvInputStream(2, false);
+	}
+
+	private void testSplitCsvInputStream(int bufferSize, boolean failAtStart) throws Exception {
+		final String fileContent =
+			"this is|1|2.0|\n"+
+			"a test|3|4.0|\n" +
+			"#next|5|6.0|\n" +
+			"asdadas|5|30.0|\n";
+
+		// create temporary file with 3 blocks
+		final File tempFile = File.createTempFile("input-stream-decoration-test", "tmp");
+		tempFile.deleteOnExit();
+
+		FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
+		fileOutputStream.write(fileContent.getBytes());
+		fileOutputStream.close();
+
+		// fix the number of blocks and the size of each one.
+		final int noOfBlocks = 3;
+
+		final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
+		CsvInputFormat<Tuple3<String, Integer, Double>> format = new TupleCsvInputFormat<>(new Path(tempFile.toURI()), "\n", "|", typeInfo);
+		format.setLenient(true);
+		format.setBufferSize(bufferSize);
+
+		final Configuration config = new Configuration();
+		format.configure(config);
+
+		long[] offsetsAfterRecord = new long[]{ 15, 29, 42, 58};
+		long[] offsetAtEndOfSplit = new long[]{ 20, 40, 58};
+		int recordCounter = 0;
+		int splitCounter = 0;
+
+		FileInputSplit[] inputSplits = format.createInputSplits(noOfBlocks);
+		Tuple3<String, Integer, Double> result = new Tuple3<>();
+
+		for (FileInputSplit inputSplit : inputSplits) {
+			assertEquals(inputSplit.getStart() + inputSplit.getLength(), offsetAtEndOfSplit[splitCounter]);
+			splitCounter++;
+
+			format.open(inputSplit);
+			format.reopen(inputSplit, format.getCurrentState());
+
+			while (!format.reachedEnd()) {
+				if ((result = format.nextRecord(result)) != null) {
+					assertEquals((long) format.getCurrentState(), offsetsAfterRecord[recordCounter]);
+					recordCounter++;
+
+					if (recordCounter == 1) {
+						assertNotNull(result);
+						assertEquals("this is", result.f0);
+						assertEquals(new Integer(1), result.f1);
+						assertEquals(new Double(2.0), result.f2);
+						assertEquals((long) format.getCurrentState(), 15);
+					} else if (recordCounter == 2) {
+						assertNotNull(result);
+						assertEquals("a test", result.f0);
+						assertEquals(new Integer(3), result.f1);
+						assertEquals(new Double(4.0), result.f2);
+						assertEquals((long) format.getCurrentState(), 29);
+					} else if (recordCounter == 3) {
+						assertNotNull(result);
+						assertEquals("#next", result.f0);
+						assertEquals(new Integer(5), result.f1);
+						assertEquals(new Double(6.0), result.f2);
+						assertEquals((long) format.getCurrentState(), 42);
+					} else {
+						assertNotNull(result);
+						assertEquals("asdadas", result.f0);
+						assertEquals(new Integer(5), result.f1);
+						assertEquals(new Double(30.0), result.f2);
+						assertEquals((long) format.getCurrentState(), 58);
+					}
+
+					// simulate checkpoint
+					Long state = format.getCurrentState();
+					long offsetToRestore = state;
+
+					// create a new format
+					format = new TupleCsvInputFormat<>(new Path(tempFile.toURI()), "\n", "|", typeInfo);
+					format.setLenient(true);
+					format.setBufferSize(bufferSize);
+					format.configure(config);
+
+					// simulate the restore operation.
+					format.reopen(inputSplit, offsetToRestore);
+				} else {
+					result = new Tuple3<>();
+				}
+			}
+			format.close();
+		}
+		Assert.assertEquals(4, recordCounter);
+	}
+
+	@Test
+	public void ignoreInvalidLinesAndGetOffsetInLargeBuffer() {
+		ignoreInvalidLines(1024 * 1024);
+	}
+
+	@Test
+	public void ignoreInvalidLinesAndGetOffsetInSmallBuffer() {
+		ignoreInvalidLines(2);
+	}
+
+	private void ignoreInvalidLines(int bufferSize) {
 		try {
-			
-			
 			final String fileContent =  "#description of the data\n" + 
 										"header1|header2|header3|\n"+
 										"this is|1|2.0|\n"+
 										"//a comment\n" +
 										"a test|3|4.0|\n" +
-										"#next|5|6.0|\n";
-			
+										"#next|5|6.0|\n" +
+										"asdasdas";
+
 			final FileInputSplit split = createTempFile(fileContent);
 
 			final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
 			final CsvInputFormat<Tuple3<String, Integer, Double>> format = new TupleCsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
 			format.setLenient(true);
-		
+			format.setBufferSize(bufferSize);
+
 			final Configuration parameters = new Configuration();
 			format.configure(parameters);
 			format.open(split);
@@ -86,21 +198,25 @@ public class CsvInputFormatTest {
 			assertEquals("this is", result.f0);
 			assertEquals(new Integer(1), result.f1);
 			assertEquals(new Double(2.0), result.f2);
-			
+			assertEquals((long) format.getCurrentState(), 65);
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("a test", result.f0);
 			assertEquals(new Integer(3), result.f1);
 			assertEquals(new Double(4.0), result.f2);
-			
+			assertEquals((long) format.getCurrentState(), 91);
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("#next", result.f0);
 			assertEquals(new Integer(5), result.f1);
 			assertEquals(new Double(6.0), result.f2);
+			assertEquals((long) format.getCurrentState(), 104);
 
 			result = format.nextRecord(result);
 			assertNull(result);
+			assertEquals(fileContent.length(), (long) format.getCurrentState());
 		}
 		catch (Exception ex) {
 			ex.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 80c5fbc..ee4d31f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -101,8 +101,8 @@ public class RuntimeEnvironment implements Environment {
 			InputGate[] inputGates,
 			ActorGateway jobManager,
 			TaskManagerRuntimeInfo taskManagerInfo,
-			Task containingTask,
-			TaskMetricGroup metrics) {
+			TaskMetricGroup metrics,
+			Task containingTask) {
 
 		this.jobId = checkNotNull(jobId);
 		this.jobVertexId = checkNotNull(jobVertexId);

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c1cbaa6..548d7d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -526,7 +526,7 @@ public class Task implements Runnable {
 					userCodeClassLoader, memoryManager, ioManager,
 					broadcastVariableManager, accumulatorRegistry,
 					splitProvider, distributedCacheEntries,
-					writers, inputGates, jobManager, taskManagerConfig, this, metrics);
+					writers, inputGates, jobManager, taskManagerConfig, metrics, this);
 
 			// let the task code create its readers and writers
 			invokable.setEnvironment(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 21ad762..e824758 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -44,6 +44,11 @@ public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
 		}
 	}
 
+	public DataStreamSource(SingleOutputStreamOperator<T> operator) {
+		super(operator.environment, operator.getTransformation());
+		this.isParallel = true;
+	}
+
 	@Override
 	public DataStreamSource<T> setParallelism(int parallelism) {
 		if (parallelism > 1 && !isParallel) {


[4/4] flink git commit: [FLINK-3896] Allow a StreamTask to be Externally Cancelled

Posted by al...@apache.org.
[FLINK-3896] Allow a StreamTask to be Externally Cancelled

This adds a failExternally() method to the StreamTask, so that custom operators
can make their containing task fail when needed.
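
As a quick illustration, a minimal sketch of how an operator-owned worker thread could use the new
method is shown below. The SplitReaderThread class and the way it obtains a reference to its
containing StreamTask are assumptions made for this example; only failExternally(Throwable) comes
from this change.

    // Hypothetical sketch (not part of this commit): a reader thread that reports
    // failures to its containing StreamTask instead of swallowing them.
    import org.apache.flink.streaming.runtime.tasks.StreamTask;

    class SplitReaderThread extends Thread {

        // reference to the containing task; how this reference is obtained is an assumption here
        private final StreamTask<?, ?> containingTask;

        SplitReaderThread(StreamTask<?, ?> containingTask) {
            this.containingTask = containingTask;
        }

        @Override
        public void run() {
            try {
                // ... read input splits and emit records ...
            } catch (Throwable t) {
                // make the whole task fail so the job can be restarted,
                // rather than letting the error die with this thread
                containingTask.failExternally(t);
            }
        }
    }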


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc19486c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc19486c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc19486c

Branch: refs/heads/master
Commit: bc19486ccfc4164d6abd9c712db8e92a350c5a85
Parents: fdf4360
Author: kl0u <kk...@gmail.com>
Authored: Tue May 10 18:56:58 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 14 18:11:22 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/execution/Environment.java  | 11 +++++++++++
 .../runtime/taskmanager/RuntimeEnvironment.java      |  9 +++++++++
 .../org/apache/flink/runtime/taskmanager/Task.java   |  4 ++--
 .../operators/testutils/DummyEnvironment.java        |  5 +++++
 .../runtime/operators/testutils/MockEnvironment.java |  5 +++++
 .../flink/streaming/runtime/tasks/StreamTask.java    | 15 ++++++++++++++-
 .../runtime/tasks/StreamMockEnvironment.java         |  5 +++++
 7 files changed, 51 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 121936c..9f779ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -166,6 +166,17 @@ public interface Environment {
 	 */
 	void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state);
 
+	/**
+	 * Marks task execution failed for an external reason (a reason other than the task code itself
+	 * throwing an exception). If the task is already in a terminal state
+	 * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling, this does nothing.
+	 * Otherwise it sets the state to FAILED, and, if the invokable code is running,
+	 * starts an asynchronous thread that aborts that code.
+	 *
+	 * <p>This method never blocks.</p>
+	 */
+	void failExternally(Throwable cause);
+
 	// --------------------------------------------------------------------------------------------
 	//  Fields relevant to the I/O system. Should go into Task
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 1f93a0d..80c5fbc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -78,6 +78,8 @@ public class RuntimeEnvironment implements Environment {
 	private final TaskManagerRuntimeInfo taskManagerInfo;
 	private final TaskMetricGroup metrics;
 
+	private final Task containingTask;
+
 	// ------------------------------------------------------------------------
 
 	public RuntimeEnvironment(
@@ -99,6 +101,7 @@ public class RuntimeEnvironment implements Environment {
 			InputGate[] inputGates,
 			ActorGateway jobManager,
 			TaskManagerRuntimeInfo taskManagerInfo,
+			Task containingTask,
 			TaskMetricGroup metrics) {
 
 		this.jobId = checkNotNull(jobId);
@@ -119,6 +122,7 @@ public class RuntimeEnvironment implements Environment {
 		this.inputGates = checkNotNull(inputGates);
 		this.jobManager = checkNotNull(jobManager);
 		this.taskManagerInfo = checkNotNull(taskManagerInfo);
+		this.containingTask = containingTask;
 		this.metrics = metrics;
 	}
 
@@ -262,4 +266,9 @@ public class RuntimeEnvironment implements Environment {
 
 		jobManager.tell(message);
 	}
+
+	@Override
+	public void failExternally(Throwable cause) {
+		this.containingTask.failExternally(cause);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1f766e1..c1cbaa6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -526,7 +526,7 @@ public class Task implements Runnable {
 					userCodeClassLoader, memoryManager, ioManager,
 					broadcastVariableManager, accumulatorRegistry,
 					splitProvider, distributedCacheEntries,
-					writers, inputGates, jobManager, taskManagerConfig, metrics);
+					writers, inputGates, jobManager, taskManagerConfig, this, metrics);
 
 			// let the task code create its readers and writers
 			invokable.setEnvironment(env);
@@ -703,7 +703,7 @@ public class Task implements Runnable {
 				LOG.error(message, t);
 				notifyFatalError(message, t);
 			}
-			
+
 			// un-register the metrics at the end so that the task may already be
 			// counted as finished when this happens
 			// errors here will only be logged

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 78fb422..063e295 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -140,6 +140,11 @@ public class DummyEnvironment implements Environment {
 	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {}
 
 	@Override
+	public void failExternally(Throwable cause) {
+		throw new UnsupportedOperationException("DummyEnvironment does not support external task failure.");
+	}
+
+	@Override
 	public ResultPartitionWriter getWriter(int index) {
 		return null;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 0220149..78e4cce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -289,4 +289,9 @@ public class MockEnvironment implements Environment {
 	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
 		throw new UnsupportedOperationException();
 	}
+
+	@Override
+	public void failExternally(Throwable cause) {
+		throw new UnsupportedOperationException("MockEnvironment does not support external task failure.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 51904b3..a771c85 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -295,7 +295,20 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			}
 		}
 	}
-	
+
+	/**
+	 * Marks task execution failed for an external reason (a reason other than the task code itself
+	 * throwing an exception). If the task is already in a terminal state
+	 * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling, this does nothing.
+	 * Otherwise it sets the state to FAILED, and, if the invokable code is running,
+	 * starts an asynchronous thread that aborts that code.
+	 *
+	 * <p>This method never blocks.</p>
+	 */
+	public void failExternally(Throwable cause) {
+		getEnvironment().failExternally(cause);
+	}
+
 	@Override
 	public final void cancel() throws Exception {
 		isRunning = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/bc19486c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index f8c36de..a8dd49b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -302,6 +302,11 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
+	public void failExternally(Throwable cause) {
+		throw new UnsupportedOperationException("StreamMockEnvironment does not support external task failure.");
+	}
+
+	@Override
 	public TaskManagerRuntimeInfo getTaskManagerInfo() {
 		return new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir"));
 	}


[2/4] flink git commit: [FLINK-2314] Make Streaming File Sources Persistent

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ae4758f..1cd052c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -51,14 +51,18 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
 import org.apache.flink.streaming.api.functions.source.FileReadFunction;
-import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.InputFormatSource;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
@@ -875,24 +879,34 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Creates a data stream that represents the Strings produced by reading the given file line wise. The file will be
-	 * read with the system's default character set.
+	 * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such
+	 * line. The file will be read with the system's default character set.
+	 *
+	 * <p>
+	 * <b> NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
+	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
+	 * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
 	 *
 	 * @param filePath
 	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
 	 * @return The data stream that represents the data read from the given file as text lines
 	 */
 	public DataStreamSource<String> readTextFile(String filePath) {
-		Preconditions.checkNotNull(filePath, "The file path may not be null.");
-		TextInputFormat format = new TextInputFormat(new Path(filePath));
-		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
-
-		return createInput(format, typeInfo, "Read Text File Source");
+		return readTextFile(filePath, "UTF-8");
 	}
 
 	/**
-	 * Creates a data stream that represents the Strings produced by reading the given file line wise. The {@link
-	 * java.nio.charset.Charset} with the given name will be used to read the files.
+	 * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such
+	 * line. The {@link java.nio.charset.Charset} with the given name will be used to read the files.
+	 *
+	 * <p>
+	 * <b> NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
+	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
+	 * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
 	 *
 	 * @param filePath
 	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
@@ -902,15 +916,32 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
 		Preconditions.checkNotNull(filePath, "The file path may not be null.");
+
 		TextInputFormat format = new TextInputFormat(new Path(filePath));
 		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
 		format.setCharsetName(charsetName);
 
-		return createInput(format, typeInfo, "Read Text File Source");
+		return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1,
+			FilePathFilter.createDefaultFilter(), typeInfo);
 	}
 
 	/**
-	 * Reads the given file with the given imput format.
+	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
+	 *
+	 * <p>
+	 * Since all data streams need specific information about their types, this method needs to determine the
+	 * type of the data produced by the input format. It will attempt to determine the data type by reflection,
+	 * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+	 * In the latter case, this method will invoke the
+	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data
+	 * type produced by the input format.
+	 *
+	 * <p>
+	 * <b> NOTES ON CHECKPOINTING: </b> The source monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed,
+	 * forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint
+	 * barriers are going to be forwarded after the source exits, thus having no checkpoints after that point.
 	 *
 	 * @param filePath
 	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
@@ -920,19 +951,64 @@ public abstract class StreamExecutionEnvironment {
 	 * 		The type of the returned data stream
 	 * @return The data stream that represents the data read from the given file
 	 */
-	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath) {
-		Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
-		Preconditions.checkNotNull(filePath, "The file path must not be null.");
+	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
+												String filePath) {
+		return readFile(inputFormat, filePath, FileProcessingMode.PROCESS_ONCE, -1, FilePathFilter.createDefaultFilter());
+	}
 
-		inputFormat.setFilePath(new Path(filePath));
+	/**
+	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}. Depending
+	 * on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms) the path
+	 * for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), or process once the data currently in the path and
+	 * exit ({@link FileProcessingMode#PROCESS_ONCE}). In addition, if the path contains files not to be processed, the user
+	 * can specify a custom {@link FilePathFilter}. As a default implementation you can use
+	 * {@link FilePathFilter#createDefaultFilter()}.
+	 *
+	 * <p>
+	 * Since all data streams need specific information about their types, this method needs to determine the
+	 * type of the data produced by the input format. It will attempt to determine the data type by reflection,
+	 * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+	 * In the latter case, this method will invoke the
+	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data
+	 * type produced by the input format.
+	 *
+	 * <p>
+	 * <b> NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE},
+	 * the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
+	 * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
+	 * are going to be forwarded after the source exits, thus having no checkpoints after that point.
+	 *
+	 * @param inputFormat
+	 * 		The input format used to create the data stream
+	 * @param filePath
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+	 * @param watchType
+	 * 		The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
+	 * @param interval
+	 * 		In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
+	 * @param filter
+	 * 		The files to be excluded from the processing
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return The data stream that represents the data read from the given file
+	 */
+	@PublicEvolving
+	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
+												String filePath,
+												FileProcessingMode watchType,
+												long interval,
+												FilePathFilter filter) {
+
+		TypeInformation<OUT> typeInformation;
 		try {
-			return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), "Read File source");
+			typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
 		} catch (Exception e) {
-			throw new InvalidProgramException("The type returned by the input format could not be automatically " +
-					"determined. " +
-					"Please specify the TypeInformation of the produced type explicitly by using the " +
-					"'createInput(InputFormat, TypeInformation)' method instead.");
+			throw new InvalidProgramException("The type returned by the input format could not be " +
+				"automatically determined. Please specify the TypeInformation of the produced type " +
+				"explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
 		}
+		return readFile(inputFormat, filePath, watchType, interval, filter, typeInformation);
 	}
 
 	/**
@@ -952,15 +1028,62 @@ public abstract class StreamExecutionEnvironment {
 	 * 		of files.
 	 * @return The DataStream containing the given directory.
 	 */
+	@Deprecated
 	public DataStream<String> readFileStream(String filePath, long intervalMillis,
-											WatchType watchType) {
+											FileMonitoringFunction.WatchType watchType) {
 		DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
-				filePath, intervalMillis, watchType), "Read File Stream source");
+			filePath, intervalMillis, watchType), "Read File Stream source");
 
 		return source.flatMap(new FileReadFunction());
 	}
 
 	/**
+	 * Reads the contents of the user-specified {@code filePath} based on the given {@link FileInputFormat}.
+	 * Depending on the provided {@link FileProcessingMode}, the source may periodically monitor (every {@code interval} ms)
+	 * the path for new data ({@link FileProcessingMode#PROCESS_CONTINUOUSLY}), or process once the data currently in the
+	 * path and exit ({@link FileProcessingMode#PROCESS_ONCE}). In addition, if the path contains files not to be processed,
+	 * the user can specify a custom {@link FilePathFilter}. As a default implementation you can use
+	 * {@link FilePathFilter#createDefaultFilter()}.
+	 *
+	 * <p>
+	 *  <b> NOTES ON CHECKPOINTING: </b> If the {@code watchType} is set to {@link FileProcessingMode#PROCESS_ONCE},
+	 * the source monitors the path <b>once</b>, creates the {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits}
+	 * to be processed, forwards them to the downstream {@link ContinuousFileReaderOperator readers} to read the actual data,
+	 * and exits, without waiting for the readers to finish reading. This implies that no more checkpoint barriers
+	 * are going to be forwarded after the source exits, thus having no checkpoints after that point.
+	 *
+	 * @param inputFormat
+	 * 		The input format used to create the data stream
+	 * @param filePath
+	 * 		The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path")
+	 * @param watchType
+	 * 		The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit
+	 * @param filter
+	 * 		The files to be excluded from the processing
+	 * @param typeInformation
+	 * 		Information on the type of the elements in the output stream
+	 * @param interval
+	 * 		In the case of periodic path monitoring, this specifies the interval (in millis) between consecutive path scans
+	 * @param <OUT>
+	 * 		The type of the returned data stream
+	 * @return The data stream that represents the data read from the given file
+	 */
+	@PublicEvolving
+	public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat,
+												String filePath,
+												FileProcessingMode watchType,
+												long interval,
+												FilePathFilter filter,
+												TypeInformation<OUT> typeInformation) {
+
+		Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
+		Preconditions.checkNotNull(filePath, "The file path must not be null.");
+
+		inputFormat.setFilePath(filePath);
+		return createFileInput(inputFormat, typeInformation, "Custom File Source", watchType, filter, interval);
+	}
+
+	/**
 	 * Creates a new data stream that contains the strings received infinitely from a socket. Received strings are
 	 * decoded by the system's default character set. On the termination of the socket server connection retries can be
 	 * initiated.
@@ -1026,12 +1149,20 @@ public abstract class StreamExecutionEnvironment {
 	/**
 	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
 	 * <p>
-	 * Since all data streams need specific information about their types, this method needs to determine the type of
-	 * the data produced by the input format. It will attempt to determine the data type by reflection, unless the
-	 * input
-	 * format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface. In the latter
-	 * case, this method will invoke the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()}
-	 * method to determine data type produced by the input format.
+	 * Since all data streams need specific information about their types, this method needs to determine the
+	 * type of the data produced by the input format. It will attempt to determine the data type by reflection,
+	 * unless the input format implements the {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
+	 * In the latter case, this method will invoke the
+	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable#getProducedType()} method to determine data
+	 * type produced by the input format.
+	 *
+	 * <p>
+	 * <b> NOTES ON CHECKPOINTING: </b> In the case of a {@link FileInputFormat}, the source
+	 * (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
+	 * them to the downstream {@link ContinuousFileReaderOperator} to read the actual data, and exits,
+	 * without waiting for the readers to finish reading. This implies that no more checkpoint
+	 * barriers are going to be forwarded after the source exits, thus having no checkpoints.
 	 *
 	 * @param inputFormat
 	 * 		The input format used to create the data stream
@@ -1041,35 +1172,84 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	@PublicEvolving
 	public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat) {
-		return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat), "Custom File source");
+		return createInput(inputFormat, TypeExtractor.getInputFormatTypes(inputFormat));
 	}
 
 	/**
 	 * Generic method to create an input data stream with {@link org.apache.flink.api.common.io.InputFormat}.
 	 * <p>
-	 * The data stream is typed to the given TypeInformation. This method is intended for input formats where the
-	 * return
-	 * type cannot be determined by reflection analysis, and that do not implement the
+	 * The data stream is typed to the given TypeInformation. This method is intended for input formats
+	 * where the return type cannot be determined by reflection analysis, and that do not implement the
 	 * {@link org.apache.flink.api.java.typeutils.ResultTypeQueryable} interface.
 	 *
+	 * <p>
+	 * <b> NOTES ON CHECKPOINTING: </b> In the case of a {@link FileInputFormat}, the source
+	 * (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path, creates the
+	 * {@link org.apache.flink.core.fs.FileInputSplit FileInputSplits} to be processed, forwards
+	 * them to the downstream {@link ContinuousFileReaderOperator} to read the actual data, and exits,
+	 * without waiting for the readers to finish reading. This implies that no more checkpoint
+	 * barriers are going to be forwarded after the source exits, thus having no checkpoints.
+	 *
 	 * @param inputFormat
 	 * 		The input format used to create the data stream
+	 * @param typeInfo
+	 * 		The information about the type of the output type
 	 * @param <OUT>
 	 * 		The type of the returned data stream
 	 * @return The data stream that represents the data created by the input format
 	 */
 	@PublicEvolving
 	public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo) {
-		return createInput(inputFormat, typeInfo, "Custom File source");
+		DataStreamSource<OUT> source;
+
+		if (inputFormat instanceof FileInputFormat) {
+			FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat;
+			source = createFileInput(format, typeInfo, "Custom File source",
+				FileProcessingMode.PROCESS_ONCE,
+				FilePathFilter.createDefaultFilter(),  -1);
+		} else {
+			source = createInput(inputFormat, typeInfo, "Custom Source");
+		}
+		return source;
 	}
 
-	// private helper for passing different names
 	private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat,
-			TypeInformation<OUT> typeInfo, String sourceName) {
-		FileSourceFunction<OUT> function = new FileSourceFunction<>(inputFormat, typeInfo);
+													TypeInformation<OUT> typeInfo,
+													String sourceName) {
+
+		InputFormatSource<OUT> function = new InputFormatSource<>(inputFormat, typeInfo);
 		return addSource(function, sourceName, typeInfo);
 	}
 
+	private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT> inputFormat,
+														TypeInformation<OUT> typeInfo,
+														String sourceName,
+														FileProcessingMode watchType,
+														FilePathFilter pathFilter,
+														long interval) {
+
+		Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
+		Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
+		Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
+		Preconditions.checkNotNull(watchType, "Unspecified watchtype.");
+		Preconditions.checkNotNull(pathFilter, "Unspecified path name filtering function.");
+
+		Preconditions.checkArgument(watchType.equals(FileProcessingMode.PROCESS_ONCE) ||
+			interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
+			"The path monitoring interval cannot be less than 100 ms.");
+
+		ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>(
+			inputFormat, inputFormat.getFilePath().toString(),
+			pathFilter, watchType, getParallelism(), interval);
+
+		ContinuousFileReaderOperator<OUT, ?> reader = new ContinuousFileReaderOperator<>(inputFormat);
+
+		SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
+			.transform("FileSplitReader_" + sourceName, typeInfo, reader);
+
+		return new DataStreamSource<>(source);
+	}
+
 	/**
 	 * Adds a Data Source to the streaming topology.
 	 *
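
For illustration, a minimal sketch of how the new file-reading path can be driven from the Java API. The readFile(...) overload used here is the one the Scala wrapper further below delegates to (input format, path, processing mode, scan interval, path filter, type information); the directory path and job name are placeholders:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FilePathFilter;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousTextFileJob {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// the directory to monitor (placeholder path)
		TextInputFormat format = new TextInputFormat(new Path("file:///tmp/input"));

		// PROCESS_CONTINUOUSLY re-scans the path every second;
		// PROCESS_ONCE would forward the current content and then stop monitoring.
		DataStream<String> lines = env.readFile(
			format,
			"file:///tmp/input",
			FileProcessingMode.PROCESS_CONTINUOUSLY,
			1000L,
			FilePathFilter.createDefaultFilter(),
			BasicTypeInfo.STRING_TYPE_INFO);

		lines.print();
		env.execute("Continuous file source example");
	}
}
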

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
new file mode 100644
index 0000000..b97c274
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the single (non-parallel) task which takes a {@link FileInputFormat} and is responsible for
+ * i) monitoring a user-provided path, ii) deciding which files should be further read and processed,
+ * iii) creating the {@link FileInputSplit FileInputSplits} corresponding to those files, and iv) assigning
+ * them to downstream tasks for further reading and processing. Which splits will be further processed
+ * depends on the user-provided {@link FileProcessingMode} and the {@link FilePathFilter}.
+ * The splits of the files to be read are then forwarded to the downstream
+ * {@link ContinuousFileReaderOperator} which can have parallelism greater than one.
+ */
+@Internal
+public class ContinuousFileMonitoringFunction<OUT>
+	extends RichSourceFunction<FileInputSplit> implements Checkpointed<Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);
+
+	/**
+	 * The minimum interval allowed between consecutive path scans. This is applicable if the
+	 * {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}.
+	 */
+	public static final long MIN_MONITORING_INTERVAL = 100L;
+
+	/** The path to monitor. */
+	private final String path;
+
+	/** The default parallelism for the job, as this is going to be the parallelism of the downstream readers. */
+	private final int readerParallelism;
+
+	/** The {@link FileInputFormat} to be read. */
+	private FileInputFormat<OUT> format;
+
+	/** How often to monitor the state of the directory for new data. */
+	private final long interval;
+
+	/** Which new data to process (see {@link FileProcessingMode}). */
+	private final FileProcessingMode watchType;
+
+	private List<Tuple2<Long, List<FileInputSplit>>> splitsToFwdOrderedAscByModTime;
+
+	private Tuple2<Long, List<FileInputSplit>> currentSplitsToFwd;
+
+	private long globalModificationTime;
+
+	private FilePathFilter pathFilter;
+
+	private volatile boolean isRunning = true;
+
+	public ContinuousFileMonitoringFunction(
+		FileInputFormat<OUT> format, String path,
+		FilePathFilter filter, FileProcessingMode watchType,
+		int readerParallelism, long interval) {
+
+		if (watchType != FileProcessingMode.PROCESS_ONCE && interval < MIN_MONITORING_INTERVAL) {
+			throw new IllegalArgumentException("The specified monitoring interval (" + interval + " ms) is " +
+				"smaller than the minimum allowed one (100 ms).");
+		}
+		this.format = Preconditions.checkNotNull(format, "Unspecified File Input Format.");
+		this.path = Preconditions.checkNotNull(path, "Unspecified Path.");
+		this.pathFilter = Preconditions.checkNotNull(filter, "Unspecified File Path Filter.");
+
+		this.interval = interval;
+		this.watchType = watchType;
+		this.readerParallelism = Math.max(readerParallelism, 1);
+		this.globalModificationTime = Long.MIN_VALUE;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void open(Configuration parameters) throws Exception {
+		LOG.info("Opening File Monitoring Source.");
+		
+		super.open(parameters);
+		format.configure(parameters);
+	}
+
+	@Override
+	public void run(SourceFunction.SourceContext<FileInputSplit> context) throws Exception {
+		FileSystem fileSystem = FileSystem.get(new URI(path));
+
+		switch (watchType) {
+			case PROCESS_CONTINUOUSLY:
+				while (isRunning) {
+					monitorDirAndForwardSplits(fileSystem, context);
+					Thread.sleep(interval);
+				}
+				isRunning = false;
+				break;
+			case PROCESS_ONCE:
+				monitorDirAndForwardSplits(fileSystem, context);
+				isRunning = false;
+				break;
+			default:
+				isRunning = false;
+				throw new RuntimeException("Unknown WatchType: " + watchType);
+		}
+	}
+
+	private void monitorDirAndForwardSplits(FileSystem fs, SourceContext<FileInputSplit> context) throws IOException, JobException {
+		final Object lock = context.getCheckpointLock();
+
+		// it may be non-null in the case of a recovery after a failure.
+		if (currentSplitsToFwd != null) {
+			synchronized (lock) {
+				forwardSplits(currentSplitsToFwd, context);
+			}
+		}
+		currentSplitsToFwd = null;
+
+		// it may be non-null in the case of a recovery after a failure, in which case the restored splits are reused.
+		if (splitsToFwdOrderedAscByModTime == null) {
+			splitsToFwdOrderedAscByModTime = getInputSplitSortedOnModTime(fs);
+		}
+
+		Iterator<Tuple2<Long, List<FileInputSplit>>> it =
+			splitsToFwdOrderedAscByModTime.iterator();
+
+		while (it.hasNext()) {
+			synchronized (lock) {
+				currentSplitsToFwd = it.next();
+				it.remove();
+				forwardSplits(currentSplitsToFwd, context);
+			}
+		}
+
+		// set them to null to distinguish from a restore.
+		splitsToFwdOrderedAscByModTime = null;
+		currentSplitsToFwd = null;
+	}
+
+	private void forwardSplits(Tuple2<Long, List<FileInputSplit>> splitsToFwd, SourceContext<FileInputSplit> context) {
+		currentSplitsToFwd = splitsToFwd;
+		Long modTime = currentSplitsToFwd.f0;
+		List<FileInputSplit> splits = currentSplitsToFwd.f1;
+
+		Iterator<FileInputSplit> it = splits.iterator();
+		while (it.hasNext()) {
+			FileInputSplit split = it.next();
+			processSplit(split, context);
+			it.remove();
+		}
+
+		// update the global modification time
+		if (modTime >= globalModificationTime) {
+			globalModificationTime = modTime;
+		}
+	}
+
+	private void processSplit(FileInputSplit split, SourceContext<FileInputSplit> context) {
+		LOG.info("Forwarding split: " + split);
+		context.collect(split);
+	}
+
+	private List<Tuple2<Long, List<FileInputSplit>>> getInputSplitSortedOnModTime(FileSystem fileSystem) throws IOException {
+		List<FileStatus> eligibleFiles = listEligibleFiles(fileSystem);
+		if (eligibleFiles.isEmpty()) {
+			return new ArrayList<>();
+		}
+
+		Map<Long, List<FileInputSplit>> splitsToForward = getInputSplits(eligibleFiles);
+		List<Tuple2<Long, List<FileInputSplit>>> sortedSplitsToForward = new ArrayList<>();
+
+		for (Map.Entry<Long, List<FileInputSplit>> entry : splitsToForward.entrySet()) {
+			sortedSplitsToForward.add(new Tuple2<>(entry.getKey(), entry.getValue()));
+		}
+
+		Collections.sort(sortedSplitsToForward, new Comparator<Tuple2<Long, List<FileInputSplit>>>() {
+			@Override
+			public int compare(Tuple2<Long, List<FileInputSplit>> o1, Tuple2<Long, List<FileInputSplit>> o2) {
+				return Long.compare(o1.f0, o2.f0);
+			}
+		});
+
+		return sortedSplitsToForward;
+	}
+
+	/**
+	 * Creates the input splits for the path to be forwarded to the downstream tasks of the
+	 * {@link ContinuousFileReaderOperator}. Those tasks are going to read their contents for further
+	 * processing. Splits belonging to files in the {@code eligibleFiles} list are the ones
+	 * that are shipped for further processing.
+	 * @param eligibleFiles The files to process.
+	 */
+	private Map<Long, List<FileInputSplit>> getInputSplits(List<FileStatus> eligibleFiles) throws IOException {
+		if (eligibleFiles.isEmpty()) {
+			return new HashMap<>();
+		}
+
+		FileInputSplit[] inputSplits = format.createInputSplits(readerParallelism);
+
+		Map<Long, List<FileInputSplit>> splitsPerFile = new HashMap<>();
+		for (FileInputSplit split: inputSplits) {
+			for (FileStatus file: eligibleFiles) {
+				if (file.getPath().equals(split.getPath())) {
+					Long modTime = file.getModificationTime();
+
+					List<FileInputSplit> splitsToForward = splitsPerFile.get(modTime);
+					if (splitsToForward == null) {
+						splitsToForward = new LinkedList<>();
+						splitsPerFile.put(modTime, splitsToForward);
+					}
+					splitsToForward.add(split);
+					break;
+				}
+			}
+		}
+		return splitsPerFile;
+	}
+
+	/**
+	 * Returns the files that have data to be processed. This method returns the
+	 * {@link FileStatus} of the aforementioned files. It is up to the {@link #processSplit(FileInputSplit, SourceContext)}
+	 * method to decide which parts of the files are to be processed, and to forward them downstream.
+	 */
+	private List<FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
+		List<FileStatus> files = new ArrayList<>();
+
+		FileStatus[] statuses = fileSystem.listStatus(new Path(path));
+		if (statuses == null) {
+			LOG.warn("Path does not exist: {}", path);
+		} else {
+			// handle the new files
+			for (FileStatus status : statuses) {
+				Path filePath = status.getPath();
+				long modificationTime = status.getModificationTime();
+				if (!shouldIgnore(filePath, modificationTime)) {
+					files.add(status);
+				}
+			}
+		}
+		return files;
+	}
+
+	/**
+	 * Returns {@code true} if the file is NOT to be processed further.
+	 * This happens in the following cases:
+	 *
+	 * If the user-specified path filtering method returns {@code true} for the file,
+	 * or if the modification time of the file is not larger than the {@link #globalModificationTime}, which
+	 * is the time of the most recent modification found in any of the already processed files.
+	 */
+	private boolean shouldIgnore(Path filePath, long modificationTime) {
+		boolean shouldIgnore = ((pathFilter != null && pathFilter.filterPath(filePath)) || modificationTime <= globalModificationTime);
+		if (shouldIgnore) {
+			LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime + " and global mod time= " + globalModificationTime);
+		}
+		return  shouldIgnore;
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		isRunning = false;
+		LOG.info("Closed File Monitoring Source.");
+	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
+
+	//	---------------------			Checkpointing			--------------------------
+
+	@Override
+	public Tuple3<List<Tuple2<Long, List<FileInputSplit>>>, Tuple2<Long, List<FileInputSplit>>, Long> snapshotState(
+		long checkpointId, long checkpointTimestamp) throws Exception {
+
+		if (!isRunning) {
+			LOG.debug("snapshotState() called on closed source");
+			return null;
+		}
+		return new Tuple3<>(splitsToFwdOrderedAscByModTime,
+			currentSplitsToFwd, globalModificationTime);
+	}
+
+	@Override
+	public void restoreState(Tuple3<List<Tuple2<Long, List<FileInputSplit>>>,
+		Tuple2<Long, List<FileInputSplit>>, Long> state) throws Exception {
+
+		this.splitsToFwdOrderedAscByModTime = state.f0;
+		this.currentSplitsToFwd = state.f1;
+		this.globalModificationTime = state.f2;
+	}
+}
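
The monitoring function is @Internal and is normally wired up by the createFileInput(...) helper shown above. Spelled out by hand, and assuming a StreamExecutionEnvironment env is in scope (paths and names are placeholders), the same wiring looks roughly like this:

	TextInputFormat format = new TextInputFormat(new Path("file:///tmp/input"));

	// runs with parallelism 1: a single task monitors the directory and creates the splits
	ContinuousFileMonitoringFunction<String> monitor =
		new ContinuousFileMonitoringFunction<>(
			format, "file:///tmp/input",
			FilePathFilter.createDefaultFilter(),
			FileProcessingMode.PROCESS_CONTINUOUSLY,
			env.getParallelism(),   // parallelism of the downstream readers
			1000L);                 // scan the directory every second

	// the reader operator runs with the job parallelism and does the actual reading
	DataStream<String> lines = env
		.addSource(monitor)
		.transform("FileSplitReader", BasicTypeInfo.STRING_TYPE_INFO,
			new ContinuousFileReaderOperator<>(format));
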

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
new file mode 100644
index 0000000..4d4a792
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This is the operator that reads the {@link FileInputSplit FileInputSplits} received from
+ * the preceding {@link ContinuousFileMonitoringFunction}. This operator receives only the split descriptors
+ * and then reads and emits the records they contain. Doing the reading in the task thread could delay the
+ * checkpoint barriers and increase backpressure, so a separate thread ({@link SplitReader}) reads the splits
+ * and emits the elements, while the task thread keeps forwarding the checkpoint barriers. The two threads
+ * synchronize on the {@link StreamTask#getCheckpointLock()} so that the checkpoints reflect the current state.
+ */
+@Internal
+public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
+	implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
+
+	private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null);
+
+	private transient SplitReader<S, OUT> reader;
+	private transient TimestampedCollector<OUT> collector;
+
+	private FileInputFormat<OUT> format;
+	private TypeSerializer<OUT> serializer;
+
+	private Object checkpointLock;
+
+	private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
+
+	public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
+		this.format = checkNotNull(format);
+	}
+
+	@Override
+	public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
+		this.serializer = outTypeInfo.createSerializer(executionConfig);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+
+		if (this.serializer == null) {
+			throw new IllegalStateException("The serializer has not been set. " +
+				"Probably the setOutputType() was not called and this should not have happened. " +
+				"Please report it.");
+		}
+
+		this.format.configure(new Configuration());
+		this.collector = new TimestampedCollector<>(output);
+		this.checkpointLock = getContainingTask().getCheckpointLock();
+
+		this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
+		this.reader.start();
+	}
+
+	@Override
+	public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
+		reader.addSplit(element.getValue());
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
+
+	@Override
+	public void dispose() {
+		super.dispose();
+
+		// first try to cancel it properly and
+		// give it some time until it finishes
+		reader.cancel();
+		try {
+			reader.join(200);
+		} catch (InterruptedException e) {
+			// we can ignore this
+		}
+
+		// if the above did not work, then interrupt the thread repeatedly
+		while (reader.isAlive()) {
+
+			StringBuilder bld = new StringBuilder();
+			StackTraceElement[] stack = reader.getStackTrace();
+			for (StackTraceElement e : stack) {
+				bld.append(e).append('\n');
+			}
+			LOG.warn("The reader is stuck in method:\n {}", bld.toString());
+
+			reader.interrupt();
+			try {
+				reader.join(50);
+			} catch (InterruptedException e) {
+				// we can ignore this
+			}
+		}
+		reader = null;
+		collector = null;
+		format = null;
+		serializer = null;
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+
+		// signal that no more splits will come, wait for the reader to finish
+		// and close the collector. Further cleaning up is handled by the dispose().
+
+		if (reader != null && reader.isAlive() && reader.isRunning()) {
+			// add a dummy element to signal that no more splits will
+			// arrive and wait until the reader finishes
+			reader.addSplit(EOS);
+
+			// we already have the checkpoint lock because close() is
+			// called by the StreamTask while having it.
+			checkpointLock.wait();
+		}
+		collector.close();
+	}
+
+	private class SplitReader<S extends Serializable, OT> extends Thread {
+
+		private volatile boolean isRunning;
+
+		private final FileInputFormat<OT> format;
+		private final TypeSerializer<OT> serializer;
+
+		private final Object checkpointLock;
+		private final TimestampedCollector<OT> collector;
+
+		private final Queue<FileInputSplit> pendingSplits;
+
+		private FileInputSplit currentSplit = null;
+
+		private S restoredFormatState = null;
+
+		SplitReader(FileInputFormat<OT> format,
+					TypeSerializer<OT> serializer,
+					TimestampedCollector<OT> collector,
+					Object checkpointLock,
+					Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
+
+			this.format = checkNotNull(format, "Unspecified FileInputFormat.");
+			this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
+
+			this.pendingSplits = new LinkedList<>();
+			this.collector = collector;
+			this.checkpointLock = checkpointLock;
+			this.isRunning = true;
+
+			// this is the case where a task recovers from a previous failed attempt
+			if (restoredState != null) {
+				List<FileInputSplit> pending = restoredState.f0;
+				FileInputSplit current = restoredState.f1;
+				S formatState = restoredState.f2;
+
+				for (FileInputSplit split : pending) {
+					pendingSplits.add(split);
+				}
+
+				this.currentSplit = current;
+				this.restoredFormatState = formatState;
+			}
+			ContinuousFileReaderOperator.this.readerState = null;
+		}
+
+		void addSplit(FileInputSplit split) {
+			Preconditions.checkNotNull(split);
+			synchronized (checkpointLock) {
+				this.pendingSplits.add(split);
+			}
+		}
+
+		public boolean isRunning() {
+			return this.isRunning;
+		}
+
+		@Override
+		public void run() {
+			try {
+				while (this.isRunning) {
+
+					synchronized (checkpointLock) {
+						if (this.currentSplit != null) {
+
+							if (currentSplit.equals(EOS)) {
+								isRunning = false;
+								break;
+							}
+
+							if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) {
+								((CheckpointableInputFormat) format).reopen(currentSplit, restoredFormatState);
+							} else {
+								// this is the case of a non-checkpointable input format that will reprocess the last split.
+								LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
+								format.open(currentSplit);
+							}
+							// reset the restored state to null for the next iteration
+							this.restoredFormatState = null;
+						} else {
+
+							// get the next split to read.
+							currentSplit = this.pendingSplits.poll();
+
+							if (currentSplit == null) {
+								checkpointLock.wait(50);
+								continue;
+							}
+
+							if (currentSplit.equals(EOS)) {
+								isRunning = false;
+								break;
+							}
+							this.format.open(currentSplit);
+						}
+					}
+
+					LOG.info("Reading split: " + currentSplit);
+
+					try {
+						OT nextElement = serializer.createInstance();
+						while (!format.reachedEnd()) {
+							synchronized (checkpointLock) {
+								nextElement = format.nextRecord(nextElement);
+								if (nextElement != null) {
+									collector.collect(nextElement);
+								} else {
+									break;
+								}
+							}
+						}
+
+					} finally {
+						// close and prepare for the next iteration
+						this.format.close();
+						this.currentSplit = null;
+					}
+				}
+
+			} catch (Throwable e) {
+				if (isRunning) {
+					LOG.error("Caught exception processing split: " + currentSplit, e);
+				}
+				getContainingTask().failExternally(e);
+			} finally {
+				synchronized (checkpointLock) {
+					LOG.info("Reader terminated, and exiting...");
+					checkpointLock.notifyAll();
+				}
+			}
+		}
+
+		Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
+			List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
+			for (FileInputSplit split: this.pendingSplits) {
+				snapshot.add(split);
+			}
+
+			// remove the current split from the list if inside.
+			if (this.currentSplit != null && this.currentSplit.equals(pendingSplits.peek())) {
+				this.pendingSplits.remove();
+			}
+
+			if (this.format instanceof CheckpointableInputFormat) {
+				S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
+				return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
+			} else {
+				LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
+				return new Tuple3<>(snapshot, currentSplit, null);
+			}
+		}
+
+		public void cancel() {
+			this.isRunning = false;
+		}
+	}
+
+	//	---------------------			Checkpointing			--------------------------
+
+	@Override
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+		final AbstractStateBackend.CheckpointStateOutputStream os =
+			this.getStateBackend().createCheckpointStateOutputStream(checkpointId, timestamp);
+
+		final ObjectOutputStream oos = new ObjectOutputStream(os);
+		final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
+
+		Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState = this.reader.getReaderState();
+		List<FileInputSplit> pendingSplits = readerState.f0;
+		FileInputSplit currSplit = readerState.f1;
+		S formatState = readerState.f2;
+
+		// write the current split
+		oos.writeObject(currSplit);
+
+		// write the pending ones
+		ov.writeInt(pendingSplits.size());
+		for (FileInputSplit split : pendingSplits) {
+			oos.writeObject(split);
+		}
+
+		// write the state of the reading channel
+		oos.writeObject(formatState);
+		taskState.setOperatorState(os.closeAndGetHandle());
+		return taskState;
+	}
+
+	@Override
+	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
+		super.restoreState(state, recoveryTimestamp);
+
+		StreamStateHandle stream = (StreamStateHandle) state.getOperatorState();
+
+		final InputStream is = stream.getState(getUserCodeClassloader());
+		final ObjectInputStream ois = new ObjectInputStream(is);
+		final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is);
+
+		// read the split that was being read
+		FileInputSplit currSplit = (FileInputSplit) ois.readObject();
+
+		// read the pending splits list
+		List<FileInputSplit> pendingSplits = new LinkedList<>();
+		int noOfSplits = div.readInt();
+		for (int i = 0; i < noOfSplits; i++) {
+			FileInputSplit split = (FileInputSplit) ois.readObject();
+			pendingSplits.add(split);
+		}
+
+		// read the state of the format
+		S formatState = (S) ois.readObject();
+
+		// set the whole reader state for the open() to find.
+		this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState);
+		div.close();
+	}
+}
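
Whether a split can be resumed mid-way after a failure depends on the input format implementing CheckpointableInputFormat (added by FLINK-3717). Below is a minimal sketch of such a format, assuming the interface is parameterized on the split and state types and exposes the getCurrentState()/reopen(split, state) pair used above; the byte-oriented format and its name are illustrative only:

import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;

import java.io.IOException;

public class OffsetTrackingByteFormat extends FileInputFormat<Byte>
		implements CheckpointableInputFormat<FileInputSplit, Long> {

	// absolute offset of the next byte to read; this is the state that gets checkpointed
	private long offset;

	@Override
	public void open(FileInputSplit split) throws IOException {
		super.open(split);                  // positions this.stream at split.getStart()
		this.offset = split.getStart();
	}

	@Override
	public boolean reachedEnd() throws IOException {
		return offset >= splitStart + splitLength;
	}

	@Override
	public Byte nextRecord(Byte reuse) throws IOException {
		int b = stream.read();
		if (b == -1) {
			return null;
		}
		offset++;
		return (byte) b;
	}

	@Override
	public Long getCurrentState() throws IOException {
		return offset;
	}

	@Override
	public void reopen(FileInputSplit split, Long state) throws IOException {
		open(split);
		stream.seek(state);                 // resume reading from the checkpointed offset
		this.offset = state;
	}
}
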

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
index fc24079..06da8c1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunction.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -32,7 +31,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-@PublicEvolving
+@Deprecated
 public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Long, Long>> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
new file mode 100644
index 0000000..1a359ab
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FilePathFilter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * An interface to be implemented by the user when using the {@link ContinuousFileMonitoringFunction}.
+ * The {@link #filterPath(Path)} method is responsible for deciding if a path is eligible for further
+ * processing or not. This can serve to exclude temporary or partial files that
+ * are still being written.
+ */
+@PublicEvolving
+public abstract class FilePathFilter implements Serializable {
+
+	public static FilePathFilter createDefaultFilter() {
+		return new DefaultFilter();
+	}
+	/**
+	 * Returns {@code true} if the {@code filePath} given is to be
+	 * ignored when processing a directory, e.g.
+	 * <pre>
+	 * {@code
+	 *
+	 * public boolean filterPath(Path filePath) {
+	 *     return filePath.getName().startsWith(".") || filePath.getName().contains("_COPYING_");
+	 * }
+	 * }</pre>
+	 */
+	public abstract boolean filterPath(Path filePath);
+
+	/**
+	 * The default file path filtering method and is used
+	 * The default file path filter. It is used if no other
+	 * filter is provided, and it leaves out files starting with
+	 * "." or "_", as well as files containing "_COPYING_".
+	public static class DefaultFilter extends FilePathFilter {
+
+		DefaultFilter() {}
+
+		@Override
+		public boolean filterPath(Path filePath) {
+			return filePath == null ||
+				filePath.getName().startsWith(".") ||
+				filePath.getName().startsWith("_") ||
+				filePath.getName().contains("_COPYING_");
+		}
+	}
+}
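
As a sketch of a custom filter, the default exclusions can be combined with an application-specific rule, e.g. only letting *.csv files through (the extension is just an example):

	FilePathFilter csvOnly = new FilePathFilter() {
		@Override
		public boolean filterPath(Path filePath) {
			// keep the default exclusions and additionally skip everything that is not a CSV file
			return filePath == null
				|| filePath.getName().startsWith(".")
				|| filePath.getName().startsWith("_")
				|| filePath.getName().contains("_COPYING_")
				|| !filePath.getName().endsWith(".csv");
		}
	};

In a real job a named (and, if nested, static) subclass is preferable, so the filter stays serializable without dragging its enclosing class into the closure.
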

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
new file mode 100644
index 0000000..cdbeb2b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Specifies when the computation of the {@link ContinuousFileMonitoringFunction}
+ * will be triggered.
+ */
+@PublicEvolving
+public enum FileProcessingMode {
+
+	PROCESS_ONCE,				// Processes the current content of a file/path only ONCE, and stops monitoring.
+	PROCESS_CONTINUOUSLY		// Reprocesses the whole file when new data is appended.
+}
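
At the call site the two modes differ only in the chosen constant and in whether the scan interval matters (a sketch, given a format and path as in the earlier example and assuming the readFile(...) overload with explicit type information):

	// read what is in the path right now, then stop monitoring; the interval is not used
	env.readFile(format, path, FileProcessingMode.PROCESS_ONCE, -1,
		FilePathFilter.createDefaultFilter(), BasicTypeInfo.STRING_TYPE_INFO);

	// keep re-scanning the path; the interval must be at least MIN_MONITORING_INTERVAL (100 ms)
	env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 5000,
		FilePathFilter.createDefaultFilter(), BasicTypeInfo.STRING_TYPE_INFO);
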

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
index 0f78826..ac1e834 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
@@ -21,7 +21,6 @@ import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.net.URI;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -29,7 +28,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 
-@PublicEvolving
+@Deprecated
 public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
deleted file mode 100644
index 0dcb9ff..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.source;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-@PublicEvolving
-public class FileSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private TypeInformation<OUT> typeInfo;
-	private transient TypeSerializer<OUT> serializer;
-
-	private InputFormat<OUT, InputSplit> format;
-
-	private transient InputSplitProvider provider;
-	private transient Iterator<InputSplit> splitIterator;
-
-	private volatile boolean isRunning = true;
-
-	@SuppressWarnings("unchecked")
-	public FileSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
-		this.format = (InputFormat<OUT, InputSplit>) format;
-		this.typeInfo = typeInfo;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public void open(Configuration parameters) throws Exception {
-		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
-		this.provider = context.getInputSplitProvider();
-		
-		format.configure(parameters);
-		serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
-
-		splitIterator = getInputSplits();
-		if (splitIterator.hasNext()) {
-			format.open(splitIterator.next());
-		}
-		isRunning = true;
-	}
-
-	@Override
-	public void close() throws Exception {
-		format.close();
-	}
-
-	private Iterator<InputSplit> getInputSplits() {
-
-		return new Iterator<InputSplit>() {
-
-			private InputSplit nextSplit;
-
-			private boolean exhausted;
-
-			@Override
-			public boolean hasNext() {
-				if (exhausted) {
-					return false;
-				}
-
-				if (nextSplit != null) {
-					return true;
-				}
-
-				InputSplit split = provider.getNextInputSplit();
-
-				if (split != null) {
-					this.nextSplit = split;
-					return true;
-				} else {
-					exhausted = true;
-					return false;
-				}
-			}
-
-			@Override
-			public InputSplit next() {
-				if (this.nextSplit == null && !hasNext()) {
-					throw new NoSuchElementException();
-				}
-
-				final InputSplit tmp = this.nextSplit;
-				this.nextSplit = null;
-				return tmp;
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		};
-	}
-
-	@Override
-	public void run(SourceContext<OUT> ctx) throws Exception {
-		while (isRunning) {
-			OUT nextElement = serializer.createInstance();
-			nextElement =  format.nextRecord(nextElement);
-			if (nextElement == null && splitIterator.hasNext()) {
-				format.open(splitIterator.next());
-				continue;
-			} else if (nextElement == null) {
-				break;
-			}
-			ctx.collect(nextElement);
-		}
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-
-
-	/**
-	 * Returns the {@code InputFormat}. This is only needed because we need to set the input
-	 * split assigner on the {@code StreamGraph}.
-	 */
-	public InputFormat<OUT, InputSplit> getFormat() {
-		return format;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java
new file mode 100644
index 0000000..2a84781
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/InputFormatSource.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+@Internal
+public class InputFormatSource<OUT> extends RichParallelSourceFunction<OUT> {
+	private static final long serialVersionUID = 1L;
+
+	private TypeInformation<OUT> typeInfo;
+	private transient TypeSerializer<OUT> serializer;
+
+	private InputFormat<OUT, InputSplit> format;
+
+	private transient InputSplitProvider provider;
+	private transient Iterator<InputSplit> splitIterator;
+
+	private volatile boolean isRunning = true;
+
+	@SuppressWarnings("unchecked")
+	public InputFormatSource(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
+		this.format = (InputFormat<OUT, InputSplit>) format;
+		this.typeInfo = typeInfo;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void open(Configuration parameters) throws Exception {
+		StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+		this.provider = context.getInputSplitProvider();
+		
+		format.configure(parameters);
+		serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
+
+		splitIterator = getInputSplits();
+		if (splitIterator.hasNext()) {
+			format.open(splitIterator.next());
+		}
+		isRunning = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		format.close();
+	}
+
+	private Iterator<InputSplit> getInputSplits() {
+
+		return new Iterator<InputSplit>() {
+
+			private InputSplit nextSplit;
+
+			private boolean exhausted;
+
+			@Override
+			public boolean hasNext() {
+				if (exhausted) {
+					return false;
+				}
+
+				if (nextSplit != null) {
+					return true;
+				}
+
+				InputSplit split = provider.getNextInputSplit();
+
+				if (split != null) {
+					this.nextSplit = split;
+					return true;
+				} else {
+					exhausted = true;
+					return false;
+				}
+			}
+
+			@Override
+			public InputSplit next() {
+				if (this.nextSplit == null && !hasNext()) {
+					throw new NoSuchElementException();
+				}
+
+				final InputSplit tmp = this.nextSplit;
+				this.nextSplit = null;
+				return tmp;
+			}
+
+			@Override
+			public void remove() {
+				throw new UnsupportedOperationException();
+			}
+		};
+	}
+
+	@Override
+	public void run(SourceContext<OUT> ctx) throws Exception {
+		while (isRunning) {
+			OUT nextElement = serializer.createInstance();
+			nextElement =  format.nextRecord(nextElement);
+			if (nextElement == null && splitIterator.hasNext()) {
+				format.open(splitIterator.next());
+				continue;
+			} else if (nextElement == null) {
+				break;
+			}
+			ctx.collect(nextElement);
+		}
+	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
+
+
+	/**
+	 * Returns the {@code InputFormat}. This is only needed because we need to set the input
+	 * split assigner on the {@code StreamGraph}.
+	 */
+	public InputFormat<OUT, InputSplit> getFormat() {
+		return format;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 70c5cff..685655e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -21,7 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
+import org.apache.flink.streaming.api.functions.source.InputFormatSource;
 import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
 import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -425,8 +425,8 @@ public class StreamGraphGenerator {
 				null,
 				source.getOutputType(),
 				"Source: " + source.getName());
-		if (source.getOperator().getUserFunction() instanceof FileSourceFunction) {
-			FileSourceFunction<T> fs = (FileSourceFunction<T>) source.getOperator().getUserFunction();
+		if (source.getOperator().getUserFunction() instanceof InputFormatSource) {
+			InputFormatSource<T> fs = (InputFormatSource<T>) source.getOperator().getUserFunction();
 			streamGraph.setInputFormat(source.getId(), fs.getFormat());
 		}
 		streamGraph.setParallelism(source.getId(), source.getParallelism());

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
index 86677a6..4517eea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OutputTypeConfigurable.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 public interface OutputTypeConfigurable<OUT> {
 
 	/**
-	 * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, StreamOperator, TypeInformation, TypeInformation, String)}
+	 * Is called by the {@link org.apache.flink.streaming.api.graph.StreamGraph#addOperator(Integer, String, StreamOperator, TypeInformation, TypeInformation, String)}
 	 * method when the {@link org.apache.flink.streaming.api.graph.StreamGraph} is generated. The
 	 * method is called with the output {@link TypeInformation} which is also used for the
 	 * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} output serializer.

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 69e920f..9ed715e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -69,7 +69,7 @@ public interface StreamOperator<OUT> extends Serializable {
 
 	 * <p>
 	 * The method is expected to flush all remaining buffered data. Exceptions during this flushing
-	 * of buffered should be propagated, in order to cause the operation to be recognized asa failed,
+	 * of buffered should be propagated, in order to cause the operation to be recognized as failed,
 	 * because the last data items are not processed properly.
 	 * 
 	 * @throws java.lang.Exception An exception in this method causes the operator to fail.

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index c5f983a..6e2e9f9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -141,6 +141,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
 	}
 
+	public Object getCheckpointLock() {
+		return mockTask.getCheckpointLock();
+	}
+
 	public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) {
 		ClosureCleaner.clean(keySelector, false);
 		config.setStatePartitioner(0, keySelector);

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 7be7840..f6dab1e 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -19,8 +19,7 @@
 package org.apache.flink.streaming.api.scala
 
 import com.esotericsoftware.kryo.Serializer
-
-import org.apache.flink.annotation.{Internal, PublicEvolving, Public}
+import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -28,14 +27,12 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.runtime.state.AbstractStateBackend
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
-import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
-import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source._
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
 import org.apache.flink.util.SplittableIterator
 
 import scala.collection.JavaConverters._
-
 import _root_.scala.language.implicitConversions
 
 @Public
@@ -454,20 +451,67 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
         DataStream[T] =
     asScalaStream(javaEnv.readFile(inputFormat, filePath))
 
-
   /**
-   * Creates a DataStream that contains the contents of file created while
-   * system watches the given path. The file will be read with the system's
-   * default character set. The user can check the monitoring interval in milliseconds,
-   * and the way file modifications are handled. By default it checks for only new files
-   * every 100 milliseconds.
-   *
-   */
-  def readFileStream(StreamPath: String, intervalMillis: Long = 100, 
-                     watchType: WatchType = WatchType.ONLY_NEW_FILES): DataStream[String] =
+    * Creates a DataStream that contains the contents of the files created while
+    * the system watches the given path. The files will be read with the system's
+    * default character set. The user can specify the monitoring interval in milliseconds
+    * and the way file modifications are handled. By default it checks only for new
+    * files every 100 milliseconds.
+    *
+    */
+  @Deprecated
+  def readFileStream(StreamPath: String, intervalMillis: Long = 100,
+                     watchType: FileMonitoringFunction.WatchType =
+                     FileMonitoringFunction.WatchType.ONLY_NEW_FILES): DataStream[String] =
     asScalaStream(javaEnv.readFileStream(StreamPath, intervalMillis, watchType))
 
   /**
+    * Reads the contents of the user-specified path based on the given [[FileInputFormat]].
+    * Depending on the provided [[FileProcessingMode]], the source
+    * may periodically monitor (every `interval` ms) the path for new data
+    * ([[FileProcessingMode.PROCESS_CONTINUOUSLY]]), or process the data
+    * currently in the path once and then exit
+    * ([[FileProcessingMode.PROCESS_ONCE]]). In addition,
+    * if the path contains files not to be processed, the user can specify a custom
+    * [[FilePathFilter]]. As a default implementation you can use
+    * [[FilePathFilter.createDefaultFilter()]].
+    *
+    * ** NOTES ON CHECKPOINTING: ** If the `watchType` is set to
+    * [[FileProcessingMode#PROCESS_ONCE]], the source monitors the path ** once **,
+    * creates the [[org.apache.flink.core.fs.FileInputSplit FileInputSplits]]
+    * to be processed, forwards them to the downstream
+    * [[ContinuousFileReaderOperator readers]] to read the actual data,
+    * and exits, without waiting for the readers to finish reading. This
+    * implies that no more checkpoint barriers are going to be forwarded
+    * after the source exits, thus having no checkpoints after that point.
+    *
+    * @param inputFormat
+    *          The input format used to create the data stream
+    * @param filePath
+    *          The path of the file, as a URI (e.g., "file:///some/local/file" or
+    *          "hdfs://host:port/file/path")
+    * @param watchType
+    *          The mode in which the source should operate, i.e. monitor path and react
+    *          to new data, or process once and exit
+    * @param interval
+    *          In the case of periodic path monitoring, this specifies the interval (in millis)
+    *          between consecutive path scans
+    * @param filter
+    *          The filter that decides which files are excluded from the processing
+    * @return The data stream that represents the data read from the given file
+    */
+  @PublicEvolving
+  def readFile[T: TypeInformation](
+      inputFormat: FileInputFormat[T],
+      filePath: String,
+      watchType: FileProcessingMode,
+      interval: Long,
+      filter: FilePathFilter): DataStream[T] = {
+    val typeInfo = implicitly[TypeInformation[T]]
+    asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, filter, typeInfo))
+  }
+
+  /**
    * Creates a new DataStream that contains the strings received infinitely
    * from socket. Received strings are decoded by the system's default
    * character set. The maximum retry interval is specified in seconds, in case

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
new file mode 100644
index 0000000..4c0f648
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleranceTestBase {
+
+	private static final int NO_OF_FILES = 9;
+	private static final int LINES_PER_FILE = 200;
+	private static final int NO_OF_RETRIES = 3;
+	private static final int PARALLELISM = 4;
+	private static final long INTERVAL = 2000;
+
+	private static File baseDir;
+	private static org.apache.hadoop.fs.FileSystem fs;
+	private static String localFsURI;
+	private FileCreator fc;
+
+	private static  Map<Integer, List<String>> finalCollectedContent = new HashMap<>();
+
+	@BeforeClass
+	public static void createHDFS() {
+		try {
+			baseDir = new File("./target/localfs/fs_tests").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+
+			org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+
+			localFsURI = "file:///" + baseDir +"/";
+			fs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
+
+		} catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		try {
+			FileUtil.fullyDelete(baseDir);
+		} catch (Throwable t) {
+			throw new RuntimeException(t);
+		}
+	}
+
+	@Override
+	public void testProgram(StreamExecutionEnvironment env) {
+
+		// set the restart strategy.
+		env.getConfig().setRestartStrategy(
+			RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0));
+		env.enableCheckpointing(20);
+		env.setParallelism(PARALLELISM);
+
+		// create and start the file creating thread.
+		fc = new FileCreator();
+		fc.start();
+
+		// create the monitoring source along with the necessary readers.
+		TestingSinkFunction sink = new TestingSinkFunction();
+		TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
+		DataStream<String> inputStream = env.readFile(format, localFsURI,
+			FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL, FilePathFilter.createDefaultFilter());
+
+		inputStream.flatMap(new FlatMapFunction<String, String>() {
+			@Override
+			public void flatMap(String value, Collector<String> out) throws Exception {
+				out.collect(value);
+			}
+		}).addSink(sink).setParallelism(1);
+	}
+
+	@Override
+	public void postSubmit() throws Exception {
+		Map<Integer, List<String>> collected = finalCollectedContent;
+		Assert.assertEquals(collected.size(), fc.getFileContent().size());
+		for (Integer fileIdx: fc.getFileContent().keySet()) {
+			Assert.assertTrue(collected.keySet().contains(fileIdx));
+
+			List<String> cntnt = collected.get(fileIdx);
+			Collections.sort(cntnt, new Comparator<String>() {
+				@Override
+				public int compare(String o1, String o2) {
+					return getLineNo(o1) - getLineNo(o2);
+				}
+			});
+
+			StringBuilder cntntStr = new StringBuilder();
+			for (String line: cntnt) {
+				cntntStr.append(line);
+			}
+			Assert.assertEquals(fc.getFileContent().get(fileIdx), cntntStr.toString());
+		}
+
+		collected.clear();
+		finalCollectedContent.clear();
+		fc.clean();
+	}
+
+	private int getLineNo(String line) {
+		String[] tkns = line.split("\\s");
+		Assert.assertTrue(tkns.length == 6);
+		return Integer.parseInt(tkns[tkns.length - 1]);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom Functions
+	// --------------------------------------------------------------------------------------------
+
+	// -------------------------			FILE CREATION			-------------------------------
+
+	/**
+	 * A separate thread that creates {@link #NO_OF_FILES} files, one file roughly every 2/3 of {@link #INTERVAL} milliseconds.
+	 * It is used to test the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}.
+	 * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method.
+	 * */
+	private class FileCreator extends Thread {
+
+		private final Set<Path> filesCreated = new HashSet<>();
+		private final Map<Integer, String> fileContents = new HashMap<>();
+
+		public void run() {
+			try {
+				for(int i = 0; i < NO_OF_FILES; i++) {
+					Tuple2<org.apache.hadoop.fs.Path, String> file =
+						fillWithData(localFsURI, "file", i, "This is test line.");
+					filesCreated.add(file.f0);
+					fileContents.put(i, file.f1);
+
+					Thread.sleep((int) (INTERVAL / (3.0/2)));
+				}
+			} catch (IOException | InterruptedException e) {
+				e.printStackTrace();
+			}
+		}
+
+		void clean() throws IOException {
+			assert (fs != null);
+			for (org.apache.hadoop.fs.Path path: filesCreated) {
+				fs.delete(path, false);
+			}
+			fileContents.clear();
+		}
+
+		Map<Integer, String> getFileContent() {
+			return this.fileContents;
+		}
+	}
+
+	/**
+	 * Creates a file with the given index, fills it with {@link #LINES_PER_FILE} lines of content,
+	 * and returns the file's path together with the content written to it.
+	 * */
+	private Tuple2<Path, String> fillWithData(
+		String base, String fileName, int fileIdx, String sampleLine) throws IOException {
+
+		assert (fs != null);
+
+		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
+
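+		// write the content to a temporary, dot-prefixed file first (hidden from the default
+		// FilePathFilter) and rename it to its final name only once it is fully written, so that
+		// the monitoring source never picks up a half-written file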
+		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+		FSDataOutputStream stream = fs.create(tmp);
+		StringBuilder str = new StringBuilder();
+		for(int i = 0; i < LINES_PER_FILE; i++) {
+			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+			str.append(line);
+			stream.write(line.getBytes());
+		}
+		stream.close();
+		fs.rename(tmp, file);
+		Assert.assertTrue("No result file present", fs.exists(file));
+		return new Tuple2<>(file, str.toString());
+	}
+
+	// --------------------------			Task Sink			------------------------------
+
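+	/**
+	 * A sink with parallelism 1 that buffers the received lines per file, fails once on purpose
+	 * after the first successful checkpoint and a random number of elements, and throws a
+	 * {@link SuccessException} as soon as every expected line has been received exactly once.
+	 */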
+	private static class TestingSinkFunction extends RichSinkFunction<String>
+		implements Checkpointed<Tuple2<Long, Map<Integer, Set<String>>>>, CheckpointListener {
+
+		private static volatile boolean hasFailed = false;
+
+		private volatile int numSuccessfulCheckpoints;
+
+		private long count;
+
+		private long elementsToFailure;
+
+		private long elementCounter = 0;
+
+		private  Map<Integer, Set<String>> collectedContent = new HashMap<>();
+
+		TestingSinkFunction() {
+			hasFailed = false;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// this sink can only work with DOP 1
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+
+			long failurePosMin = (long) (0.4 * LINES_PER_FILE);
+			long failurePosMax = (long) (0.7 * LINES_PER_FILE);
+
+			elementsToFailure = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+
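+			// if the restored state already contains all expected elements (i.e. the failure
+			// happened after everything had been read), signal success right away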
+			if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
+				finalCollectedContent = new HashMap<>();
+				for (Map.Entry<Integer, Set<String>> result: collectedContent.entrySet()) {
+					finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
+				}
+				throw new SuccessException();
+			}
+		}
+
+		@Override
+		public void close() {
+			try {
+				super.close();
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+
+		@Override
+		public void invoke(String value) throws Exception {
+			int fileIdx = Character.getNumericValue(value.charAt(0));
+
+			Set<String> content = collectedContent.get(fileIdx);
+			if (content == null) {
+				content = new HashSet<>();
+				collectedContent.put(fileIdx, content);
+			}
+
+			if (!content.add(value + "\n")) {
+				fail("Duplicate line: " + value);
+				System.exit(0);
+			}
+
+			elementCounter++;
+			if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
+				finalCollectedContent = new HashMap<>();
+				for (Map.Entry<Integer, Set<String>> result: collectedContent.entrySet()) {
+					finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
+				}
+				throw new SuccessException();
+			}
+
+			count++;
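+			// simulate a task failure exactly once, after at least one checkpoint has completed
+			// and a random number of elements has been processed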
+			if (!hasFailed) {
+				Thread.sleep(2);
+				if (numSuccessfulCheckpoints >= 1 && count >= elementsToFailure) {
+					hasFailed = true;
+					throw new Exception("Task Failure");
+				}
+			}
+		}
+
+		@Override
+		public Tuple2<Long, Map<Integer, Set<String>>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return new Tuple2<>(elementCounter, collectedContent);
+		}
+
+		@Override
+		public void restoreState(Tuple2<Long, Map<Integer, Set<String>>> state) throws Exception {
+			this.elementCounter = state.f0;
+			this.collectedContent = state.f1;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			numSuccessfulCheckpoints++;
+		}
+	}
+}