You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/14 16:12:37 UTC

[3/4] flink git commit: [FLINK-2314] Make Streaming File Sources Persistent

[FLINK-2314] Make Streaming File Sources Persistent

This commit is a combination of several commits/changes. It combines
changes to the file input formats and the streaming file read operator
and integrates them into the API.

These are the messages of the other two commits:

[FLINK-3717] Make FileInputFormat checkpointable

This adds a new interface called CheckpointableInputFormat
which describes input formats whose state is queryable,
i.e. getCurrentState() returns where the reader is
in the underlying source, and they can resume reading from
a user-specified position.

This functionality is not yet leveraged by current readers.

[FLINK-3889] Refactor File Monitoring Source

This is meant to replace the different file
reading sources in Flink streaming. Now there is
one monitoring source with a degree of parallelism
(DOP) of 1 that monitors a directory and assigns
input splits to downstream readers.

In addition, it makes the new features added by
FLINK-3717 work together with the aforementioned entities
(the monitor and the readers) in order to have
fault-tolerant file sources and exactly-once guarantees.

This does not replace the old API calls. This
will be done in a future commit.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d353895b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d353895b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d353895b

Branch: refs/heads/master
Commit: d353895ba512a5e30fb08a25643fd93f085e8456
Parents: bc19486
Author: kl0u <kk...@gmail.com>
Authored: Sun Apr 10 16:56:42 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Jun 14 18:11:22 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/AvroInputFormat.java      |  85 +++-
 .../io/avro/AvroSplittableInputFormatTest.java  |  95 +++-
 .../flink/api/common/io/BinaryInputFormat.java  | 121 +++--
 .../apache/flink/api/common/io/BlockInfo.java   |   5 +
 .../common/io/CheckpointableInputFormat.java    |  61 +++
 .../api/common/io/DelimitedInputFormat.java     | 114 +++--
 .../api/common/io/EnumerateNestedFilesTest.java |   2 +-
 .../api/common/io/FileInputFormatTest.java      |   9 +-
 .../api/common/io/SequentialFormatTestBase.java |  15 +-
 flink-fs-tests/pom.xml                          |  11 +-
 .../ContinuousFileMonitoringFunctionITCase.java | 300 +++++++++++++
 .../hdfstests/ContinuousFileMonitoringTest.java | 447 +++++++++++++++++++
 .../flink/api/java/io/CsvInputFormatTest.java   | 134 +++++-
 .../runtime/taskmanager/RuntimeEnvironment.java |   4 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../api/datastream/DataStreamSource.java        |   5 +
 .../environment/StreamExecutionEnvironment.java | 256 +++++++++--
 .../ContinuousFileMonitoringFunction.java       | 328 ++++++++++++++
 .../source/ContinuousFileReaderOperator.java    | 390 ++++++++++++++++
 .../source/FileMonitoringFunction.java          |   3 +-
 .../api/functions/source/FilePathFilter.java    |  66 +++
 .../functions/source/FileProcessingMode.java    |  31 ++
 .../api/functions/source/FileReadFunction.java  |   3 +-
 .../functions/source/FileSourceFunction.java    | 148 ------
 .../api/functions/source/InputFormatSource.java | 148 ++++++
 .../api/graph/StreamGraphGenerator.java         |   6 +-
 .../api/operators/OutputTypeConfigurable.java   |   2 +-
 .../streaming/api/operators/StreamOperator.java |   2 +-
 .../util/OneInputStreamOperatorTestHarness.java |   4 +
 .../api/scala/StreamExecutionEnvironment.scala  |  74 ++-
 ...ontinuousFileProcessingCheckpointITCase.java | 327 ++++++++++++++
 .../StreamFaultToleranceTestBase.java           |   8 +-
 32 files changed, 2889 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
index 605ce69..a920275 100644
--- a/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
+++ b/flink-batch-connectors/flink-avro/src/main/java/org/apache/flink/api/java/io/AvroInputFormat.java
@@ -22,13 +22,15 @@ package org.apache.flink.api.java.io;
 import java.io.IOException;
 
 import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
 import org.apache.avro.file.SeekableInput;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.common.io.CheckpointableInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.avro.FSDataInputStreamWrapper;
@@ -42,15 +44,16 @@ import org.apache.flink.util.InstantiationUtil;
 
 /**
  * Provides a {@link FileInputFormat} for Avro records.
- * 
+ *
  * @param <E>
  *            the type of the result Avro record. If you specify
  *            {@link GenericRecord} then the result will be returned as a
  *            {@link GenericRecord}, so you do not have to know the schema ahead
  *            of time.
  */
-public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {
-	
+public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E>,
+	CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(AvroInputFormat.class);
@@ -59,16 +62,19 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 	
 	private boolean reuseAvroValue = true;
 
-	private transient FileReader<E> dataFileReader;
+	private transient DataFileReader<E> dataFileReader;
 
 	private transient long end;
-	
+
+	private transient long recordsReadSinceLastSync;
+
+	private transient long lastSync = -1l;
+
 	public AvroInputFormat(Path filePath, Class<E> type) {
 		super(filePath);
 		this.avroValueType = type;
 	}
-	
-	
+
 	/**
 	 * Sets the flag whether to reuse the Avro value instance for all records.
 	 * By default, the input format reuses the Avro value.
@@ -102,30 +108,34 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 	@Override
 	public void open(FileInputSplit split) throws IOException {
 		super.open(split);
+		dataFileReader = initReader(split);
+		dataFileReader.sync(split.getStart());
+		lastSync = dataFileReader.previousSync();
+	}
 
+	private DataFileReader<E> initReader(FileInputSplit split) throws IOException {
 		DatumReader<E> datumReader;
-		
+
 		if (org.apache.avro.generic.GenericRecord.class == avroValueType) {
 			datumReader = new GenericDatumReader<E>();
 		} else {
 			datumReader = org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)
-					? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
+				? new SpecificDatumReader<E>(avroValueType) : new ReflectDatumReader<E>(avroValueType);
 		}
-
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Opening split {}", split);
 		}
 
 		SeekableInput in = new FSDataInputStreamWrapper(stream, split.getPath().getFileSystem().getFileStatus(split.getPath()).getLen());
+		DataFileReader<E> dataFileReader = (DataFileReader) DataFileReader.openReader(in, datumReader);
 
-		dataFileReader = DataFileReader.openReader(in, datumReader);
-		
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Loaded SCHEMA: {}", dataFileReader.getSchema());
 		}
-		
-		dataFileReader.sync(split.getStart());
-		this.end = split.getStart() + split.getLength();
+
+		end = split.getStart() + split.getLength();
+		recordsReadSinceLastSync = 0;
+		return dataFileReader;
 	}
 
 	@Override
@@ -133,11 +143,24 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 		return !dataFileReader.hasNext() || dataFileReader.pastSync(end);
 	}
 
+	public long getRecordsReadFromBlock() {
+		return this.recordsReadSinceLastSync;
+	}
+
 	@Override
 	public E nextRecord(E reuseValue) throws IOException {
 		if (reachedEnd()) {
 			return null;
 		}
+
+		// if we start a new block, then register the event, and
+		// restart the counter.
+		if(dataFileReader.previousSync() != lastSync) {
+			lastSync = dataFileReader.previousSync();
+			recordsReadSinceLastSync = 0;
+		}
+		recordsReadSinceLastSync++;
+
 		if (reuseAvroValue) {
 			return dataFileReader.next(reuseValue);
 		} else {
@@ -148,4 +171,34 @@ public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultType
 			}
 		}
 	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Checkpointing
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public Tuple2<Long, Long> getCurrentState() throws IOException {
+		return new Tuple2<>(this.lastSync, this.recordsReadSinceLastSync);
+	}
+
+	@Override
+	public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
+		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
+		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+
+		this.open(split);
+		if (state.f0 != -1) {
+
+			// go to the block where we stopped
+			lastSync = state.f0;
+			dataFileReader.seek(lastSync);
+
+			// skip the records that were already read before the checkpoint, discarding their values
+			long recordsToDiscard = state.f1;
+			for(int i = 0; i < recordsToDiscard; i++) {
+				dataFileReader.next(null);
+				recordsReadSinceLastSync++;
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
index 898b8fd..37a83d1 100644
--- a/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
+++ b/flink-batch-connectors/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.io.avro.generated.Colors;
 import org.apache.flink.api.io.avro.generated.Fixed16;
 import org.apache.flink.api.io.avro.generated.User;
 import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -166,7 +167,7 @@ public class AvroSplittableInputFormatTest {
 		Configuration parameters = new Configuration();
 		
 		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
-		
+
 		format.configure(parameters);
 		FileInputSplit[] splits = format.createInputSplits(4);
 		assertEquals(splits.length, 4);
@@ -191,6 +192,98 @@ public class AvroSplittableInputFormatTest {
 		format.close();
 	}
 
+	@Test
+	public void testAvroRecoveryWithFailureAtStart() throws Exception {
+		final int recordsUntilCheckpoint = 132;
+
+		Configuration parameters = new Configuration();
+
+		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		format.configure(parameters);
+
+		FileInputSplit[] splits = format.createInputSplits(4);
+		assertEquals(splits.length, 4);
+
+		int elements = 0;
+		int elementsPerSplit[] = new int[4];
+		for(int i = 0; i < splits.length; i++) {
+			format.reopen(splits[i], format.getCurrentState());
+			while(!format.reachedEnd()) {
+				User u = format.nextRecord(null);
+				Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+				elements++;
+
+				if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
+
+					// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
+					Tuple2<Long, Long> state = format.getCurrentState();
+
+					// this is to make sure that nothing stays from the previous format
+					// (as it is going to be in the normal case)
+					format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
+
+					format.reopen(splits[i], state);
+					assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
+				}
+				elementsPerSplit[i]++;
+			}
+			format.close();
+		}
+
+		Assert.assertEquals(1539, elementsPerSplit[0]);
+		Assert.assertEquals(1026, elementsPerSplit[1]);
+		Assert.assertEquals(1539, elementsPerSplit[2]);
+		Assert.assertEquals(896, elementsPerSplit[3]);
+		Assert.assertEquals(NUM_RECORDS, elements);
+		format.close();
+	}
+
+	@Test
+	public void testAvroRecovery() throws Exception {
+		final int recordsUntilCheckpoint = 132;
+
+		Configuration parameters = new Configuration();
+
+		AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
+		format.configure(parameters);
+
+		FileInputSplit[] splits = format.createInputSplits(4);
+		assertEquals(splits.length, 4);
+
+		int elements = 0;
+		int elementsPerSplit[] = new int[4];
+		for(int i = 0; i < splits.length; i++) {
+			format.open(splits[i]);
+			while(!format.reachedEnd()) {
+				User u = format.nextRecord(null);
+				Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
+				elements++;
+
+				if(format.getRecordsReadFromBlock() == recordsUntilCheckpoint) {
+
+					// do the whole checkpoint-restore procedure and see if we pick up from where we left off.
+					Tuple2<Long, Long> state = format.getCurrentState();
+
+					// this is to make sure that nothing stays from the previous format
+					// (as it is going to be in the normal case)
+					format = new AvroInputFormat<>(new Path(testFile.getAbsolutePath()), User.class);
+
+					format.reopen(splits[i], state);
+					assertEquals(format.getRecordsReadFromBlock(), recordsUntilCheckpoint);
+				}
+				elementsPerSplit[i]++;
+			}
+			format.close();
+		}
+
+		Assert.assertEquals(1539, elementsPerSplit[0]);
+		Assert.assertEquals(1026, elementsPerSplit[1]);
+		Assert.assertEquals(1539, elementsPerSplit[2]);
+		Assert.assertEquals(896, elementsPerSplit[3]);
+		Assert.assertEquals(NUM_RECORDS, elements);
+		format.close();
+	}
+
 	/*
 	This test is gave the reference values for the test of Flink's IF.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index 3789544..eb83bda 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -30,6 +31,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,36 +42,44 @@ import java.util.Arrays;
 import java.util.List;
 
 /**
- * Base class for all input formats that use blocks of fixed size. The input splits are aligned to these blocks. Without
- * configuration, these block sizes equal the native block sizes of the HDFS.
+ * Base class for all input formats that use blocks of fixed size. The input splits are aligned to these blocks,
+ * meaning that each split will consist of one block. Without configuration, these block sizes equal the native
+ * block sizes of the HDFS.
+ *
+ * A block will contain a {@link BlockInfo} at the end of the block. There, the reader can find some statistics
+ * about the split currently being read, that will help correctly parse the contents of the block.
  */
 @Public
-public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
+public abstract class BinaryInputFormat<T> extends FileInputFormat<T>
+	implements CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {
+
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * The log.
-	 */
+	/** The log. */
 	private static final Logger LOG = LoggerFactory.getLogger(BinaryInputFormat.class);
 
-	/**
-	 * The config parameter which defines the fixed length of a record.
-	 */
+	/** The config parameter which defines the fixed length of a record. */
 	public static final String BLOCK_SIZE_PARAMETER_KEY = "input.block_size";
 
 	public static final long NATIVE_BLOCK_SIZE = Long.MIN_VALUE;
 
-	/**
-	 * The block size to use.
-	 */
+	/** The block size to use. */
 	private long blockSize = NATIVE_BLOCK_SIZE;
 
 	private transient DataInputViewStreamWrapper dataInputStream;
 
+	/** The BlockInfo for the Block corresponding to the split currently being read. */
 	private transient BlockInfo blockInfo;
 
-	private long readRecords;
+	/** A wrapper around the block currently being read. */
+	private transient BlockBasedInput blockBasedInput = null;
 
+	/**
+	 * The number of records already read from the block.
+	 * This is used to decide if the end of the block has been
+	 * reached.
+	 */
+	private long readRecords = 0;
 
 	@Override
 	public void configure(Configuration parameters) {
@@ -193,6 +203,20 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 		return new BlockInfo();
 	}
 
+	private BlockInfo createAndReadBlockInfo() throws IOException {
+		BlockInfo blockInfo = new BlockInfo();
+		if (this.splitLength > blockInfo.getInfoSize()) {
+			// First we read the block info, which contains the recordCount, the accumulatedRecordCount
+			// and the firstRecordStart offset in the current block. It is written at the end of the block and
+			// is of fixed size, currently 3 longs (24 bytes).
+
+			// TODO: seek not supported by compressed streams. Will throw exception
+			this.stream.seek(this.splitStart + this.splitLength - blockInfo.getInfoSize());
+			blockInfo.read(new DataInputViewStreamWrapper(this.stream));
+		}
+		return blockInfo;
+	}
+
 	/**
 	 * Fill in the statistics. The last modification time and the total input size are prefilled.
 	 *
@@ -207,8 +231,7 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 			return null;
 		}
 
-		BlockInfo blockInfo = this.createBlockInfo();
-
+		BlockInfo blockInfo = new BlockInfo();
 		long totalCount = 0;
 		for (FileStatus file : files) {
 			// invalid file
@@ -249,20 +272,16 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 	public void open(FileInputSplit split) throws IOException {
 		super.open(split);
 
-		final long blockSize = this.blockSize == NATIVE_BLOCK_SIZE ?
-			this.filePath.getFileSystem().getDefaultBlockSize() : this.blockSize;
-
-		this.blockInfo = this.createBlockInfo();
-		if (this.splitLength > this.blockInfo.getInfoSize()) {
-			// TODO: seek not supported by compressed streams. Will throw exception
-			this.stream.seek(this.splitStart + this.splitLength - this.blockInfo.getInfoSize());
-			this.blockInfo.read(new DataInputViewStreamWrapper(this.stream));
-		}
+		this.blockInfo = this.createAndReadBlockInfo();
 
+		// We set the size of the BlockBasedInput to splitLength as each split contains one block.
+		// After reading the block info, we seek in the file to the correct position.
+		
+		this.readRecords = 0;
 		this.stream.seek(this.splitStart + this.blockInfo.getFirstRecordStart());
-		BlockBasedInput blockBasedInput = new BlockBasedInput(this.stream, (int) blockSize);
+		this.blockBasedInput = new BlockBasedInput(this.stream,
+			(int) blockInfo.getFirstRecordStart(), this.splitLength);
 		this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
-		this.readRecords = 0;
 	}
 
 	@Override
@@ -275,7 +294,6 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 		if (this.reachedEnd()) {
 			return null;
 		}
-
 		record = this.deserialize(record, this.dataInputStream);
 		this.readRecords++;
 		return record;
@@ -284,8 +302,8 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 	protected abstract T deserialize(T reuse, DataInputView dataInput) throws IOException;
 
 	/**
-	 * Writes a block info at the end of the blocks.<br>
-	 * Current implementation uses only int and not long.
+	 * Reads the content of a block of data. The block contains its {@link BlockInfo}
+	 * at the end, and this class takes this into account when reading the data.
 	 */
 	protected class BlockBasedInput extends FilterInputStream {
 		private final int maxPayloadSize;
@@ -297,6 +315,12 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 			this.blockPos = (int) BinaryInputFormat.this.blockInfo.getFirstRecordStart();
 			this.maxPayloadSize = blockSize - BinaryInputFormat.this.blockInfo.getInfoSize();
 		}
+		
+		public BlockBasedInput(FSDataInputStream in, int startPos, long length) {
+			super(in);
+			this.blockPos = startPos;
+			this.maxPayloadSize = (int) (length - BinaryInputFormat.this.blockInfo.getInfoSize());
+		}
 
 		@Override
 		public int read() throws IOException {
@@ -306,9 +330,16 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 			return this.in.read();
 		}
 
+		private long getCurrBlockPos() {
+			return this.blockPos;
+		}
+
 		private void skipHeader() throws IOException {
 			byte[] dummy = new byte[BinaryInputFormat.this.blockInfo.getInfoSize()];
 			this.in.read(dummy, 0, dummy.length);
+
+			// the blockPos is set to 0 for the case of remote reads,
+			// these are the cases where the last record of a block spills on the next block
 			this.blockPos = 0;
 		}
 
@@ -337,4 +368,36 @@ public abstract class BinaryInputFormat<T> extends FileInputFormat<T> {
 			return totalRead;
 		}
 	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Checkpointing
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public Tuple2<Long, Long> getCurrentState() throws IOException {
+		if (this.blockBasedInput == null) {
+			throw new RuntimeException("You must have forgotten to call open() on your input format.");
+		}
+
+		return  new Tuple2<>(
+			this.blockBasedInput.getCurrBlockPos(), 		// the last read index in the block
+			this.readRecords								// the number of records read
+		);
+	}
+
+	@Override
+	public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
+		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
+		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+
+		this.open(split);
+		this.blockInfo = this.createAndReadBlockInfo();
+
+		long blockPos = state.f0;
+		this.readRecords = state.f1;
+
+		this.stream.seek(this.splitStart + blockPos);
+		this.blockBasedInput = new BlockBasedInput(this.stream, (int) blockPos, this.splitLength);
+		this.dataInputStream = new DataInputViewStreamWrapper(blockBasedInput);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java
index 0ac2e50..2cb18ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BlockInfo.java
@@ -25,6 +25,11 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
+/**
+ * A 24-byte record written at the <i>end</i> of each block in a binary file, containing
+ * i) the number of records in the block, ii) the accumulated number of records, and
+ * iii) the offset of the first record in the block.
+ */
 @Public
 public class BlockInfo implements IOReadableWritable {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
new file mode 100644
index 0000000..17b0625
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/CheckpointableInputFormat.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.core.io.InputSplit;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * An interface that describes {@link InputFormat}s that allow checkpointing/restoring their state.
+ *
+ * @param <S> The type of input split.
+ * @param <T> The type of the channel state to be checkpointed / included in the snapshot.
+ */
+@PublicEvolving
+public interface CheckpointableInputFormat<S extends InputSplit, T extends Serializable> {
+
+	/**
+	 * Returns the current state of the split currently being read.
+	 * This will be used to restore the state of the reading channel when recovering from a task failure.
+	 * In the case of a simple text file, the state can correspond to the last read offset in the split.
+	 *
+	 * @return The state of the channel.
+	 *
+	 * @throws IOException Thrown if the creation of the state object failed.
+	 */
+	T getCurrentState() throws IOException;
+
+	/**
+	 * Restores the state of a parallel instance reading from an {@link InputFormat}.
+	 * This is necessary when recovering from a task failure. When this method is called,
+	 * the input format is guaranteed to be configured.
+	 *
+	 * <p>
+	 * <b>NOTE: </b> The caller has to make sure that the provided split is the one to which
+	 * the state belongs.
+	 *
+	 * @param split The split to be opened.
+	 * @param state The state from which to start. This can contain the offset,
+	 *                 but also other data, depending on the input format.
+	 */
+	void reopen(S split, T state) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 243e2a4..3a77200 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -19,6 +19,9 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -28,9 +31,6 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -43,7 +43,7 @@ import java.util.ArrayList;
  * <p>The default delimiter is the newline character {@code '\n'}.</p>
  */
 @Public
-public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
+public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> implements CheckpointableInputFormat<FileInputSplit, Long> {
 	
 	private static final long serialVersionUID = 1L;
 
@@ -56,7 +56,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 
 	/** The default charset  to convert strings to bytes */
 	private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
-	
+
 	/**
 	 * The default read buffer size = 1MB.
 	 */
@@ -81,7 +81,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	 * The maximum size of a sample record before sampling is aborted. To catch cases where a wrong delimiter is given.
 	 */
 	private static int MAX_SAMPLE_LEN;
-	
+
 	static { loadGlobalConfigParams(); }
 	
 	protected static void loadGlobalConfigParams() {
@@ -135,7 +135,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	private transient int readPos;
 
 	private transient int limit;
-	
+
 	private transient byte[] currBuffer;		// buffer in which current record byte sequence is found
 	private transient int currOffset;			// offset in above buffer
 	private transient int currLen;				// length of current byte sequence
@@ -143,8 +143,9 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	private transient boolean overLimit;
 
 	private transient boolean end;
-	
-	
+
+	private transient long offset = -1;
+
 	// --------------------------------------------------------------------------------------------
 	//  The configuration parameters. Configured on the instance and serialized to be shipped.
 	// --------------------------------------------------------------------------------------------
@@ -182,7 +183,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 		
 		this.delimiter = delimiter;
 	}
-	
+
 	public void setDelimiter(char delimiter) {
 		setDelimiter(String.valueOf(delimiter));
 	}
@@ -226,7 +227,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 		
 		this.numLineSamples = numLineSamples;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  User-defined behavior
 	// --------------------------------------------------------------------------------------------
@@ -404,17 +405,33 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	/**
 	 * Opens the given input split. This method opens the input stream to the specified file, allocates read buffers
 	 * and positions the stream at the correct position, making sure that any partial record at the beginning is skipped.
-	 * 
+	 *
 	 * @param split The input split to open.
-	 * 
+	 *
 	 * @see org.apache.flink.api.common.io.FileInputFormat#open(org.apache.flink.core.fs.FileInputSplit)
 	 */
 	@Override
 	public void open(FileInputSplit split) throws IOException {
 		super.open(split);
-		
+		initBuffers();
+
+		this.offset = splitStart;
+		if (this.splitStart != 0) {
+			this.stream.seek(offset);
+			readLine();
+			// if the first partial record already pushes the stream over
+			// the limit of our split, then no record starts within this split
+			if (this.overLimit) {
+				this.end = true;
+			}
+		} else {
+			fillBuffer();
+		}
+	}
+
+	private void initBuffers() {
 		this.bufferSize = this.bufferSize <= 0 ? DEFAULT_READ_BUFFER_SIZE : this.bufferSize;
-		
+
 		if (this.readBuffer == null || this.readBuffer.length != this.bufferSize) {
 			this.readBuffer = new byte[this.bufferSize];
 		}
@@ -426,19 +443,6 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 		this.limit = 0;
 		this.overLimit = false;
 		this.end = false;
-
-		if (this.splitStart != 0) {
-			this.stream.seek(this.splitStart);
-			readLine();
-			
-			// if the first partial record already pushes the stream over the limit of our split, then no
-			// record starts within this split 
-			if (this.overLimit) {
-				this.end = true;
-			}
-		} else {
-			fillBuffer();
-		}
 	}
 
 	/**
@@ -489,6 +493,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 			if (this.readPos >= this.limit) {
 				if (!fillBuffer()) {
 					if (countInWrapBuffer > 0) {
+						this.offset += countInWrapBuffer;
 						setResult(this.wrapBuffer, 0, countInWrapBuffer);
 						return true;
 					} else {
@@ -506,13 +511,14 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 				} else {
 					i = 0;
 				}
-
 			}
 
 			// check why we dropped out
 			if (i == this.delimiter.length) {
 				// line end
-				count = this.readPos - startPos - this.delimiter.length;
+				int totalBytesRead = this.readPos - startPos;
+				this.offset += countInWrapBuffer + totalBytesRead;
+				count = totalBytesRead - this.delimiter.length;
 
 				// copy to byte array
 				if (countInWrapBuffer > 0) {
@@ -535,7 +541,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 				count = this.limit - startPos;
 				
 				// check against the maximum record length
-				if ( ((long) countInWrapBuffer) + count > this.lineLengthLimit) {
+				if (((long) countInWrapBuffer) + count > this.lineLengthLimit) {
 					throw new IOException("The record length exceeded the maximum record length (" + 
 							this.lineLengthLimit + ").");
 				}
@@ -604,13 +610,11 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 			return true;
 		}
 	}
-	
-	// ============================================================================================
-	//  Parametrization via configuration
-	// ============================================================================================
-	
-	// ------------------------------------- Config Keys ------------------------------------------
-	
+
+	// --------------------------------------------------------------------------------------------
+	// Config Keys for Parametrization via configuration
+	// --------------------------------------------------------------------------------------------
+
 	/**
 	 * The configuration key to set the record delimiter.
 	 */
@@ -620,4 +624,38 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> {
 	 * The configuration key to set the number of samples to take for the statistics.
 	 */
 	private static final String NUM_STATISTICS_SAMPLES = "delimited-format.numSamples";
+
+	// --------------------------------------------------------------------------------------------
+	//  Checkpointing
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public Long getCurrentState() throws IOException {
+		return this.offset;
+	}
+
+	@Override
+	public void reopen(FileInputSplit split, Long state) throws IOException {
+		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
+		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
+
+		this.open(split);
+		this.offset = state;
+		if (state > this.splitStart + split.getLength()) {
+			this.end = true;
+		} else if (state > split.getStart()) {
+			initBuffers();
+
+			this.stream.seek(this.offset);
+			if (split.getLength() == -1) {
+				// this is the case for unsplittable files
+				fillBuffer();
+			} else {
+				this.splitLength = this.splitStart + split.getLength() - this.offset;
+				if (splitLength <= 0) {
+					this.end = true;
+				}
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
index d0caa22..68465a3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
@@ -394,4 +394,4 @@ public class EnumerateNestedFilesTest {
 			return null;
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 63cb966..ae8802b 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -377,7 +377,7 @@ public class FileInputFormatTest {
 		final int numBlocks = 3;
 		FileOutputStream fileOutputStream = new FileOutputStream(tempFile);
 		for (int i = 0; i < blockSize * numBlocks; i++) {
-			fileOutputStream.write(new byte[]{1});
+			fileOutputStream.write(new byte[]{(byte) i});
 		}
 		fileOutputStream.close();
 
@@ -392,11 +392,12 @@ public class FileInputFormatTest {
 		FileInputSplit[] inputSplits = inputFormat.createInputSplits(3);
 
 		byte[] bytes = null;
+		byte prev = 0;
 		for (FileInputSplit inputSplit : inputSplits) {
 			inputFormat.open(inputSplit);
 			while (!inputFormat.reachedEnd()) {
 				if ((bytes = inputFormat.nextRecord(bytes)) != null) {
-					Assert.assertArrayEquals(new byte[]{(byte) 0xFE}, bytes);
+					Assert.assertArrayEquals(new byte[]{--prev}, bytes);
 				}
 			}
 		}
@@ -420,14 +421,13 @@ public class FileInputFormatTest {
 		}
 	}
 
-
 	private static final class MyDecoratedInputFormat extends FileInputFormat<byte[]> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
 		public boolean reachedEnd() throws IOException {
-			return this.splitLength <= this.stream.getPos();
+			return this.stream.getPos() >= this.splitStart + this.splitLength;
 		}
 
 		@Override
@@ -442,7 +442,6 @@ public class FileInputFormatTest {
 			inputStream = super.decorateInputStream(inputStream, fileSplit);
 			return new InputStreamFSInputWrapper(new InvertedInputStream(inputStream));
 		}
-
 	}
 
 	private static final class InvertedInputStream extends InputStream {

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
index 2ff6fab..b00ca95 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -127,17 +128,29 @@ public abstract class SequentialFormatTestBase<T> extends TestLogger {
 	 * Tests if the expected sequence and amount of data can be read
 	 */
 	@Test
-	public void checkRead() throws IOException {
+	public void checkRead() throws Exception {
 		BinaryInputFormat<T> input = this.createInputFormat();
 		FileInputSplit[] inputSplits = input.createInputSplits(0);
 		Arrays.sort(inputSplits, new InputSplitSorter());
+
 		int readCount = 0;
+
 		for (FileInputSplit inputSplit : inputSplits) {
 			input.open(inputSplit);
+			input.reopen(inputSplit, input.getCurrentState());
+
 			T record = createInstance();
+
 			while (!input.reachedEnd()) {
 				if (input.nextRecord(record) != null) {
 					this.checkEquals(this.getRecord(readCount), record);
+
+					if (!input.reachedEnd()) {
+						Tuple2<Long, Long> state = input.getCurrentState();
+
+						input = this.createInputFormat();
+						input.reopen(inputSplit, state);
+					}
 					readCount++;
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-fs-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-fs-tests/pom.xml b/flink-fs-tests/pom.xml
index 2c9500f..53cb503 100644
--- a/flink-fs-tests/pom.xml
+++ b/flink-fs-tests/pom.xml
@@ -78,7 +78,15 @@ under the License.
 			<type>test-jar</type>
 			<scope>test</scope>
 		</dependency>
-		
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.hadoop</groupId>
 			<artifactId>hadoop-hdfs</artifactId>
@@ -94,5 +102,6 @@ under the License.
 			<type>test-jar</type>
 			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
 		</dependency>
+
 	</dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
new file mode 100644
index 0000000..e6cd5d9
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringFunctionITCase.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ContinuousFileMonitoringFunctionITCase extends StreamingProgramTestBase {
+
+	private static final int NO_OF_FILES = 10;
+	private static final int LINES_PER_FILE = 10;
+
+	private static final long INTERVAL = 100;
+
+	private File baseDir;
+
+	private org.apache.hadoop.fs.FileSystem hdfs;
+	private String hdfsURI;
+	private MiniDFSCluster hdfsCluster;
+
+	private static Map<Integer, String> expectedContents = new HashMap<>();
+
+	//						PREPARING FOR THE TESTS
+
+	@Before
+	public void createHDFS() {
+		try {
+			baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+
+			org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
+
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+			hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+		} catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@After
+	public void destroyHDFS() {
+		try {
+			FileUtil.fullyDelete(baseDir);
+			hdfsCluster.shutdown();
+		} catch (Throwable t) {
+			throw new RuntimeException(t);
+		}
+	}
+
+	//						END OF PREPARATIONS
+
+	@Override
+	protected void testProgram() throws Exception {
+
+		/*
+		* This test checks the interplay between the monitor and the reader
+		* and also the failExternally() functionality. To test the latter we
+		* set the parallelism to 1 so that we have the chaining between the sink,
+		* which throws the SuccessException to signal the end of the test, and the
+		* reader.
+		* */
+
+		FileCreator fileCreator = new FileCreator(INTERVAL);
+		Thread t = new Thread(fileCreator);
+		t.start();
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		format.setFilePath(hdfsURI);
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(1);
+
+			ContinuousFileMonitoringFunction<String> monitoringFunction =
+				new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+					FilePathFilter.createDefaultFilter(),
+					FileProcessingMode.PROCESS_CONTINUOUSLY,
+					env.getParallelism(), INTERVAL);
+
+			TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+			ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+			TestingSinkFunction sink = new TestingSinkFunction(monitoringFunction);
+
+			DataStream<FileInputSplit> splits = env.addSource(monitoringFunction);
+			splits.transform("FileSplitReader", typeInfo, reader).addSink(sink).setParallelism(1);
+			env.execute();
+
+		} catch (Exception e) {
+			Throwable th = e;
+			int depth = 0;
+
+			for (; depth < 20; depth++) {
+				if (th instanceof SuccessException) {
+					try {
+						postSubmit();
+					} catch (Exception e1) {
+						e1.printStackTrace();
+					}
+					return;
+				} else if (th.getCause() != null) {
+					th = th.getCause();
+				} else {
+					break;
+				}
+			}
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	private static class TestingSinkFunction extends RichSinkFunction<String> {
+
+		private final ContinuousFileMonitoringFunction src;
+
+		private int elementCounter = 0;
+		private Map<Integer, Integer> elementCounters = new HashMap<>();
+		private Map<Integer, List<String>> collectedContent = new HashMap<>();
+
+		TestingSinkFunction(ContinuousFileMonitoringFunction monitoringFunction) {
+			this.src = monitoringFunction;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// this sink can only work with DOP 1
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		}
+
+		@Override
+		public void close() {
+			// check that the collected data matches the expected contents of each file.
+
+			Assert.assertEquals(collectedContent.size(), expectedContents.size());
+			for (Integer fileIdx: expectedContents.keySet()) {
+				Assert.assertTrue(collectedContent.keySet().contains(fileIdx));
+
+				List<String> cntnt = collectedContent.get(fileIdx);
+				Collections.sort(cntnt, new Comparator<String>() {
+					@Override
+					public int compare(String o1, String o2) {
+						return getLineNo(o1) - getLineNo(o2);
+					}
+				});
+
+				StringBuilder cntntStr = new StringBuilder();
+				for (String line: cntnt) {
+					cntntStr.append(line);
+				}
+				Assert.assertEquals(cntntStr.toString(), expectedContents.get(fileIdx));
+			}
+			expectedContents.clear();
+
+			src.cancel();
+			try {
+				src.close();
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+
+		private int getLineNo(String line) {
+			String[] tkns = line.split("\\s");
+			Assert.assertTrue(tkns.length == 6);
+			return Integer.parseInt(tkns[tkns.length - 1]);
+		}
+
+		@Override
+		public void invoke(String value) throws Exception {
+			int fileIdx = Character.getNumericValue(value.charAt(0));
+
+			Integer counter = elementCounters.get(fileIdx);
+			if (counter == null) {
+				counter = 0;
+			} else if (counter == LINES_PER_FILE) {
+				// a file never holds more than LINES_PER_FILE lines, so any extra line is a duplicate.
+				Assert.fail("Duplicate lines detected.");
+			}
+			elementCounters.put(fileIdx, ++counter);
+
+			List<String> content = collectedContent.get(fileIdx);
+			if (content == null) {
+				content = new ArrayList<>();
+				collectedContent.put(fileIdx, content);
+			}
+			content.add(value + "\n");
+
+			elementCounter++;
+			if (elementCounter == NO_OF_FILES * LINES_PER_FILE) {
+				throw new SuccessException();
+			}
+		}
+	}
+
+	/**
+	 * A separate thread creating {@link #NO_OF_FILES} files, one file every {@link #INTERVAL} milliseconds.
+	 * It serves for testing the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}.
+	 * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method.
+	 * */
+	private class FileCreator implements Runnable {
+
+		private final long interval;
+
+		FileCreator(long interval) {
+			this.interval = interval;
+		}
+
+		public void run() {
+			try {
+				for (int i = 0; i < NO_OF_FILES; i++) {
+					fillWithData(hdfsURI, "file", i, "This is test line.");
+					Thread.sleep(interval);
+				}
+			} catch (IOException e) {
+				e.printStackTrace();
+			} catch (InterruptedException e) {
+				// we just close without any message.
+			}
+		}
+	}
+
+	/**
+	 * Fill the file with content.
+	 * */
+	private void fillWithData(String base, String fileName, int fileIdx, String sampleLine) throws IOException {
+		assert (hdfs != null);
+
+		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
+
+		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+		FSDataOutputStream stream = hdfs.create(tmp);
+		StringBuilder str = new StringBuilder();
+		for (int i = 0; i < LINES_PER_FILE; i++) {
+			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+			str.append(line);
+			stream.write(line.getBytes());
+		}
+		stream.close();
+
+		hdfs.rename(tmp, file);
+
+		expectedContents.put(fileIdx, str.toString());
+
+		Assert.assertTrue("No result file present", hdfs.exists(file));
+	}
+
+	public static class SuccessException extends Exception {
+		private static final long serialVersionUID = -7011865671593955887L;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
new file mode 100644
index 0000000..87567e3
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.source.FilePathFilter;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+public class ContinuousFileMonitoringTest {
+
+	private static final int NO_OF_FILES = 10;
+	private static final int LINES_PER_FILE = 10;
+
+	private static final long INTERVAL = 100;
+
+	private static File baseDir;
+
+	private static org.apache.hadoop.fs.FileSystem hdfs;
+	private static String hdfsURI;
+	private static MiniDFSCluster hdfsCluster;
+
+	//						PREPARING FOR THE TESTS
+
+	@BeforeClass
+	public static void createHDFS() {
+		try {
+			baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+
+			org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
+
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+			hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
+
+		} catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		try {
+			FileUtil.fullyDelete(baseDir);
+			hdfsCluster.shutdown();
+		} catch (Throwable t) {
+			throw new RuntimeException(t);
+		}
+	}
+
+	//						END OF PREPARATIONS
+
+	//						TESTS
+
+	@Test
+	public void testFileReadingOperator() throws Exception {
+		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+		Map<Integer, String> expectedFileContents = new HashMap<>();
+		for(int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+			expectedFileContents.put(i, file.f1);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
+		OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
+			new OneInputStreamOperatorTestHarness<>(reader);
+
+		reader.setOutputType(typeInfo, new ExecutionConfig());
+		tester.open();
+
+		// create the necessary splits for the test
+		FileInputSplit[] splits = format.createInputSplits(
+			reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+		// and feed them to the operator
+		for(FileInputSplit split: splits) {
+			tester.processElement(new StreamRecord<>(split));
+		}
+
+		// then close the reader gracefully
+		synchronized (tester.getCheckpointLock()) {
+			tester.close();
+		}
+
+		/*
+		* Given that the reader is multithreaded, the test may finish before the reader thread finishes
+		* reading. This would result in files being deleted by the test before being read, thus throwing an exception.
+		* In addition, even if file deletion happens at the end, the results may not yet be ready for checking.
+		* To cope with this, we poll until all the output is collected or until 1000 ms (1 s) have elapsed.
+		*/
+
+		long start = System.currentTimeMillis();
+		Queue<Object> output;
+		do {
+			output = tester.getOutput();
+			Thread.sleep(50);
+		} while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);
+
+		Map<Integer, List<String>> actualFileContents = new HashMap<>();
+		for(Object line: tester.getOutput()) {
+			StreamRecord<String> element = (StreamRecord<String>) line;
+
+			int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+			List<String> content = actualFileContents.get(fileIdx);
+			if(content == null) {
+				content = new ArrayList<>();
+				actualFileContents.put(fileIdx, content);
+			}
+			content.add(element.getValue() +"\n");
+		}
+
+		Assert.assertEquals(actualFileContents.size(), expectedFileContents.size());
+		for (Integer fileIdx: expectedFileContents.keySet()) {
+			Assert.assertTrue(actualFileContents.keySet().contains(fileIdx));
+
+			List<String> cntnt = actualFileContents.get(fileIdx);
+			Collections.sort(cntnt, new Comparator<String>() {
+				@Override
+				public int compare(String o1, String o2) {
+					return getLineNo(o1) - getLineNo(o2);
+				}
+			});
+
+			StringBuilder cntntStr = new StringBuilder();
+			for (String line: cntnt) {
+				cntntStr.append(line);
+			}
+			Assert.assertEquals(cntntStr.toString(), expectedFileContents.get(fileIdx));
+		}
+
+		for(org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+	}
+
+	private static class PathFilter extends FilePathFilter {
+
+		@Override
+		public boolean filterPath(Path filePath) {
+			return filePath.getName().startsWith("**");
+		}
+	}
+
+	@Test
+	public void testFilePathFiltering() throws Exception {
+		Set<String> uniqFilesFound = new HashSet<>();
+		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+
+		// create the files to be discarded
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "**file", i, "This is test line.");
+			filesCreated.add(file.f0);
+		}
+
+		// create the files to be kept
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI, new PathFilter(),
+				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+
+		monitoringFunction.open(new Configuration());
+		monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
+
+		Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+		for(int i = 0; i < NO_OF_FILES; i++) {
+			org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(hdfsURI + "/file" + i);
+			Assert.assertTrue(uniqFilesFound.contains(file.toString()));
+		}
+
+		for(org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+	}
+
+	/**
+	 * Runs the monitoring function in {@link FileProcessingMode#PROCESS_CONTINUOUSLY} mode while a
+	 * {@link FileCreator} thread keeps adding files, and verifies that every split handed to the
+	 * {@link TestingSourceContext} belongs to a file written by the creator.
+	 */
+	@Test
+	public void testFileSplitMonitoringReprocessWithAppended() throws Exception {
+		Set<String> uniqFilesFound = new HashSet<>();
+
+		FileCreator fc = new FileCreator(INTERVAL, NO_OF_FILES);
+		fc.start();
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI, FilePathFilter.createDefaultFilter(),
+				FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+		monitoringFunction.open(new Configuration());
+		monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
+
+		// wait until the sink also sees all the splits.
+		synchronized (uniqFilesFound) {
+			while (uniqFilesFound.size() < NO_OF_FILES) {
+				uniqFilesFound.wait(7 * INTERVAL);
+			}
+		}
+
+		// The creator registers a file in its set only AFTER the file has been renamed into place,
+		// so the monitor may already have seen the last file while the set is still one short.
+		// Joining the thread removes that race and makes the reads below safely visible.
+		fc.join();
+
+		Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
+		Assert.assertEquals(NO_OF_FILES, filesCreated.size());
+		Assert.assertEquals(NO_OF_FILES, uniqFilesFound.size());
+
+		Set<String> fileNamesCreated = new HashSet<>();
+		for (org.apache.hadoop.fs.Path path: filesCreated) {
+			fileNamesCreated.add(path.toString());
+		}
+
+		for(String file: uniqFilesFound) {
+			Assert.assertTrue(fileNamesCreated.contains(file));
+		}
+
+		// clean up so that the following tests start from an empty directory
+		for(org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+	}
+
+	/**
+	 * Runs the monitoring function in {@link FileProcessingMode#PROCESS_ONCE} mode after at least one
+	 * file has been created, and verifies that it reports at least one but not all of the files that the
+	 * {@link FileCreator} eventually writes.
+	 */
+	@Test
+	public void testFileSplitMonitoringProcessOnce() throws Exception {
+		Set<String> uniqFilesFound = new HashSet<>();
+
+		FileCreator fc = new FileCreator(INTERVAL, 1);
+		fc.start();
+
+		// to make sure that at least one file is created
+		Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
+		synchronized (filesCreated) {
+			// loop instead of a single check: Object.wait() may return spuriously
+			while (filesCreated.size() == 0) {
+				filesCreated.wait();
+			}
+		}
+		Assert.assertTrue(fc.getFilesCreated().size() >= 1);
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, hdfsURI, FilePathFilter.createDefaultFilter(),
+				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+
+		monitoringFunction.open(new Configuration());
+		monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
+
+		// wait until all the files are created
+		fc.join();
+
+		Assert.assertTrue(filesCreated.size() == NO_OF_FILES);
+
+		Set<String> fileNamesCreated = new HashSet<>();
+		for (org.apache.hadoop.fs.Path path: fc.getFilesCreated()) {
+			fileNamesCreated.add(path.toString());
+		}
+
+		// a single scan must have seen something, but not the files created after it finished
+		Assert.assertTrue(uniqFilesFound.size() >= 1 && uniqFilesFound.size() < fileNamesCreated.size());
+		for(String file: uniqFilesFound) {
+			Assert.assertTrue(fileNamesCreated.contains(file));
+		}
+
+		for(org.apache.hadoop.fs.Path file: filesCreated) {
+			hdfs.delete(file, false);
+		}
+	}
+
+	// -------------		End of Tests
+
+	private int getLineNo(String line) {
+		String[] tkns = line.split("\\s");
+		Assert.assertTrue(tkns.length == 6);
+		return Integer.parseInt(tkns[tkns.length - 1]);
+	}
+
+	/**
+	 * A separate thread creating {@link #NO_OF_FILES} files, sleeping for the configured interval
+	 * (in milliseconds) after each file.
+	 * It serves for testing the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}.
+	 * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method.
+	 *
+	 * <p>Threads waiting on the set returned by {@link #getFilesCreated()} are notified once the set
+	 * reaches the size passed as {@code notificationLim} to the constructor.
+	 * */
+	private class FileCreator extends Thread {
+
+		/** Pause after each created file, in milliseconds. */
+		private final long interval;
+
+		/** Number of created files at which waiters on {@link #filesCreated} are notified. */
+		private final int noOfFilesBeforeNotifying;
+
+		/** Files created so far; also used as the monitor for wait/notify. */
+		private final Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
+
+		FileCreator(long interval, int notificationLim) {
+			this.interval = interval;
+			this.noOfFilesBeforeNotifying = notificationLim;
+		}
+
+		@Override
+		public void run() {
+			try {
+				for(int i = 0; i < NO_OF_FILES; i++) {
+					Tuple2<org.apache.hadoop.fs.Path, String> file =
+						fillWithData(hdfsURI, "file", i, "This is test line.");
+
+					synchronized (filesCreated) {
+						filesCreated.add(file.f0);
+						if (filesCreated.size() == noOfFilesBeforeNotifying) {
+							filesCreated.notifyAll();
+						}
+					}
+					Thread.sleep(interval);
+				}
+			} catch (IOException e) {
+				e.printStackTrace();
+			} catch (InterruptedException e) {
+				// keep the interruption visible to whoever inspects this thread afterwards
+				Thread.currentThread().interrupt();
+				e.printStackTrace();
+			}
+		}
+
+		/**
+		 * Returns the live (not copied) set of created files; callers synchronize on it
+		 * when they need to wait for the notification.
+		 */
+		Set<org.apache.hadoop.fs.Path> getFilesCreated() {
+			return this.filesCreated;
+		}
+	}
+
+	/**
+	 * Source context stub that records the path of every split passed to {@link #collect(FileInputSplit)}
+	 * and cancels and closes the monitoring function once {@link #NO_OF_FILES} distinct paths were recorded.
+	 * A path that is collected twice fails the test.
+	 */
+	private class TestingSourceContext implements SourceFunction.SourceContext<FileInputSplit> {
+
+		// wildcard instead of a raw type: only cancel() and close() are invoked on the function
+		private final ContinuousFileMonitoringFunction<?> src;
+		private final Set<String> filesFound;
+		private final Object lock = new Object();
+
+		TestingSourceContext(ContinuousFileMonitoringFunction<?> monitoringFunction, Set<String> uniqFilesFound) {
+			this.filesFound = uniqFilesFound;
+			this.src = monitoringFunction;
+		}
+
+		@Override
+		public void collect(FileInputSplit element) {
+
+			String filePath = element.getPath().toString();
+			if (filesFound.contains(filePath)) {
+				// check if we have duplicate splits that are open during the first time
+				// the monitor sees them, and they then close, so the modification time changes.
+				Assert.fail("Duplicate file: " + filePath);
+			}
+
+			filesFound.add(filePath);
+			try {
+				if (filesFound.size() == NO_OF_FILES) {
+					// all expected files were seen: stop the source and wake up the waiting test
+					this.src.cancel();
+					this.src.close();
+					synchronized (filesFound) {
+						filesFound.notifyAll();
+					}
+				}
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+
+		@Override
+		public void collectWithTimestamp(FileInputSplit element, long timestamp) {
+			// no-op in this stub; the element is dropped
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			// no-op in this stub
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lock;
+		}
+
+		@Override
+		public void close() {
+			// nothing to release
+		}
+	}
+
+	/**
+	 * Creates the file {@code base/fileName + fileIdx} and fills it with {@link #LINES_PER_FILE} lines of
+	 * the form {@code "<fileIdx>: <sampleLine> <lineNo>\n"}.
+	 *
+	 * <p>The content is first written to a dot-prefixed temporary file in the same directory and then
+	 * renamed to the final name (presumably so that the monitor never observes a half-written file;
+	 * confirm against the path filter in use).
+	 *
+	 * @param base directory in which the file is created
+	 * @param fileName prefix of the file name
+	 * @param fileIdx index appended to the file name and used as prefix of every line
+	 * @param sampleLine text placed in every line
+	 * @return the path of the created file together with its expected content
+	 * @throws IOException if the file system reports a failure while creating, writing or renaming the file
+	 * */
+	private Tuple2<org.apache.hadoop.fs.Path, String> fillWithData(String base, String fileName, int fileIdx, String sampleLine) throws IOException {
+		assert (hdfs != null);
+
+		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
+		Assert.assertTrue (!hdfs.exists(file));
+
+		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+		StringBuilder str = new StringBuilder();
+
+		// try-with-resources: the stream is closed even when a write fails
+		try (FSDataOutputStream stream = hdfs.create(tmp)) {
+			for(int i = 0; i < LINES_PER_FILE; i++) {
+				String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+				str.append(line);
+				// explicit charset so that the bytes do not depend on the platform default
+				stream.write(line.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+			}
+		}
+
+		hdfs.rename(tmp, file);
+
+		Assert.assertTrue("No result file present", hdfs.exists(file));
+		return new Tuple2<>(file, str.toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index f44fe9e..ecf55c3 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -20,7 +20,6 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.api.common.io.ParseException;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.tuple.*;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -30,6 +29,7 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.types.parser.StringParser;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.File;
@@ -58,23 +58,135 @@ public class CsvInputFormatTest {
 	private static final String SECOND_PART = "That is the second part";
 
 	@Test
-	public void ignoreInvalidLines() {
+	public void testSplitCsvInputStreamInLargeBuffer() throws Exception {
+		// 1 MiB buffer: far larger than the 58-byte test file
+		testSplitCsvInputStream(1024 * 1024, false);
+	}
+
+	/**
+	 * Runs the split/checkpoint/restore scenario with a 2-byte buffer, i.e. a buffer that is
+	 * shorter than every record of the test file.
+	 */
+	@Test
+	public void testSplitCsvInputStreamInSmallBuffer() throws Exception {
+		testSplitCsvInputStream(2, false);
+	}
+
+	/**
+	 * Reads a four-record CSV file through three input splits and, after every record, simulates a
+	 * checkpoint/restore cycle: the current state (offset) of the format is taken, a fresh format is
+	 * created and reopened at that offset.
+	 * Verifies the end offset of every split, the offset reported after every record, the parsed
+	 * fields and the total number of records.
+	 *
+	 * @param bufferSize buffer size handed to {@code setBufferSize} of the format
+	 * @param failAtStart currently unused by this method
+	 */
+	private void testSplitCsvInputStream(int bufferSize, boolean failAtStart) throws Exception {
+		final String fileContent =
+			"this is|1|2.0|\n"+
+			"a test|3|4.0|\n" +
+			"#next|5|6.0|\n" +
+			"asdadas|5|30.0|\n";
+
+		// create temporary file with 3 blocks
+		final File tempFile = File.createTempFile("input-stream-decoration-test", "tmp");
+		tempFile.deleteOnExit();
+
+		// try-with-resources: the stream is closed even when the write fails
+		try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
+			fileOutputStream.write(fileContent.getBytes());
+		}
+
+		// fix the number of blocks and the size of each one.
+		final int noOfBlocks = 3;
+
+		final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
+		CsvInputFormat<Tuple3<String, Integer, Double>> format = new TupleCsvInputFormat<>(new Path(tempFile.toURI()), "\n", "|", typeInfo);
+		format.setLenient(true);
+		format.setBufferSize(bufferSize);
+
+		final Configuration config = new Configuration();
+		format.configure(config);
+
+		// expectations, indexed by record (resp. split) number
+		final long[] offsetsAfterRecord = new long[]{ 15, 29, 42, 58};
+		final long[] offsetAtEndOfSplit = new long[]{ 20, 40, 58};
+		final String[] expectedF0 = { "this is", "a test", "#next", "asdadas" };
+		final Integer[] expectedF1 = { 1, 3, 5, 5 };
+		final Double[] expectedF2 = { 2.0, 4.0, 6.0, 30.0 };
+
+		int recordCounter = 0;
+		int splitCounter = 0;
+
+		FileInputSplit[] inputSplits = format.createInputSplits(noOfBlocks);
+		Tuple3<String, Integer, Double> result = new Tuple3<>();
+
+		for (FileInputSplit inputSplit : inputSplits) {
+			assertEquals(offsetAtEndOfSplit[splitCounter], inputSplit.getStart() + inputSplit.getLength());
+			splitCounter++;
+
+			format.open(inputSplit);
+			format.reopen(inputSplit, format.getCurrentState());
+
+			while (!format.reachedEnd()) {
+				if ((result = format.nextRecord(result)) != null) {
+					assertEquals(offsetsAfterRecord[recordCounter], (long) format.getCurrentState());
+					assertEquals(expectedF0[recordCounter], result.f0);
+					assertEquals(expectedF1[recordCounter], result.f1);
+					assertEquals(expectedF2[recordCounter], result.f2);
+					recordCounter++;
+
+					// simulate checkpoint
+					Long state = format.getCurrentState();
+					long offsetToRestore = state;
+
+					// create a new format
+					format = new TupleCsvInputFormat<>(new Path(tempFile.toURI()), "\n", "|", typeInfo);
+					format.setLenient(true);
+					format.setBufferSize(bufferSize);
+					format.configure(config);
+
+					// simulate the restore operation.
+					format.reopen(inputSplit, offsetToRestore);
+				} else {
+					// nextRecord() returned null: start over with a fresh reuse object
+					result = new Tuple3<>();
+				}
+			}
+			format.close();
+		}
+		Assert.assertEquals(4, recordCounter);
+	}
+
+	/**
+	 * Runs the invalid-line scenario of {@code ignoreInvalidLines(int)} with a 1 MiB buffer size.
+	 */
+	@Test
+	public void ignoreInvalidLinesAndGetOffsetInLargeBuffer() {
+		ignoreInvalidLines(1024 * 1024);
+	}
+
+	/**
+	 * Runs the invalid-line scenario of {@code ignoreInvalidLines(int)} with a 2-byte buffer size.
+	 */
+	@Test
+	public void ignoreInvalidLinesAndGetOffsetInSmallBuffer() {
+		ignoreInvalidLines(2);
+	}
+
+	private void ignoreInvalidLines(int bufferSize) {
 		try {
-			
-			
 			final String fileContent =  "#description of the data\n" + 
 										"header1|header2|header3|\n"+
 										"this is|1|2.0|\n"+
 										"//a comment\n" +
 										"a test|3|4.0|\n" +
-										"#next|5|6.0|\n";
-			
+										"#next|5|6.0|\n" +
+										"asdasdas";
+
 			final FileInputSplit split = createTempFile(fileContent);
 
 			final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
 			final CsvInputFormat<Tuple3<String, Integer, Double>> format = new TupleCsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
 			format.setLenient(true);
-		
+			format.setBufferSize(bufferSize);
+
 			final Configuration parameters = new Configuration();
 			format.configure(parameters);
 			format.open(split);
@@ -86,21 +198,25 @@ public class CsvInputFormatTest {
 			assertEquals("this is", result.f0);
 			assertEquals(new Integer(1), result.f1);
 			assertEquals(new Double(2.0), result.f2);
-			
+			assertEquals((long) format.getCurrentState(), 65);
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("a test", result.f0);
 			assertEquals(new Integer(3), result.f1);
 			assertEquals(new Double(4.0), result.f2);
-			
+			assertEquals((long) format.getCurrentState(), 91);
+
 			result = format.nextRecord(result);
 			assertNotNull(result);
 			assertEquals("#next", result.f0);
 			assertEquals(new Integer(5), result.f1);
 			assertEquals(new Double(6.0), result.f2);
+			assertEquals((long) format.getCurrentState(), 104);
 
 			result = format.nextRecord(result);
 			assertNull(result);
+			assertEquals(fileContent.length(), (long) format.getCurrentState());
 		}
 		catch (Exception ex) {
 			ex.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 80c5fbc..ee4d31f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -101,8 +101,8 @@ public class RuntimeEnvironment implements Environment {
 			InputGate[] inputGates,
 			ActorGateway jobManager,
 			TaskManagerRuntimeInfo taskManagerInfo,
-			Task containingTask,
-			TaskMetricGroup metrics) {
+			TaskMetricGroup metrics,
+			Task containingTask) {
 
 		this.jobId = checkNotNull(jobId);
 		this.jobVertexId = checkNotNull(jobVertexId);

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c1cbaa6..548d7d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -526,7 +526,7 @@ public class Task implements Runnable {
 					userCodeClassLoader, memoryManager, ioManager,
 					broadcastVariableManager, accumulatorRegistry,
 					splitProvider, distributedCacheEntries,
-					writers, inputGates, jobManager, taskManagerConfig, this, metrics);
+					writers, inputGates, jobManager, taskManagerConfig, metrics, this);
 
 			// let the task code create its readers and writers
 			invokable.setEnvironment(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/d353895b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
index 21ad762..e824758 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java
@@ -44,6 +44,11 @@ public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
 		}
 	}
 
+	/**
+	 * Creates a source stream on top of an existing operator, reusing the operator's
+	 * environment and transformation. The resulting source is marked as parallel.
+	 *
+	 * @param operator the operator whose environment and transformation back this source
+	 */
+	public DataStreamSource(SingleOutputStreamOperator<T> operator) {
+		super(operator.environment, operator.getTransformation());
+		this.isParallel = true;
+	}
+
 	@Override
 	public DataStreamSource<T> setParallelism(int parallelism) {
 		if (parallelism > 1 && !isParallel) {