Posted to commits@flink.apache.org by mx...@apache.org on 2016/10/27 12:22:24 UTC

[2/3] flink git commit: [FLINK-4800] Introduce the TimestampedFileInputSplit for Continuous File Processing

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 2f0a16a..c8e9846 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -22,11 +22,9 @@ import org.apache.flink.api.common.io.CheckpointableInputFormat;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -43,44 +41,42 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.PriorityQueue;
 import java.util.Queue;
 
+import static org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit.EOS;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * This is the operator that reads the {@link FileInputSplit FileInputSplits} received from
- * the preceding {@link ContinuousFileMonitoringFunction}. This operator can have parallelism
- * greater than 1, contrary to the {@link ContinuousFileMonitoringFunction} which has
- * a parallelism of 1.
+ * The operator that reads the {@link TimestampedFileInputSplit splits} received from the preceding
+ * {@link ContinuousFileMonitoringFunction}. In contrast to the {@link ContinuousFileMonitoringFunction},
+ * which has a parallelism of 1, this operator can run with a parallelism greater than 1.
  * <p/>
- * This operator will receive the split descriptors, put them in a queue, and have another
- * thread read the actual data from the split. This architecture allows the separation of the
- * reading thread, from the one emitting the checkpoint barriers, thus removing any potential
+ * As soon as a split descriptor is received, it is put in a queue, and a separate
+ * thread reads the actual data of the split. This architecture separates the
+ * reading thread from the one emitting the checkpoint barriers, thus removing any potential
  * back-pressure.
  */
 @Internal
-public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT>
-	implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT>, StreamCheckpointedOperator {
+public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
+	implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, StreamCheckpointedOperator {
 
 	private static final long serialVersionUID = 1L;
 
 	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
 
-	/** A value that serves as a kill-pill to stop the reading thread when no more splits remain. */
-	private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1, -1, null);
-
 	private FileInputFormat<OUT> format;
 	private TypeSerializer<OUT> serializer;
 
 	private transient Object checkpointLock;
 
-	private transient SplitReader<S, OUT> reader;
+	private transient SplitReader<OUT> reader;
 	private transient SourceFunction.SourceContext<OUT> readerContext;
-	private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;
+	private List<TimestampedFileInputSplit> restoredReaderState;
 
 	public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
 		this.format = checkNotNull(format);
@@ -110,13 +106,13 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 			timeCharacteristic, getProcessingTimeService(), checkpointLock, output, watermarkInterval);
 
 		// and initialize the split reading thread
-		this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, readerState);
-		this.readerState = null;
+		this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, restoredReaderState);
+		this.restoredReaderState = null;
 		this.reader.start();
 	}
 
 	@Override
-	public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
+	public void processElement(StreamRecord<TimestampedFileInputSplit> element) throws Exception {
 		reader.addSplit(element.getValue());
 	}
 
@@ -157,7 +153,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		}
 		reader = null;
 		readerContext = null;
-		readerState = null;
+		restoredReaderState = null;
 		format = null;
 		serializer = null;
 	}
@@ -190,7 +186,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		output.close();
 	}
 
-	private class SplitReader<S extends Serializable, OT> extends Thread {
+	private class SplitReader<OT> extends Thread {
 
 		private volatile boolean isRunning;
 
@@ -200,44 +196,39 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		private final Object checkpointLock;
 		private final SourceFunction.SourceContext<OT> readerContext;
 
-		private final Queue<FileInputSplit> pendingSplits;
-
-		private FileInputSplit currentSplit = null;
+		private final Queue<TimestampedFileInputSplit> pendingSplits;
 
-		private S restoredFormatState = null;
+		private TimestampedFileInputSplit currentSplit;
 
-		private volatile boolean isSplitOpen = false;
+		private volatile boolean isSplitOpen;
 
 		private SplitReader(FileInputFormat<OT> format,
 					TypeSerializer<OT> serializer,
 					SourceFunction.SourceContext<OT> readerContext,
 					Object checkpointLock,
-					Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
+					List<TimestampedFileInputSplit> restoredState) {
 
 			this.format = checkNotNull(format, "Unspecified FileInputFormat.");
 			this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
 			this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context.");
 			this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock.");
 
-			this.pendingSplits = new ArrayDeque<>();
 			this.isRunning = true;
 
-			// this is the case where a task recovers from a previous failed attempt
-			if (restoredState != null) {
-				List<FileInputSplit> pending = restoredState.f0;
-				FileInputSplit current = restoredState.f1;
-				S formatState = restoredState.f2;
-
-				for (FileInputSplit split : pending) {
-					pendingSplits.add(split);
+			this.pendingSplits = new PriorityQueue<>(10, new Comparator<TimestampedFileInputSplit>() {
+				@Override
+				public int compare(TimestampedFileInputSplit o1, TimestampedFileInputSplit o2) {
+					return o1.compareTo(o2);
 				}
+			});
 
-				this.currentSplit = current;
-				this.restoredFormatState = formatState;
+			// this is the case where a task recovers from a previous failed attempt
+			if (restoredState != null) {
+				this.pendingSplits.addAll(restoredState);
 			}
 		}
 
-		private void addSplit(FileInputSplit split) {
+		private void addSplit(TimestampedFileInputSplit split) {
 			checkNotNull(split, "Cannot insert a null value in the pending splits queue.");
 			synchronized (checkpointLock) {
 				this.pendingSplits.add(split);
@@ -259,43 +250,32 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 
 					synchronized (checkpointLock) {
 
-						if (this.currentSplit != null) {
-
-							if (currentSplit.equals(EOS)) {
-								isRunning = false;
-								break;
-							}
-
-							if (this.format instanceof CheckpointableInputFormat && restoredFormatState != null) {
-
-								@SuppressWarnings("unchecked")
-								CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
-										(CheckpointableInputFormat<FileInputSplit, S>) this.format;
-
-								checkpointableFormat.reopen(currentSplit, restoredFormatState);
-							} else {
-								// this is the case of a non-checkpointable input format that will reprocess the last split.
-								LOG.info("Format " + this.format.getClass().getName() + " does not support checkpointing.");
-								format.open(currentSplit);
-							}
-							// reset the restored state to null for the next iteration
-							this.restoredFormatState = null;
-						} else {
-
-							// get the next split to read.
+						if (currentSplit == null) {
 							currentSplit = this.pendingSplits.poll();
-
 							if (currentSplit == null) {
 								checkpointLock.wait(50);
 								continue;
 							}
+						}
 
-							if (currentSplit.equals(EOS)) {
-								isRunning = false;
-								break;
-							}
+						if (currentSplit.equals(EOS)) {
+							isRunning = false;
+							break;
+						}
+
+						if (this.format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) {
+							// recovering after a node failure with an input
+							// format that supports resetting the offset
+							((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).
+								reopen(currentSplit, currentSplit.getSplitState());
+						} else {
+							// we either have a new split, or we recovered from a node
+							// failure but the input format does not support resetting the offset.
 							this.format.open(currentSplit);
 						}
+
+						// reset the restored state to null for the next iteration
+						this.currentSplit.resetSplitState();
 						this.isSplitOpen = true;
 					}
 
@@ -348,34 +328,17 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 			}
 		}
 
-		private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
-			List<FileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
-			for (FileInputSplit split: this.pendingSplits) {
-				snapshot.add(split);
-			}
-
-			// remove the current split from the list if inside.
-			if (this.currentSplit != null && this.currentSplit.equals(pendingSplits.peek())) {
-				this.pendingSplits.remove();
-			}
-
-			if (this.currentSplit != null) {
-				if (this.format instanceof CheckpointableInputFormat) {
-					@SuppressWarnings("unchecked")
-					CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
-							(CheckpointableInputFormat<FileInputSplit, S>) this.format;
-
-					S formatState = this.isSplitOpen ?
-							checkpointableFormat.getCurrentState() :
-							restoredFormatState;
-					return new Tuple3<>(snapshot, currentSplit, formatState);
-				} else {
-					LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery.");
-					return new Tuple3<>(snapshot, currentSplit, null);
+		private List<TimestampedFileInputSplit> getReaderState() throws IOException {
+			List<TimestampedFileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
+			if (currentSplit != null) {
+				if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
+					Serializable formatState = ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).getCurrentState();
+					this.currentSplit.setSplitState(formatState);
 				}
-			} else {
-				return new Tuple3<>(snapshot, null, null);
+				snapshot.add(this.currentSplit);
 			}
+			snapshot.addAll(this.pendingSplits);
+			return snapshot;
 		}
 
 		public void cancel() {
@@ -389,45 +352,27 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 	public void snapshotState(FSDataOutputStream os, long checkpointId, long timestamp) throws Exception {
 		final ObjectOutputStream oos = new ObjectOutputStream(os);
 
-		Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState = this.reader.getReaderState();
-		List<FileInputSplit> pendingSplits = readerState.f0;
-		FileInputSplit currSplit = readerState.f1;
-		S formatState = readerState.f2;
-
-		// write the current split
-		oos.writeObject(currSplit);
-		oos.writeInt(pendingSplits.size());
-		for (FileInputSplit split : pendingSplits) {
+		List<TimestampedFileInputSplit> readerState = this.reader.getReaderState();
+		oos.writeInt(readerState.size());
+		for (TimestampedFileInputSplit split : readerState) {
 			oos.writeObject(split);
 		}
-
-		// write the state of the reading channel
-		oos.writeObject(formatState);
 		oos.flush();
 	}
 
 	@Override
 	public void restoreState(FSDataInputStream is) throws Exception {
-		final ObjectInputStream ois = new ObjectInputStream(is);
 
-		// read the split that was being read
-		FileInputSplit currSplit = (FileInputSplit) ois.readObject();
+		checkState(this.restoredReaderState == null,
+			"The reader state has already been initialized.");
+
+		final ObjectInputStream ois = new ObjectInputStream(is);
 
-		// read the pending splits list
-		List<FileInputSplit> pendingSplits = new ArrayList<>();
 		int noOfSplits = ois.readInt();
+		List<TimestampedFileInputSplit> pendingSplits = new ArrayList<>(noOfSplits);
 		for (int i = 0; i < noOfSplits; i++) {
-			FileInputSplit split = (FileInputSplit) ois.readObject();
-			pendingSplits.add(split);
+			pendingSplits.add((TimestampedFileInputSplit) ois.readObject());
 		}
-
-		// read the state of the format
-		@SuppressWarnings("unchecked")
-		S formatState = (S) ois.readObject();
-
-		// set the whole reader state for the open() to find.
-		checkState(this.readerState == null, "The reader state has already been initialized.");
-
-		this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState);
+		this.restoredReaderState = pendingSplits;
 	}
 }
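
A note on the contract the reader relies on: for formats that implement
CheckpointableInputFormat, the operator snapshots the current read position via
getCurrentState() and stores it on the split (setSplitState()); on recovery it
calls reopen() with that state instead of open(). The standalone sketch below
(a hypothetical illustration, not Flink code) shows the idea of that contract,
using a byte offset as the serializable state:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;

// Hypothetical resumable reader; it mirrors the getCurrentState()/reopen()
// pair of CheckpointableInputFormat but is not the actual Flink interface.
class ResumableLineReader {

	private RandomAccessFile file;

	void open(String path) throws IOException {
		file = new RandomAccessFile(path, "r");
	}

	// analogous to getCurrentState(): the checkpointed state is simply
	// the byte offset of the next record to be read.
	Serializable getCurrentState() throws IOException {
		return file.getFilePointer();
	}

	// analogous to reopen(split, state): open the file again and seek
	// to the checkpointed offset so reading resumes where it left off.
	void reopen(String path, Serializable state) throws IOException {
		open(path);
		file.seek((Long) state);
	}

	String nextRecord() throws IOException {
		return file.readLine();
	}
}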

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
index cdbeb2b..f8c4fba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/FileProcessingMode.java
@@ -20,12 +20,15 @@ package org.apache.flink.streaming.api.functions.source;
 import org.apache.flink.annotation.PublicEvolving;
 
 /**
- * Specifies when the computation of the {@link ContinuousFileMonitoringFunction}
- * will be triggered.
+ * The mode in which the {@link ContinuousFileMonitoringFunction} operates.
+ * This can be either {@link #PROCESS_ONCE} or {@link #PROCESS_CONTINUOUSLY}.
  */
 @PublicEvolving
 public enum FileProcessingMode {
 
-	PROCESS_ONCE,				// Processes the current content of a file/path only ONCE, and stops monitoring.
-	PROCESS_CONTINUOUSLY		// Reprocesses the whole file when new data is appended.
+	/** Processes the current contents of the path and exits. */
+	PROCESS_ONCE,
+
+	/** Periodically scans the path for new data. */
+	PROCESS_CONTINUOUSLY
 }
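
For context, the mode is picked when the file source is created. The sketch
below mirrors the readFile() call used by the integration test further down in
this commit; the input path and the 100 ms scan interval are placeholder values:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ContinuousReadExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		TextInputFormat format = new TextInputFormat(new Path("file:///tmp/input"));

		// PROCESS_CONTINUOUSLY: re-scan the path every 100 ms for new files;
		// PROCESS_ONCE would process the current contents once and terminate.
		DataStream<String> lines = env.readFile(format, "file:///tmp/input",
			FileProcessingMode.PROCESS_CONTINUOUSLY, 100);

		lines.print();
		env.execute("continuous file read example");
	}
}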

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
new file mode 100644
index 0000000..323b3ab
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * An extended {@link FileInputSplit} that also includes information about:
+ * <ul>
+ *     <li>The modification time of the file this split belongs to.</li>
+ *     <li>When checkpointing, the state of the split at the moment of the checkpoint.</li>
+ * </ul>
+ * This class is used by the {@link ContinuousFileMonitoringFunction} and the
+ * {@link ContinuousFileReaderOperator} to perform continuous file processing.
+ * */
+public class TimestampedFileInputSplit extends FileInputSplit implements Comparable<TimestampedFileInputSplit> {
+
+	/** The modification time of the file this split belongs to. */
+	private final long modificationTime;
+
+	/**
+	 * The state of the split. This information is used when
+	 * restoring from a checkpoint and makes it possible to resume reading
+	 * the underlying file from the point where we left off.
+	 * */
+	private Serializable splitState;
+
+	/** A special {@link TimestampedFileInputSplit} signaling the end of the stream of splits. */
+	public static final TimestampedFileInputSplit EOS =
+		new TimestampedFileInputSplit(Long.MIN_VALUE, -1, null, -1, -1, null);
+
+	/**
+	 * Creates a {@link TimestampedFileInputSplit} based on the file modification time and
+	 * the rest of the information of the {@link FileInputSplit}, as returned by the
+	 * underlying filesystem.
+	 *
+	 * @param modificationTime the modification time of the file this split belongs to
+	 * @param num    the number of this input split
+	 * @param file   the file name
+	 * @param start  the position of the first byte in the file to process
+	 * @param length the number of bytes in the file to process (-1 is flag for "read whole file")
+	 * @param hosts  the list of hosts containing the block, possibly {@code null}
+	 */
+	public TimestampedFileInputSplit(long modificationTime, int num, Path file, long start, long length, String[] hosts) {
+		super(num, file, start, length, hosts);
+
+		Preconditions.checkArgument(modificationTime >= 0 || modificationTime == Long.MIN_VALUE,
+			"Invalid File Split Modification Time: " + modificationTime + ".");
+
+		this.modificationTime = modificationTime;
+	}
+
+	/**
+	 * Sets the state of the split. This information is used when
+	 * restoring from a checkpoint and makes it possible to resume reading
+	 * the underlying file from the point where we left off.
+	 * <p>
+	 * This is applicable to {@link org.apache.flink.api.common.io.FileInputFormat FileInputFormats}
+	 * that implement the {@link org.apache.flink.api.common.io.CheckpointableInputFormat
+	 * CheckpointableInputFormat} interface.
+	 * */
+	public void setSplitState(Serializable state) {
+		this.splitState = state;
+	}
+
+	/**
+	 * Sets the state of the split to {@code null}.
+	 */
+	public void resetSplitState() {
+		this.setSplitState(null);
+	}
+
+	/** @return the state of the split. */
+	public Serializable getSplitState() {
+		return this.splitState;
+	}
+
+	/** @return The modification time of the file this split belongs to. */
+	public long getModificationTime() {
+		return this.modificationTime;
+	}
+
+	@Override
+	public int compareTo(TimestampedFileInputSplit o) {
+		long modTimeComp = this.modificationTime - o.modificationTime;
+		if (modTimeComp != 0L) {
+			// we cannot just cast the modTimeComp to int
+			// because it may overflow
+			return modTimeComp > 0 ? 1 : -1;
+		}
+
+		int pathComp = this.getPath().compareTo(o.getPath());
+		return pathComp != 0 ? pathComp : this.getSplitNumber() - o.getSplitNumber();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		} else if (o != null && o instanceof TimestampedFileInputSplit && super.equals(o)) {
+			TimestampedFileInputSplit that = (TimestampedFileInputSplit) o;
+			return this.modificationTime == that.modificationTime;
+		}
+		return false;
+	}
+
+	@Override
+	public int hashCode() {
+		int res = 37 * (int)(this.modificationTime ^ (this.modificationTime >>> 32));
+		return 37 * res + super.hashCode();
+	}
+
+	@Override
+	public String toString() {
+		return "[" + getSplitNumber() + "] " + getPath() + " mod@ " +
+			modificationTime + " : " + getStart() + " + " + getLength();
+	}
+}
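
Since TimestampedFileInputSplit implements Comparable, splits order naturally in
a PriorityQueue; the SplitReader above builds its queue with an explicit
comparator that simply delegates to compareTo(), which is equivalent. A small
illustrative sketch of the resulting order (the file paths are placeholders):

import java.util.PriorityQueue;

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;

public class SplitOrderingExample {

	public static void main(String[] args) {
		PriorityQueue<TimestampedFileInputSplit> pending = new PriorityQueue<>();

		pending.add(new TimestampedFileInputSplit(11, 0, new Path("file:///tmp/c"), 0, 100, null));
		pending.add(new TimestampedFileInputSplit(10, 1, new Path("file:///tmp/b"), 0, 100, null));
		pending.add(new TimestampedFileInputSplit(10, 0, new Path("file:///tmp/a"), 0, 100, null));

		// polls in order (modification time, then path, then split number):
		// first both splits with mod time 10 (a before b), then the one with 11.
		while (!pending.isEmpty()) {
			System.out.println(pending.poll());
		}
	}
}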

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
index a265c0a..0e9b054 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java
@@ -57,18 +57,17 @@ import static org.junit.Assert.fail;
 
 public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleranceTestBase {
 
-	private static final int NO_OF_FILES = 9;
-	private static final int LINES_PER_FILE = 200;
+	private static final int NO_OF_FILES = 5;
+	private static final int LINES_PER_FILE = 150;
 	private static final int NO_OF_RETRIES = 3;
-	private static final int PARALLELISM = 4;
-	private static final long INTERVAL = 2000;
+	private static final long INTERVAL = 100;
 
 	private static File baseDir;
-	private static org.apache.hadoop.fs.FileSystem fs;
+	private static org.apache.hadoop.fs.FileSystem localFs;
 	private static String localFsURI;
 	private FileCreator fc;
 
-	private static  Map<Integer, List<String>> finalCollectedContent = new HashMap<>();
+	private static Map<Integer, Set<String>> actualCollectedContent = new HashMap<>();
 
 	@BeforeClass
 	public static void createHDFS() {
@@ -79,7 +78,7 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
 			org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
 
 			localFsURI = "file:///" + baseDir +"/";
-			fs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
+			localFs = new org.apache.hadoop.fs.Path(localFsURI).getFileSystem(hdConf);
 
 		} catch(Throwable e) {
 			e.printStackTrace();
@@ -100,22 +99,22 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
 	public void testProgram(StreamExecutionEnvironment env) {
 
 		// set the restart strategy.
-		env.getConfig().setRestartStrategy(
-			RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0));
-		env.enableCheckpointing(20);
-		env.setParallelism(PARALLELISM);
+		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0));
+		env.enableCheckpointing(10);
 
 		// create and start the file creating thread.
 		fc = new FileCreator();
 		fc.start();
 
 		// create the monitoring source along with the necessary readers.
-		TestingSinkFunction sink = new TestingSinkFunction();
 		TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
 		format.setFilesFilter(FilePathFilter.createDefaultFilter());
+
 		DataStream<String> inputStream = env.readFile(format, localFsURI,
 			FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL);
 
+		TestingSinkFunction sink = new TestingSinkFunction();
+
 		inputStream.flatMap(new FlatMapFunction<String, String>() {
 			@Override
 			public void flatMap(String value, Collector<String> out) throws Exception {
@@ -126,12 +125,17 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
 
 	@Override
 	public void postSubmit() throws Exception {
-		Map<Integer, List<String>> collected = finalCollectedContent;
+
+		// be sure that the file creating thread is done.
+		fc.join();
+
+		Map<Integer, Set<String>> collected = actualCollectedContent;
 		Assert.assertEquals(collected.size(), fc.getFileContent().size());
+
 		for (Integer fileIdx: fc.getFileContent().keySet()) {
 			Assert.assertTrue(collected.keySet().contains(fileIdx));
 
-			List<String> cntnt = collected.get(fileIdx);
+			List<String> cntnt = new ArrayList<>(collected.get(fileIdx));
 			Collections.sort(cntnt, new Comparator<String>() {
 				@Override
 				public int compare(String o1, String o2) {
@@ -147,105 +151,34 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
 		}
 
 		collected.clear();
-		finalCollectedContent.clear();
+		actualCollectedContent.clear();
 		fc.clean();
 	}
 
 	private int getLineNo(String line) {
 		String[] tkns = line.split("\\s");
-		Assert.assertTrue(tkns.length == 6);
 		return Integer.parseInt(tkns[tkns.length - 1]);
 	}
 
-	// --------------------------------------------------------------------------------------------
-	//  Custom Functions
-	// --------------------------------------------------------------------------------------------
-
-	// -------------------------			FILE CREATION			-------------------------------
-
-	/**
-	 * A separate thread creating {@link #NO_OF_FILES} files, one file every {@link #INTERVAL} milliseconds.
-	 * It serves for testing the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}.
-	 * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method.
-	 * */
-	private class FileCreator extends Thread {
-
-		private final Set<Path> filesCreated = new HashSet<>();
-		private final Map<Integer, String> fileContents = new HashMap<>();
-
-		public void run() {
-			try {
-				for(int i = 0; i < NO_OF_FILES; i++) {
-					Tuple2<org.apache.hadoop.fs.Path, String> file =
-						fillWithData(localFsURI, "file", i, "This is test line.");
-					filesCreated.add(file.f0);
-					fileContents.put(i, file.f1);
-
-					Thread.sleep((int) (INTERVAL / (3.0/2)));
-				}
-			} catch (IOException | InterruptedException e) {
-				e.printStackTrace();
-			}
-		}
-
-		void clean() throws IOException {
-			assert (fs != null);
-			for (org.apache.hadoop.fs.Path path: filesCreated) {
-				fs.delete(path, false);
-			}
-			fileContents.clear();
-		}
-
-		Map<Integer, String> getFileContent() {
-			return this.fileContents;
-		}
-	}
-
-	/**
-	 * Fill the file with content and put the content in the {@code hdPathContents} list.
-	 * */
-	private Tuple2<Path, String> fillWithData(
-		String base, String fileName, int fileIdx, String sampleLine) throws IOException {
-
-		assert (fs != null);
-
-		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
-
-		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
-		FSDataOutputStream stream = fs.create(tmp);
-		StringBuilder str = new StringBuilder();
-		for(int i = 0; i < LINES_PER_FILE; i++) {
-			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
-			str.append(line);
-			stream.write(line.getBytes());
-		}
-		stream.close();
-
-		Assert.assertTrue("Result file present", !fs.exists(file));
-		fs.rename(tmp, file);
-		Assert.assertTrue("No result file present", fs.exists(file));
-		return new Tuple2<>(file, str.toString());
-	}
-
 	// --------------------------			Task Sink			------------------------------
 
 	private static class TestingSinkFunction extends RichSinkFunction<String>
 		implements Checkpointed<Tuple2<Long, Map<Integer, Set<String>>>>, CheckpointListener {
 
-		private static volatile boolean hasFailed = false;
+		private boolean hasFailed;
 
-		private volatile int numSuccessfulCheckpoints;
-
-		private long count;
+		private volatile boolean hasSuccessfulCheckpoints;
 
 		private long elementsToFailure;
 
-		private long elementCounter = 0;
+		private long elementCounter;
 
-		private  Map<Integer, Set<String>> collectedContent = new HashMap<>();
+		private Map<Integer, Set<String>> actualContent = new HashMap<>();
 
 		TestingSinkFunction() {
 			hasFailed = false;
+			elementCounter = 0;
+			hasSuccessfulCheckpoints = false;
 		}
 
 		@Override
@@ -257,74 +190,157 @@ public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleran
 			long failurePosMax = (long) (0.7 * LINES_PER_FILE);
 
 			elementsToFailure = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
-
-			if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
-				finalCollectedContent = new HashMap<>();
-				for (Map.Entry<Integer, Set<String>> result: collectedContent.entrySet()) {
-					finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
-				}
-				throw new SuccessException();
-			}
-		}
-
-		@Override
-		public void close() {
-			try {
-				super.close();
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
 		}
 
 		@Override
 		public void invoke(String value) throws Exception {
-			int fileIdx = Character.getNumericValue(value.charAt(0));
+			int fileIdx = getFileIdx(value);
 
-			Set<String> content = collectedContent.get(fileIdx);
+			Set<String> content = actualContent.get(fileIdx);
 			if (content == null) {
 				content = new HashSet<>();
-				collectedContent.put(fileIdx, content);
+				actualContent.put(fileIdx, content);
 			}
 
+			// detect duplicate lines.
 			if (!content.add(value + "\n")) {
 				fail("Duplicate line: " + value);
 				System.exit(0);
 			}
 
-
 			elementCounter++;
+
+			// termination: all lines of all files have been seen.
 			if (elementCounter >= NO_OF_FILES * LINES_PER_FILE) {
-				finalCollectedContent = new HashMap<>();
-				for (Map.Entry<Integer, Set<String>> result: collectedContent.entrySet()) {
-					finalCollectedContent.put(result.getKey(), new ArrayList<>(result.getValue()));
-				}
+				actualCollectedContent = actualContent;
 				throw new SuccessException();
 			}
 
-			count++;
-			if (!hasFailed) {
-				Thread.sleep(2);
-				if (numSuccessfulCheckpoints >= 1 && count >= elementsToFailure) {
-					hasFailed = true;
-					throw new Exception("Task Failure");
-				}
+			// add some latency so that at least one checkpoint completes before the failure.
+			if (!hasFailed && !hasSuccessfulCheckpoints) {
+				Thread.sleep(5);
+			}
+
+			// simulate a node failure
+			if (!hasFailed && hasSuccessfulCheckpoints && elementCounter >= elementsToFailure) {
+				throw new Exception("Task Failure @ elem: " + elementCounter + " / " + elementsToFailure);
+			}
+		}
+
+		@Override
+		public void close() {
+			try {
+				super.close();
+			} catch (Exception e) {
+				e.printStackTrace();
 			}
 		}
 
 		@Override
 		public Tuple2<Long, Map<Integer, Set<String>>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			return new Tuple2<>(elementCounter, collectedContent);
+			return new Tuple2<>(elementCounter, actualContent);
 		}
 
 		@Override
 		public void restoreState(Tuple2<Long, Map<Integer, Set<String>>> state) throws Exception {
+			this.hasFailed = true;
 			this.elementCounter = state.f0;
-			this.collectedContent = state.f1;
+			this.actualContent = state.f1;
 		}
 
 		@Override
 		public void notifyCheckpointComplete(long checkpointId) throws Exception {
-			numSuccessfulCheckpoints++;
+			hasSuccessfulCheckpoints = true;
 		}
+
+		private int getFileIdx(String line) {
+			String[] tkns = line.split(":");
+			return Integer.parseInt(tkns[0]);
+		}
+	}
+
+	// -------------------------			FILE CREATION			-------------------------------
+
+	/**
+	 * A separate thread creating {@link #NO_OF_FILES} files, one file every {@link #INTERVAL} milliseconds.
+	 * It serves to test the file monitoring functionality of the {@link ContinuousFileMonitoringFunction}.
+	 * The files are filled with data by the {@link #fillWithData(String, String, int, String)} method.
+	 * */
+	private class FileCreator extends Thread {
+
+		private final Set<Path> filesCreated = new HashSet<>();
+		private final Map<Integer, String> fileContents = new HashMap<>();
+
+		/** The modification time of the last created file. */
+		private long lastCreatedModTime = Long.MIN_VALUE;
+
+		public void run() {
+			try {
+				for (int i = 0; i < NO_OF_FILES; i++) {
+					Tuple2<org.apache.hadoop.fs.Path, String> tmpFile;
+					long modTime;
+					do {
+
+						// give it some time so that the files have
+						// different modification timestamps.
+						Thread.sleep(50);
+
+						tmpFile = fillWithData(localFsURI, "file", i, "This is test line.");
+
+						modTime = localFs.getFileStatus(tmpFile.f0).getModificationTime();
+						if (modTime <= lastCreatedModTime) {
+							// delete the last created file to recreate it with a different timestamp
+							localFs.delete(tmpFile.f0, false);
+						}
+					} while (modTime <= lastCreatedModTime);
+					lastCreatedModTime = modTime;
+
+					// rename the file
+					org.apache.hadoop.fs.Path file =
+						new org.apache.hadoop.fs.Path(localFsURI + "/file" + i);
+					localFs.rename(tmpFile.f0, file);
+					Assert.assertTrue(localFs.exists(file));
+
+					filesCreated.add(file);
+					fileContents.put(i, tmpFile.f1);
+				}
+			} catch (IOException | InterruptedException e) {
+				e.printStackTrace();
+			}
+		}
+
+		void clean() throws IOException {
+			assert (localFs != null);
+			for (org.apache.hadoop.fs.Path path: filesCreated) {
+				localFs.delete(path, false);
+			}
+			fileContents.clear();
+		}
+
+		Map<Integer, String> getFileContent() {
+			return this.fileContents;
+		}
+	}
+
+	/**
+	 * Fill the file with content and put the content in the {@code hdPathContents} list.
+	 * */
+	private Tuple2<Path, String> fillWithData(
+		String base, String fileName, int fileIdx, String sampleLine) throws IOException, InterruptedException {
+
+		assert (localFs != null);
+
+		org.apache.hadoop.fs.Path tmp =
+			new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
+
+		FSDataOutputStream stream = localFs.create(tmp);
+		StringBuilder str = new StringBuilder();
+		for (int i = 0; i < LINES_PER_FILE; i++) {
+			String line = fileIdx + ": " + sampleLine + " " + i + "\n";
+			str.append(line);
+			stream.write(line.getBytes());
+		}
+		stream.close();
+		return new Tuple2<>(tmp, str.toString());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b410c393/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
new file mode 100644
index 0000000..88bd822
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TimestampedFileInputSplitTest {
+
+	@Test
+	public void testSplitEquality() {
+
+		TimestampedFileInputSplit eos1 = TimestampedFileInputSplit.EOS;
+		TimestampedFileInputSplit eos2 = TimestampedFileInputSplit.EOS;
+
+		Assert.assertEquals(eos1, eos2);
+
+		TimestampedFileInputSplit richFirstSplit =
+			new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null);
+		Assert.assertNotEquals(eos1, richFirstSplit);
+
+		TimestampedFileInputSplit richSecondSplit =
+			new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null);
+		Assert.assertEquals(richFirstSplit, richSecondSplit);
+
+		TimestampedFileInputSplit richModSecondSplit =
+			new TimestampedFileInputSplit(11, 2, new Path("test"), 0, 100, null);
+		Assert.assertNotEquals(richSecondSplit, richModSecondSplit);
+
+		TimestampedFileInputSplit richThirdSplit =
+			new TimestampedFileInputSplit(10, 2, new Path("test/test1"), 0, 100, null);
+		Assert.assertEquals(10, richThirdSplit.getModificationTime());
+		Assert.assertNotEquals(richFirstSplit, richThirdSplit);
+
+		TimestampedFileInputSplit richThirdSplitCopy =
+			new TimestampedFileInputSplit(10, 2, new Path("test/test1"), 0, 100, null);
+		Assert.assertEquals(richThirdSplitCopy, richThirdSplit);
+	}
+
+	@Test
+	public void testSplitComparison() {
+		TimestampedFileInputSplit richFirstSplit =
+			new TimestampedFileInputSplit(10, 3, new Path("test/test1"), 0, 100, null);
+
+		TimestampedFileInputSplit richSecondSplit =
+			new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 0, 100, null);
+
+		TimestampedFileInputSplit richThirdSplit =
+			new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+
+		TimestampedFileInputSplit richFourthSplit =
+			new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+
+		// same modification time: splits are ordered lexicographically by path
+		Assert.assertTrue(richFirstSplit.compareTo(richSecondSplit) < 0);
+		Assert.assertTrue(richFirstSplit.compareTo(richThirdSplit) < 0);
+
+		// same mod time, same file so smaller split number first
+		Assert.assertTrue(richThirdSplit.compareTo(richSecondSplit) < 0);
+
+		// smaller modification time first
+		Assert.assertTrue(richThirdSplit.compareTo(richFourthSplit) < 0);
+	}
+
+	@Test
+	public void testIllegalArgument() {
+		try {
+			new TimestampedFileInputSplit(-10, 2, new Path("test"), 0, 100, null); // invalid modification time
+			Assert.fail("Expected an IllegalArgumentException for a negative modification time.");
+		} catch (IllegalArgumentException e) {
+			// expected: the constructor rejects negative modification times.
+		}
+	}
+}