You are viewing a plain text version of this content. The canonical link for it is here.
Posted on 2017/06/07 17:22:59 UTC

[12/12] flink git commit: [FLINK-6830] [fileSource] Port continuous file reader migration tests for Flink 1.3

[FLINK-6830] [fileSource] Port continuous file reader migration tests for Flink 1.3

This commit also consolidates all Flink 1.1 and 1.2 migration tests into
a single ContinuousFileProcessingMigrationTest class. Parameterization
is used to test restore from different previous Flink versions.

This closes #4059.


Branch: refs/heads/release-1.3
Commit: d4a646a035366918a100f64428c471464870b8d0
Parents: e5a435b
Author: Tzu-Li (Gordon) Tai <>
Authored: Sun Jun 4 01:42:04 2017 +0200
Committer: Tzu-Li (Gordon) Tai <>
Committed: Wed Jun 7 19:11:43 2017 +0200

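----------------------------------------------------------------------
 ...inuousFileProcessingFrom11MigrationTest.java | 402 ------------------
 ...inuousFileProcessingFrom12MigrationTest.java | 366 ----------------
 .../ContinuousFileProcessingMigrationTest.java  | 423 +++++++++++++++++++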
 ...gration-test-1496532000000-flink1.3-snapshot | Bin 0 -> 537 bytes
 .../reader-migration-test-flink1.3-snapshot     | Bin 0 -> 2823 bytes
 5 files changed, 423 insertions(+), 768 deletions(-)
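diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java
deleted file mode 100644
index ec5e1ad..0000000
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java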
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.hdfstests;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-public class ContinuousFileProcessingFrom11MigrationTest {
-	private static final int NO_OF_FILES = 5;
-	private static final int LINES_PER_FILE = 10;
-	private static final long INTERVAL = 100;
-	private static File baseDir;
-	private static FileSystem hdfs;
-	private static String hdfsURI;
-	private static MiniDFSCluster hdfsCluster;
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
-	@BeforeClass
-	public static void createHDFS() {
-		try {
-			baseDir = tempFolder.newFolder().getAbsoluteFile();
-			FileUtil.fullyDelete(baseDir);
-			Configuration hdConf = new Configuration();
-			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-			hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
-			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
-			hdfsCluster = builder.build();
-			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-			hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
-		} catch(Throwable e) {
-			e.printStackTrace();
-			Assert.fail("Test failed " + e.getMessage());
-		}
-	}
-	@AfterClass
-	public static void destroyHDFS() {
-		try {
-			FileUtil.fullyDelete(baseDir);
-			hdfsCluster.shutdown();
-		} catch (Throwable t) {
-			throw new RuntimeException(t);
-		}
-	}
-	private static String getResourceFilename(String filename) {
-		ClassLoader cl = ContinuousFileProcessingFrom11MigrationTest.class.getClassLoader();
-		URL resource = cl.getResource(filename);
-		return resource.getFile();
-	}
-	//						TESTS
-	@Test
-	public void testReaderSnapshotRestore() throws Exception {
-		/*
-		FileInputSplit split1 =
-			new FileInputSplit(3, new Path("test/test1"), 0, 100, null);
-		FileInputSplit split2 =
-			new FileInputSplit(2, new Path("test/test2"), 101, 200, null);
-		FileInputSplit split3 =
-			new FileInputSplit(1, new Path("test/test2"), 0, 100, null);
-		FileInputSplit split4 =
-			new FileInputSplit(0, new Path("test/test3"), 0, 100, null);
-		final OneShotLatch latch = new OneShotLatch();
-		BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
-		TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
-		ContinuousFileReaderOperator<FileInputSplit, ?> initReader = new ContinuousFileReaderOperator<>(format);
-		initReader.setOutputType(typeInfo, new ExecutionConfig());
-		OneInputStreamOperatorTestHarness<FileInputSplit, FileInputSplit> initTestInstance =
-			new OneInputStreamOperatorTestHarness<>(initReader);
-		initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
-		// create some state in the reader
-		initTestInstance.processElement(new StreamRecord<>(split1));
-		initTestInstance.processElement(new StreamRecord<>(split2));
-		initTestInstance.processElement(new StreamRecord<>(split3));
-		initTestInstance.processElement(new StreamRecord<>(split4));
-		// take a snapshot of the operator's state. This will be used
-		// to initialize another reader and compare the results of the
-		// two operators.
-		final StreamTaskState snapshot;
-		synchronized (initTestInstance.getCheckpointLock()) {
-			snapshot = initTestInstance.snapshot(0L, 0L);
-		}
-		initTestInstance.snaphotToFile(snapshot, "src/test/resources/reader-migration-test-flink1.1-snapshot");
-		*/
-		TimestampedFileInputSplit split1 =
-			new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
-		TimestampedFileInputSplit split2 =
-			new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
-		TimestampedFileInputSplit split3 =
-			new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
-		TimestampedFileInputSplit split4 =
-			new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
-		final OneShotLatch latch = new OneShotLatch();
-		BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
-		TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
-		ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
-		initReader.setOutputType(typeInfo, new ExecutionConfig());
-		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> initTestInstance =
-			new OneInputStreamOperatorTestHarness<>(initReader);
-		initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
-		initTestInstance.setup();
-		initTestInstance.initializeStateFromLegacyCheckpoint(getResourceFilename("reader-migration-test-flink1.1-snapshot"));
-		latch.trigger();
-		// ... and wait for the operators to close gracefully
-		synchronized (initTestInstance.getCheckpointLock()) {
-			initTestInstance.close();
-		}
-		FileInputSplit fsSplit1 = createSplitFromTimestampedSplit(split1);
-		FileInputSplit fsSplit2 = createSplitFromTimestampedSplit(split2);
-		FileInputSplit fsSplit3 = createSplitFromTimestampedSplit(split3);
-		FileInputSplit fsSplit4 = createSplitFromTimestampedSplit(split4);
-		// compare if the results contain what they should contain and also if
-		// they are the same, as they should.
-		Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit1)));
-		Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit2)));
-		Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit3)));
-		Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit4)));
-	}
-	private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) {
-		Preconditions.checkNotNull(split);
-		return new FileInputSplit(
-			split.getSplitNumber(),
-			split.getPath(),
-			split.getStart(),
-			split.getLength(),
-			split.getHostnames()
-		);
-	}
-	private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
-		private static final long serialVersionUID = -6727603565381560267L;
-		private final OneShotLatch latch;
-		private FileInputSplit split;
-		private boolean reachedEnd;
-		BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
-			super(filePath);
-			this.latch = latch;
-			this.reachedEnd = false;
-		}
-		@Override
-		public void open(FileInputSplit fileSplit) throws IOException {
-			this.split = fileSplit;
-			this.reachedEnd = false;
-		}
-		@Override
-		public boolean reachedEnd() throws IOException {
-			if (!latch.isTriggered()) {
-				try {
-					latch.await();
-				} catch (InterruptedException e) {
-					e.printStackTrace();
-				}
-			}
-			return reachedEnd;
-		}
-		@Override
-		public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
-			this.reachedEnd = true;
-			return split;
-		}
-		@Override
-		public void close() {
-		}
-	}
-	////				Monitoring Function Tests				//////
-	@Test
-	public void testFunctionRestore() throws Exception {
-		/*
-		org.apache.hadoop.fs.Path path = null;
-		long fileModTime = Long.MIN_VALUE;
-		for (int i = 0; i < 1; i++) {
-			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
-			path = file.f0;
-			fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
-		}
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
-		final ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, format.getFilePath().toString(), new PathFilter(), FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-		StreamSource<FileInputSplit, ContinuousFileMonitoringFunction<String>> src =
-			new StreamSource<>(monitoringFunction);
-		final OneInputStreamOperatorTestHarness<Void, FileInputSplit> testHarness =
-			new OneInputStreamOperatorTestHarness<>(src);
-		final Throwable[] error = new Throwable[1];
-		final OneShotLatch latch = new OneShotLatch();
-		// run the source asynchronously
-		Thread runner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					monitoringFunction.run(new DummySourceContext() {
-						@Override
-						public void collect(FileInputSplit element) {
-							latch.trigger();
-						}
-					});
-				}
-				catch (Throwable t) {
-					t.printStackTrace();
-					error[0] = t;
-				}
-			}
-		};
-		runner.start();
-		if (!latch.isTriggered()) {
-			latch.await();
-		}
-		StreamTaskState snapshot = testHarness.snapshot(0, 0);
-		testHarness.snaphotToFile(snapshot, "src/test/resources/monitoring-function-migration-test-" + fileModTime +"-flink1.1-snapshot");
-		monitoringFunction.cancel();
-		runner.join();
-		testHarness.close();
-		*/
-		Long expectedModTime = Long.parseLong("1482144479339");
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
-		final ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-		StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
-			new StreamSource<>(monitoringFunction);
-		final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
-			new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
-		testHarness.setup();
-		testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename("monitoring-function-migration-test-1482144479339-flink1.1-snapshot"));
-		Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime());
-	}
-	///////////				Source Contexts Used by the tests				/////////////////
-	private static abstract class DummySourceContext
-		implements SourceFunction.SourceContext<TimestampedFileInputSplit> {
-		private final Object lock = new Object();
-		@Override
-		public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) {
-		}
-		@Override
-		public void emitWatermark(Watermark mark) {
-		}
-		@Override
-		public Object getCheckpointLock() {
-			return lock;
-		}
-		@Override
-		public void close() {
-		}
-	}
-	/////////				Auxiliary Methods				/////////////
-	/**
-	 * Create a file with pre-determined String format of the form:
-	 * {@code fileIdx +": "+ sampleLine +" "+ lineNo}.
-	 * */
-	private Tuple2<org.apache.hadoop.fs.Path, String> createFileAndFillWithData(
-		String base, String fileName, int fileIdx, String sampleLine) throws IOException {
-		assert (hdfs != null);
-		org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
-		Assert.assertFalse(hdfs.exists(file));
-		org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
-		FSDataOutputStream stream = hdfs.create(tmp);
-		StringBuilder str = new StringBuilder();
-		for (int i = 0; i < LINES_PER_FILE; i++) {
-			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
-			str.append(line);
-			stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
-		}
-		stream.close();
-		hdfs.rename(tmp, file);
-		Assert.assertTrue("No result file present", hdfs.exists(file));
-		return new Tuple2<>(file, str.toString());
-	}
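diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java
deleted file mode 100644
index bf09447..0000000
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java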
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.hdfstests;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.util.Preconditions;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-public class ContinuousFileProcessingFrom12MigrationTest {
-	private static final int LINES_PER_FILE = 10;
-	private static final long INTERVAL = 100;
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
-	/**
-	 * Manually run this to write binary snapshot data. Remove @Ignore to run.
-	 */
-	@Ignore
-	@Test
-	public void writeReaderSnapshot() throws Exception {
-		File testFolder = tempFolder.newFolder();
-		TimestampedFileInputSplit split1 =
-				new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
-		TimestampedFileInputSplit split2 =
-				new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
-		TimestampedFileInputSplit split3 =
-				new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
-		TimestampedFileInputSplit split4 =
-				new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
-		// this always blocks to ensure that the reader doesn't to any actual processing so that
-		// we keep the state for the four splits
-		final OneShotLatch blockingLatch = new OneShotLatch();
-		BlockingFileInputFormat format = new BlockingFileInputFormat(blockingLatch, new Path(testFolder.getAbsolutePath()));
-		TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
-		ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(
-				format);
-		initReader.setOutputType(typeInfo, new ExecutionConfig());
-		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness =
-				new OneInputStreamOperatorTestHarness<>(initReader);
-		testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
-		// create some state in the reader
-		testHarness.processElement(new StreamRecord<>(split1));
-		testHarness.processElement(new StreamRecord<>(split2));
-		testHarness.processElement(new StreamRecord<>(split3));
-		testHarness.processElement(new StreamRecord<>(split4));
-		// take a snapshot of the operator's state. This will be used
-		// to initialize another reader and compare the results of the
-		// two operators.
-		final OperatorStateHandles snapshot;
-		synchronized (testHarness.getCheckpointLock()) {
-			snapshot = testHarness.snapshot(0L, 0L);
-		}
-		OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/reader-migration-test-flink1.2-snapshot");
-	}
-	@Test
-	public void testReaderRestore() throws Exception {
-		File testFolder = tempFolder.newFolder();
-		final OneShotLatch latch = new OneShotLatch();
-		BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath()));
-		TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
-		ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
-		initReader.setOutputType(typeInfo, new ExecutionConfig());
-		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness =
-			new OneInputStreamOperatorTestHarness<>(initReader);
-		testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
-		testHarness.setup();
-		OperatorStateHandles operatorStateHandles = OperatorSnapshotUtil.readStateHandle(
-				OperatorSnapshotUtil.getResourceFilename(
-						"reader-migration-test-flink1.2-snapshot"));
-		testHarness.initializeState(operatorStateHandles);
-		latch.trigger();
-		// ... and wait for the operators to close gracefully
-		synchronized (testHarness.getCheckpointLock()) {
-			testHarness.close();
-		}
-		TimestampedFileInputSplit split1 =
-				new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
-		TimestampedFileInputSplit split2 =
-				new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
-		TimestampedFileInputSplit split3 =
-				new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
-		TimestampedFileInputSplit split4 =
-				new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
-		// compare if the results contain what they should contain and also if
-		// they are the same, as they should.
-		Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split1)));
-		Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split2)));
-		Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split3)));
-		Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split4)));
-	}
-	/**
-	 * Manually run this to write binary snapshot data. Remove @Ignore to run.
-	 */
-	@Ignore
-	@Test
-	public void writeMonitoringSourceSnapshot() throws Exception {
-		File testFolder = tempFolder.newFolder();
-		long fileModTime = Long.MIN_VALUE;
-		for (int i = 0; i < 1; i++) {
-			Tuple2<File, String> file = createFileAndFillWithData(testFolder, "file", i, "This is test line.");
-			fileModTime = file.f0.lastModified();
-		}
-		TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));
-		final ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-		StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
-			new StreamSource<>(monitoringFunction);
-		final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
-				new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
-		final Throwable[] error = new Throwable[1];
-		final OneShotLatch latch = new OneShotLatch();
-		// run the source asynchronously
-		Thread runner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					monitoringFunction.run(new DummySourceContext() {
-						@Override
-						public void collect(TimestampedFileInputSplit element) {
-							latch.trigger();
-						}
-						@Override
-						public void markAsTemporarilyIdle() {
-						}
-					});
-				}
-				catch (Throwable t) {
-					t.printStackTrace();
-					error[0] = t;
-				}
-			}
-		};
-		runner.start();
-		if (!latch.isTriggered()) {
-			latch.await();
-		}
-		final OperatorStateHandles snapshot;
-		synchronized (testHarness.getCheckpointLock()) {
-			snapshot = testHarness.snapshot(0L, 0L);
-		}
-		OperatorSnapshotUtil.writeStateHandle(
-				snapshot,
-				"src/test/resources/monitoring-function-migration-test-" + fileModTime +"-flink1.2-snapshot");
-		monitoringFunction.cancel();
-		runner.join();
-		testHarness.close();
-	}
-	@Test
-	public void testMonitoringSourceRestore() throws Exception {
-		File testFolder = tempFolder.newFolder();
-		Long expectedModTime = Long.parseLong("1493116191000");
-		TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));
-		final ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-		StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
-			new StreamSource<>(monitoringFunction);
-		final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
-			new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
-		testHarness.setup();
-		OperatorStateHandles operatorStateHandles = OperatorSnapshotUtil.readStateHandle(
-				OperatorSnapshotUtil.getResourceFilename(
-						"monitoring-function-migration-test-1493116191000-flink1.2-snapshot"));
-		testHarness.initializeState(operatorStateHandles);
-		Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime());
-	}
-	private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
-		private static final long serialVersionUID = -6727603565381560267L;
-		private final OneShotLatch latch;
-		private FileInputSplit split;
-		private boolean reachedEnd;
-		BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
-			super(filePath);
-			this.latch = latch;
-			this.reachedEnd = false;
-		}
-		@Override
-		public void open(FileInputSplit fileSplit) throws IOException {
-			this.split = fileSplit;
-			this.reachedEnd = false;
-		}
-		@Override
-		public boolean reachedEnd() throws IOException {
-			if (!latch.isTriggered()) {
-				try {
-					latch.await();
-				} catch (InterruptedException e) {
-					e.printStackTrace();
-				}
-			}
-			return reachedEnd;
-		}
-		@Override
-		public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
-			this.reachedEnd = true;
-			return split;
-		}
-		@Override
-		public void close() {
-		}
-	}
-	private static abstract class DummySourceContext
-		implements SourceFunction.SourceContext<TimestampedFileInputSplit> {
-		private final Object lock = new Object();
-		@Override
-		public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) {
-		}
-		@Override
-		public void emitWatermark(Watermark mark) {
-		}
-		@Override
-		public Object getCheckpointLock() {
-			return lock;
-		}
-		@Override
-		public void close() {
-		}
-	}
-	/**
-	 * Create a file with pre-determined String format of the form:
-	 * {@code fileIdx +": "+ sampleLine +" "+ lineNo}.
-	 * */
-	private Tuple2<File, String> createFileAndFillWithData(
-		File base, String fileName, int fileIdx, String sampleLine) throws IOException {
-		File file = new File(base, fileName + fileIdx);
-		Assert.assertFalse(file.exists());
-		File tmp = new File(base, "." + fileName + fileIdx);
-		FileOutputStream stream = new FileOutputStream(tmp);
-		StringBuilder str = new StringBuilder();
-		for (int i = 0; i < LINES_PER_FILE; i++) {
-			String line = fileIdx +": "+ sampleLine + " " + i +"\n";
-			str.append(line);
-			stream.write(line.getBytes());
-		}
-		stream.close();
-		FileUtils.moveFile(tmp, file);
-		Assert.assertTrue("No result file present", file.exists());
-		return new Tuple2<>(file, str.toString());
-	}
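diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
new file mode 100644
index 0000000..89776eb
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java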
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.hdfstests;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+import org.apache.flink.streaming.util.migration.MigrationTestUtil;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.Collection;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+/**
+ * Tests that verify the migration from previous Flink version snapshots.
+ */
+public class ContinuousFileProcessingMigrationTest {
+	private static final int LINES_PER_FILE = 10;
+	private static final long INTERVAL = 100;
+	/**
+	 * Supplies one (savepoint version, expected modification time) pair per Flink version
+	 * whose snapshots the restore tests are run against.
+	 */
+	@Parameterized.Parameters(name = "Migration Savepoint / Mod Time: {0}")
+	public static Collection<Tuple2<MigrationVersion, Long>> parameters () {
+		final Tuple2<MigrationVersion, Long> flink11 = Tuple2.of(MigrationVersion.v1_1, 1482144479339L);
+		final Tuple2<MigrationVersion, Long> flink12 = Tuple2.of(MigrationVersion.v1_2, 1493116191000L);
+		final Tuple2<MigrationVersion, Long> flink13 = Tuple2.of(MigrationVersion.v1_3, 1496532000000L);
+		return Arrays.asList(flink11, flink12, flink13);
+	}
+	/**
+	 * TODO change this to the corresponding savepoint version to be written (e.g. {@link MigrationVersion#v1_3} for 1.3)
+	 * TODO and remove all @Ignore annotations on write*Snapshot() methods to generate savepoints
+	 */
+	private final MigrationVersion flinkGenerateSavepointVersion = null;
+	private final MigrationVersion testMigrateVersion;
+	private final Long expectedModTime;
+	/**
+	 * Stores the pair handed in by the parameterized runner.
+	 *
+	 * @param migrationVersionAndModTime {@code f0} is the Flink version of the snapshot to restore,
+	 *                                   {@code f1} the modification time expected after restoring
+	 *                                   the monitoring function
+	 */
+	public ContinuousFileProcessingMigrationTest(Tuple2<MigrationVersion, Long> migrationVersionAndModTime) {
+		this.expectedModTime = migrationVersionAndModTime.f1;
+		this.testMigrateVersion = migrationVersionAndModTime.f0;
+	}
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+	/**
+	 * Manually run this to write binary snapshot data. Remove @Ignore to run.
+	 *
+	 * <p>Feeds four splits into a reader whose input format blocks on a latch that this method
+	 * never triggers, snapshots the operator and writes the snapshot to
+	 * {@code src/test/resources/reader-migration-test-flink<version>-snapshot}.
+	 */
+	@Ignore
+	@Test
+	public void writeReaderSnapshot() throws Exception {
+		File testFolder = tempFolder.newFolder();
+		TimestampedFileInputSplit split1 =
+				new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
+		TimestampedFileInputSplit split2 =
+				new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
+		TimestampedFileInputSplit split3 =
+				new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+		TimestampedFileInputSplit split4 =
+				new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+		// this always blocks to ensure that the reader doesn't do any actual processing so that
+		// we keep the state for the four splits
+		final OneShotLatch blockingLatch = new OneShotLatch();
+		BlockingFileInputFormat format = new BlockingFileInputFormat(blockingLatch, new Path(testFolder.getAbsolutePath()));
+		TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
+		ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(
+				format);
+		initReader.setOutputType(typeInfo, new ExecutionConfig());
+		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness =
+				new OneInputStreamOperatorTestHarness<>(initReader);
+		testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+		// open the operator before feeding it any elements
+		testHarness.open();
+		// create some state in the reader
+		testHarness.processElement(new StreamRecord<>(split1));
+		testHarness.processElement(new StreamRecord<>(split2));
+		testHarness.processElement(new StreamRecord<>(split3));
+		testHarness.processElement(new StreamRecord<>(split4));
+		// take a snapshot of the operator's state; it is written to the resources
+		// folder below so that the restore test can initialize a reader from it.
+		final OperatorStateHandles snapshot;
+		synchronized (testHarness.getCheckpointLock()) {
+			snapshot = testHarness.snapshot(0L, 0L);
+		}
+		OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/reader-migration-test-flink" + flinkGenerateSavepointVersion + "-snapshot");
+	}
+	/**
+	 * Restores a reader operator from the snapshot of the parameterized Flink version and checks
+	 * that the four splits stored by {@code writeReaderSnapshot()} show up in the output once
+	 * the blocking input format is released.
+	 */
+	@Test
+	public void testReaderRestore() throws Exception {
+		File testFolder = tempFolder.newFolder();
+		final OneShotLatch latch = new OneShotLatch();
+		BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath()));
+		TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
+		ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
+		initReader.setOutputType(typeInfo, new ExecutionConfig());
+		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness =
+			new OneInputStreamOperatorTestHarness<>(initReader);
+		testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+		testHarness.setup();
+		MigrationTestUtil.restoreFromSnapshot(
+			testHarness,
+			OperatorSnapshotUtil.getResourceFilename(
+				"reader-migration-test-flink" + testMigrateVersion + "-snapshot"),
+			testMigrateVersion);
+		// open the restored operator so that it starts working on the restored splits
+		testHarness.open();
+		latch.trigger();
+		// ... and wait for the operators to close gracefully
+		synchronized (testHarness.getCheckpointLock()) {
+			testHarness.close();
+		}
+		TimestampedFileInputSplit split1 =
+				new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
+		TimestampedFileInputSplit split2 =
+				new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
+		TimestampedFileInputSplit split3 =
+				new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+		TimestampedFileInputSplit split4 =
+				new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+		// every split of the snapshot has to be part of the output; for a 1.1 snapshot the
+		// expected records carry plain FileInputSplits, otherwise the timestamped splits
+		for (TimestampedFileInputSplit split : Arrays.asList(split1, split2, split3, split4)) {
+			FileInputSplit expected = testMigrateVersion == MigrationVersion.v1_1
+				? createSplitFromTimestampedSplit(split)
+				: split;
+			Assert.assertTrue(
+				"Missing restored split in output: " + expected,
+				testHarness.getOutput().contains(new StreamRecord<>(expected)));
+		}
+	}
+	/**
+	 * Manually run this to write binary snapshot data. Remove @Ignore to run.
+	 *
+	 * <p>Creates one file, runs the monitoring function in a separate thread until it has
+	 * collected the first split, then snapshots the source operator and writes the snapshot to
+	 * {@code src/test/resources/monitoring-function-migration-test-<modTime>-flink<version>-snapshot}.
+	 */
+	@Ignore
+	@Test
+	public void writeMonitoringSourceSnapshot() throws Exception {
+		File testFolder = tempFolder.newFolder();
+		long fileModTime = Long.MIN_VALUE;
+		for (int i = 0; i < 1; i++) {
+			Tuple2<File, String> file = createFileAndFillWithData(testFolder, "file", i, "This is test line.");
+			fileModTime = file.f0.lastModified();
+		}
+		TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));
+		final ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+		StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
+			new StreamSource<>(monitoringFunction);
+		final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
+				new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+		// open the operator before the function is run
+		testHarness.open();
+		final Throwable[] error = new Throwable[1];
+		final OneShotLatch latch = new OneShotLatch();
+		// run the source asynchronously
+		Thread runner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					monitoringFunction.run(new DummySourceContext() {
+						@Override
+						public void collect(TimestampedFileInputSplit element) {
+							// the first collected split signals that there is state to snapshot
+							latch.trigger();
+						}
+						@Override
+						public void markAsTemporarilyIdle() {
+						}
+					});
+				}
+				catch (Throwable t) {
+					t.printStackTrace();
+					error[0] = t;
+				}
+			}
+		};
+		runner.start();
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+		final OperatorStateHandles snapshot;
+		synchronized (testHarness.getCheckpointLock()) {
+			snapshot = testHarness.snapshot(0L, 0L);
+		}
+		OperatorSnapshotUtil.writeStateHandle(
+				snapshot,
+				"src/test/resources/monitoring-function-migration-test-" + fileModTime + "-flink" + flinkGenerateSavepointVersion + "-snapshot");
+		monitoringFunction.cancel();
+		runner.join();
+		testHarness.close();
+		// surface a failure of the runner thread instead of silently dropping it
+		if (error[0] != null) {
+			throw new AssertionError("The monitoring function failed while running.", error[0]);
+		}
+	}
+	/**
+	 * Restores the monitoring function from the snapshot of the parameterized Flink version and
+	 * checks that its global modification time equals the one encoded in the snapshot's file name.
+	 */
+	@Test
+	public void testMonitoringSourceRestore() throws Exception {
+		File testFolder = tempFolder.newFolder();
+		TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));
+		final ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+		StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
+			new StreamSource<>(monitoringFunction);
+		final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
+			new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+		testHarness.setup();
+		MigrationTestUtil.restoreFromSnapshot(
+			testHarness,
+			OperatorSnapshotUtil.getResourceFilename(
+				"monitoring-function-migration-test-" + expectedModTime + "-flink" + testMigrateVersion + "-snapshot"),
+			testMigrateVersion);
+		// complete the harness lifecycle (setup, restore, open) like the reader restore test does
+		testHarness.open();
+		Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime());
+	}
+	/**
+	 * Input format for the reader tests that emits each opened split as its single record.
+	 * {@link #reachedEnd()} blocks until the latch passed to the constructor is triggered.
+	 */
+	private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
+		private static final long serialVersionUID = -6727603565381560267L;
+		private final OneShotLatch latch;
+		private FileInputSplit split;
+		private boolean reachedEnd;
+		BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
+			super(filePath);
+			this.latch = latch;
+			this.reachedEnd = false;
+		}
+		/** Remembers the split and marks it as not yet consumed. */
+		@Override
+		public void open(FileInputSplit fileSplit) throws IOException {
+			this.split = fileSplit;
+			this.reachedEnd = false;
+		}
+		/**
+		 * Waits for the latch and then reports whether the current split was already emitted.
+		 * If the wait is interrupted, the interrupt flag is restored and the current state
+		 * is returned.
+		 */
+		@Override
+		public boolean reachedEnd() throws IOException {
+			if (!latch.isTriggered()) {
+				try {
+					latch.await();
+				} catch (InterruptedException e) {
+					// keep the interruption visible to the calling thread instead of swallowing it
+					Thread.currentThread().interrupt();
+				}
+			}
+			return reachedEnd;
+		}
+		/** Returns the opened split itself; afterwards the format reports its end. */
+		@Override
+		public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
+			this.reachedEnd = true;
+			return split;
+		}
+		@Override
+		public void close() {
+		}
+	}
+	/**
+	 * Partial source context for the monitoring-source test: timestamped elements and watermarks
+	 * are dropped, closing does nothing, and a private object serves as checkpoint lock.
+	 * Subclasses provide the remaining {@code SourceContext} methods.
+	 */
+	private abstract static class DummySourceContext
+		implements SourceFunction.SourceContext<TimestampedFileInputSplit> {
+		private final Object checkpointLock = new Object();
+		@Override
+		public Object getCheckpointLock() {
+			return checkpointLock;
+		}
+		@Override
+		public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) {
+			// ignored
+		}
+		@Override
+		public void emitWatermark(Watermark mark) {
+			// ignored
+		}
+		@Override
+		public void close() {
+			// nothing to release
+		}
+	}
+	/**
+	 * Create a file with pre-determined String format of the form:
+	 * {@code fileIdx +": "+ sampleLine +" "+ lineNo}.
+	 *
+	 * <p>The lines are first written to a temporary file whose name starts with a dot, which is
+	 * then moved to its final name.
+	 *
+	 * @param base folder in which the file is created
+	 * @param fileName name prefix of the file, {@code fileIdx} is appended to it
+	 * @param fileIdx index used in the file name and as prefix of every line
+	 * @param sampleLine text put into every line
+	 * @return the created file together with its complete content
+	 * @throws IOException if writing or moving the file fails
+	 */
+	private Tuple2<File, String> createFileAndFillWithData(
+		File base, String fileName, int fileIdx, String sampleLine) throws IOException {
+		File file = new File(base, fileName + fileIdx);
+		Assert.assertFalse(file.exists());
+		File tmp = new File(base, "." + fileName + fileIdx);
+		StringBuilder str = new StringBuilder();
+		// try-with-resources closes the stream even if a write fails
+		try (FileOutputStream stream = new FileOutputStream(tmp)) {
+			for (int i = 0; i < LINES_PER_FILE; i++) {
+				String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+				str.append(line);
+				stream.write(line.getBytes());
+			}
+		}
+		FileUtils.moveFile(tmp, file);
+		Assert.assertTrue("No result file present", file.exists());
+		return new Tuple2<>(file, str.toString());
+	}
+	private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) {
+		checkNotNull(split);
+		return new FileInputSplit(
+			split.getSplitNumber(),
+			split.getPath(),
+			split.getStart(),
+			split.getLength(),
+			split.getHostnames()
+		);
+	}
diff --git a/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot
new file mode 100644
index 0000000..7ed677b
Binary files /dev/null and b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot differ
diff --git a/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot
new file mode 100644
index 0000000..bb612bd
Binary files /dev/null and b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot differ