You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/07 17:22:59 UTC
[12/12] flink git commit: [FLINK-6830] [fileSource] Port continuous
file reader migration tests for Flink 1.3
[FLINK-6830] [fileSource] Port continuous file reader migration tests for Flink 1.3
This commit also consolidates all Flink 1.1 and 1.2 migration tests into
a single ContinuousFileProcessingMigrationTest class. Parameterization
is used to test restore from different previous Flink versions.
This closes #4059.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4a646a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4a646a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4a646a0
Branch: refs/heads/release-1.3
Commit: d4a646a035366918a100f64428c471464870b8d0
Parents: e5a435b
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sun Jun 4 01:42:04 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed Jun 7 19:11:43 2017 +0200
----------------------------------------------------------------------
...inuousFileProcessingFrom11MigrationTest.java | 402 ------------------
...inuousFileProcessingFrom12MigrationTest.java | 366 ----------------
.../ContinuousFileProcessingMigrationTest.java | 423 +++++++++++++++++++
...gration-test-1496532000000-flink1.3-snapshot | Bin 0 -> 537 bytes
.../reader-migration-test-flink1.3-snapshot | Bin 0 -> 2823 bytes
5 files changed, 423 insertions(+), 768 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java
deleted file mode 100644
index ec5e1ad..0000000
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom11MigrationTest.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hdfstests;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.io.TextInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.util.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-
-public class ContinuousFileProcessingFrom11MigrationTest {
-
- private static final int NO_OF_FILES = 5;
- private static final int LINES_PER_FILE = 10;
-
- private static final long INTERVAL = 100;
-
- private static File baseDir;
-
- private static FileSystem hdfs;
- private static String hdfsURI;
- private static MiniDFSCluster hdfsCluster;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
-
- // PREPARING FOR THE TESTS
-
- @BeforeClass
- public static void createHDFS() {
- try {
- baseDir = tempFolder.newFolder().getAbsoluteFile();
- FileUtil.fullyDelete(baseDir);
-
- Configuration hdConf = new Configuration();
- hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
- hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
-
- MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
- hdfsCluster = builder.build();
-
- hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
- hdfs = new org.apache.hadoop.fs.Path(hdfsURI).getFileSystem(hdConf);
-
- } catch(Throwable e) {
- e.printStackTrace();
- Assert.fail("Test failed " + e.getMessage());
- }
- }
-
- @AfterClass
- public static void destroyHDFS() {
- try {
- FileUtil.fullyDelete(baseDir);
- hdfsCluster.shutdown();
- } catch (Throwable t) {
- throw new RuntimeException(t);
- }
- }
-
- private static String getResourceFilename(String filename) {
- ClassLoader cl = ContinuousFileProcessingFrom11MigrationTest.class.getClassLoader();
- URL resource = cl.getResource(filename);
- return resource.getFile();
- }
-
- // END OF PREPARATIONS
-
- // TESTS
-
- @Test
- public void testReaderSnapshotRestore() throws Exception {
-
- /*
-
- FileInputSplit split1 =
- new FileInputSplit(3, new Path("test/test1"), 0, 100, null);
- FileInputSplit split2 =
- new FileInputSplit(2, new Path("test/test2"), 101, 200, null);
- FileInputSplit split3 =
- new FileInputSplit(1, new Path("test/test2"), 0, 100, null);
- FileInputSplit split4 =
- new FileInputSplit(0, new Path("test/test3"), 0, 100, null);
-
- final OneShotLatch latch = new OneShotLatch();
- BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
- TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
- ContinuousFileReaderOperator<FileInputSplit, ?> initReader = new ContinuousFileReaderOperator<>(format);
- initReader.setOutputType(typeInfo, new ExecutionConfig());
- OneInputStreamOperatorTestHarness<FileInputSplit, FileInputSplit> initTestInstance =
- new OneInputStreamOperatorTestHarness<>(initReader);
- initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
- initTestInstance.open();
- // create some state in the reader
- initTestInstance.processElement(new StreamRecord<>(split1));
- initTestInstance.processElement(new StreamRecord<>(split2));
- initTestInstance.processElement(new StreamRecord<>(split3));
- initTestInstance.processElement(new StreamRecord<>(split4));
- // take a snapshot of the operator's state. This will be used
- // to initialize another reader and compare the results of the
- // two operators.
- final StreamTaskState snapshot;
- synchronized (initTestInstance.getCheckpointLock()) {
- snapshot = initTestInstance.snapshot(0L, 0L);
- }
-
- initTestInstance.snaphotToFile(snapshot, "src/test/resources/reader-migration-test-flink1.1-snapshot");
-
- */
- TimestampedFileInputSplit split1 =
- new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
-
- TimestampedFileInputSplit split2 =
- new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
-
- TimestampedFileInputSplit split3 =
- new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
-
- TimestampedFileInputSplit split4 =
- new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
-
-
- final OneShotLatch latch = new OneShotLatch();
-
- BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
- TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
-
- ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
- initReader.setOutputType(typeInfo, new ExecutionConfig());
-
- OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> initTestInstance =
- new OneInputStreamOperatorTestHarness<>(initReader);
- initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
-
- initTestInstance.setup();
- initTestInstance.initializeStateFromLegacyCheckpoint(getResourceFilename("reader-migration-test-flink1.1-snapshot"));
- initTestInstance.open();
-
- latch.trigger();
-
- // ... and wait for the operators to close gracefully
-
- synchronized (initTestInstance.getCheckpointLock()) {
- initTestInstance.close();
- }
-
- FileInputSplit fsSplit1 = createSplitFromTimestampedSplit(split1);
- FileInputSplit fsSplit2 = createSplitFromTimestampedSplit(split2);
- FileInputSplit fsSplit3 = createSplitFromTimestampedSplit(split3);
- FileInputSplit fsSplit4 = createSplitFromTimestampedSplit(split4);
-
- // compare if the results contain what they should contain and also if
- // they are the same, as they should.
-
- Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit1)));
- Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit2)));
- Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit3)));
- Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit4)));
- }
-
- private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) {
- Preconditions.checkNotNull(split);
-
- return new FileInputSplit(
- split.getSplitNumber(),
- split.getPath(),
- split.getStart(),
- split.getLength(),
- split.getHostnames()
- );
- }
-
- private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
-
- private static final long serialVersionUID = -6727603565381560267L;
-
- private final OneShotLatch latch;
-
- private FileInputSplit split;
-
- private boolean reachedEnd;
-
- BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
- super(filePath);
- this.latch = latch;
- this.reachedEnd = false;
- }
-
- @Override
- public void open(FileInputSplit fileSplit) throws IOException {
- this.split = fileSplit;
- this.reachedEnd = false;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- if (!latch.isTriggered()) {
- try {
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- return reachedEnd;
- }
-
- @Override
- public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
- this.reachedEnd = true;
- return split;
- }
-
- @Override
- public void close() {
-
- }
- }
-
- //// Monitoring Function Tests //////
-
- @Test
- public void testFunctionRestore() throws Exception {
-
- /*
- org.apache.hadoop.fs.Path path = null;
- long fileModTime = Long.MIN_VALUE;
- for (int i = 0; i < 1; i++) {
- Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
- path = file.f0;
- fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
- }
-
- TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
-
- final ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, format.getFilePath().toString(), new PathFilter(), FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-
- StreamSource<FileInputSplit, ContinuousFileMonitoringFunction<String>> src =
- new StreamSource<>(monitoringFunction);
-
- final OneInputStreamOperatorTestHarness<Void, FileInputSplit> testHarness =
- new OneInputStreamOperatorTestHarness<>(src);
- testHarness.open();
-
- final Throwable[] error = new Throwable[1];
-
- final OneShotLatch latch = new OneShotLatch();
-
- // run the source asynchronously
- Thread runner = new Thread() {
- @Override
- public void run() {
- try {
- monitoringFunction.run(new DummySourceContext() {
- @Override
- public void collect(FileInputSplit element) {
- latch.trigger();
- }
- });
- }
- catch (Throwable t) {
- t.printStackTrace();
- error[0] = t;
- }
- }
- };
- runner.start();
-
- if (!latch.isTriggered()) {
- latch.await();
- }
-
- StreamTaskState snapshot = testHarness.snapshot(0, 0);
- testHarness.snaphotToFile(snapshot, "src/test/resources/monitoring-function-migration-test-" + fileModTime +"-flink1.1-snapshot");
- monitoringFunction.cancel();
- runner.join();
-
- testHarness.close();
- */
-
- Long expectedModTime = Long.parseLong("1482144479339");
- TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
-
- final ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-
- StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
- new StreamSource<>(monitoringFunction);
-
- final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
- new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
- testHarness.setup();
- testHarness.initializeStateFromLegacyCheckpoint(getResourceFilename("monitoring-function-migration-test-1482144479339-flink1.1-snapshot"));
- testHarness.open();
-
- Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime());
-
- }
-
- /////////// Source Contexts Used by the tests /////////////////
-
- private static abstract class DummySourceContext
- implements SourceFunction.SourceContext<TimestampedFileInputSplit> {
-
- private final Object lock = new Object();
-
- @Override
- public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) {
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- }
-
- @Override
- public Object getCheckpointLock() {
- return lock;
- }
-
- @Override
- public void close() {
- }
- }
-
- ///////// Auxiliary Methods /////////////
-
- /**
- * Create a file with pre-determined String format of the form:
- * {@code fileIdx +": "+ sampleLine +" "+ lineNo}.
- * */
- private Tuple2<org.apache.hadoop.fs.Path, String> createFileAndFillWithData(
- String base, String fileName, int fileIdx, String sampleLine) throws IOException {
-
- assert (hdfs != null);
-
- org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(base + "/" + fileName + fileIdx);
- Assert.assertFalse(hdfs.exists(file));
-
- org.apache.hadoop.fs.Path tmp = new org.apache.hadoop.fs.Path(base + "/." + fileName + fileIdx);
- FSDataOutputStream stream = hdfs.create(tmp);
- StringBuilder str = new StringBuilder();
- for (int i = 0; i < LINES_PER_FILE; i++) {
- String line = fileIdx +": "+ sampleLine + " " + i +"\n";
- str.append(line);
- stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
- }
- stream.close();
-
- hdfs.rename(tmp, file);
-
- Assert.assertTrue("No result file present", hdfs.exists(file));
- return new Tuple2<>(file, str.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java
deleted file mode 100644
index bf09447..0000000
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hdfstests;
-
-import java.io.FileOutputStream;
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.io.TextInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OperatorSnapshotUtil;
-import org.apache.flink.util.Preconditions;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-
-public class ContinuousFileProcessingFrom12MigrationTest {
-
- private static final int LINES_PER_FILE = 10;
-
- private static final long INTERVAL = 100;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
-
- /**
- * Manually run this to write binary snapshot data. Remove @Ignore to run.
- */
- @Ignore
- @Test
- public void writeReaderSnapshot() throws Exception {
-
- File testFolder = tempFolder.newFolder();
-
- TimestampedFileInputSplit split1 =
- new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
-
- TimestampedFileInputSplit split2 =
- new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
-
- TimestampedFileInputSplit split3 =
- new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
-
- TimestampedFileInputSplit split4 =
- new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
-
- // this always blocks to ensure that the reader doesn't to any actual processing so that
- // we keep the state for the four splits
- final OneShotLatch blockingLatch = new OneShotLatch();
- BlockingFileInputFormat format = new BlockingFileInputFormat(blockingLatch, new Path(testFolder.getAbsolutePath()));
-
- TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
- ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(
- format);
- initReader.setOutputType(typeInfo, new ExecutionConfig());
- OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness =
- new OneInputStreamOperatorTestHarness<>(initReader);
- testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
- testHarness.open();
- // create some state in the reader
- testHarness.processElement(new StreamRecord<>(split1));
- testHarness.processElement(new StreamRecord<>(split2));
- testHarness.processElement(new StreamRecord<>(split3));
- testHarness.processElement(new StreamRecord<>(split4));
- // take a snapshot of the operator's state. This will be used
- // to initialize another reader and compare the results of the
- // two operators.
-
- final OperatorStateHandles snapshot;
- synchronized (testHarness.getCheckpointLock()) {
- snapshot = testHarness.snapshot(0L, 0L);
- }
-
- OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/reader-migration-test-flink1.2-snapshot");
- }
-
- @Test
- public void testReaderRestore() throws Exception {
- File testFolder = tempFolder.newFolder();
-
- final OneShotLatch latch = new OneShotLatch();
-
- BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath()));
- TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
-
- ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
- initReader.setOutputType(typeInfo, new ExecutionConfig());
-
- OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness =
- new OneInputStreamOperatorTestHarness<>(initReader);
- testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
-
- testHarness.setup();
- OperatorStateHandles operatorStateHandles = OperatorSnapshotUtil.readStateHandle(
- OperatorSnapshotUtil.getResourceFilename(
- "reader-migration-test-flink1.2-snapshot"));
- testHarness.initializeState(operatorStateHandles);
- testHarness.open();
-
- latch.trigger();
-
- // ... and wait for the operators to close gracefully
-
- synchronized (testHarness.getCheckpointLock()) {
- testHarness.close();
- }
-
- TimestampedFileInputSplit split1 =
- new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
-
- TimestampedFileInputSplit split2 =
- new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
-
- TimestampedFileInputSplit split3 =
- new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
-
- TimestampedFileInputSplit split4 =
- new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
-
- // compare if the results contain what they should contain and also if
- // they are the same, as they should.
-
- Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split1)));
- Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split2)));
- Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split3)));
- Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split4)));
- }
-
- /**
- * Manually run this to write binary snapshot data. Remove @Ignore to run.
- */
- @Ignore
- @Test
- public void writeMonitoringSourceSnapshot() throws Exception {
-
- File testFolder = tempFolder.newFolder();
-
- long fileModTime = Long.MIN_VALUE;
- for (int i = 0; i < 1; i++) {
- Tuple2<File, String> file = createFileAndFillWithData(testFolder, "file", i, "This is test line.");
- fileModTime = file.f0.lastModified();
- }
-
- TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));
-
- final ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-
- StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
- new StreamSource<>(monitoringFunction);
-
- final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
- new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
-
- testHarness.open();
-
- final Throwable[] error = new Throwable[1];
-
- final OneShotLatch latch = new OneShotLatch();
-
- // run the source asynchronously
- Thread runner = new Thread() {
- @Override
- public void run() {
- try {
- monitoringFunction.run(new DummySourceContext() {
- @Override
- public void collect(TimestampedFileInputSplit element) {
- latch.trigger();
- }
-
- @Override
- public void markAsTemporarilyIdle() {
-
- }
- });
- }
- catch (Throwable t) {
- t.printStackTrace();
- error[0] = t;
- }
- }
- };
- runner.start();
-
- if (!latch.isTriggered()) {
- latch.await();
- }
-
- final OperatorStateHandles snapshot;
- synchronized (testHarness.getCheckpointLock()) {
- snapshot = testHarness.snapshot(0L, 0L);
- }
-
- OperatorSnapshotUtil.writeStateHandle(
- snapshot,
- "src/test/resources/monitoring-function-migration-test-" + fileModTime +"-flink1.2-snapshot");
-
- monitoringFunction.cancel();
- runner.join();
-
- testHarness.close();
- }
-
- @Test
- public void testMonitoringSourceRestore() throws Exception {
-
- File testFolder = tempFolder.newFolder();
-
- Long expectedModTime = Long.parseLong("1493116191000");
- TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));
-
- final ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-
- StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
- new StreamSource<>(monitoringFunction);
-
- final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
- new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
-
- testHarness.setup();
- OperatorStateHandles operatorStateHandles = OperatorSnapshotUtil.readStateHandle(
- OperatorSnapshotUtil.getResourceFilename(
- "monitoring-function-migration-test-1493116191000-flink1.2-snapshot"));
-
- testHarness.initializeState(operatorStateHandles);
- testHarness.open();
-
- Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime());
-
- }
-
- private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
-
- private static final long serialVersionUID = -6727603565381560267L;
-
- private final OneShotLatch latch;
-
- private FileInputSplit split;
-
- private boolean reachedEnd;
-
- BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
- super(filePath);
- this.latch = latch;
- this.reachedEnd = false;
- }
-
- @Override
- public void open(FileInputSplit fileSplit) throws IOException {
- this.split = fileSplit;
- this.reachedEnd = false;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- if (!latch.isTriggered()) {
- try {
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- return reachedEnd;
- }
-
- @Override
- public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
- this.reachedEnd = true;
- return split;
- }
-
- @Override
- public void close() {
-
- }
- }
-
- private static abstract class DummySourceContext
- implements SourceFunction.SourceContext<TimestampedFileInputSplit> {
-
- private final Object lock = new Object();
-
- @Override
- public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) {
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- }
-
- @Override
- public Object getCheckpointLock() {
- return lock;
- }
-
- @Override
- public void close() {
- }
- }
-
- /**
- * Create a file with pre-determined String format of the form:
- * {@code fileIdx +": "+ sampleLine +" "+ lineNo}.
- * */
- private Tuple2<File, String> createFileAndFillWithData(
- File base, String fileName, int fileIdx, String sampleLine) throws IOException {
-
- File file = new File(base, fileName + fileIdx);
- Assert.assertFalse(file.exists());
-
- File tmp = new File(base, "." + fileName + fileIdx);
- FileOutputStream stream = new FileOutputStream(tmp);
- StringBuilder str = new StringBuilder();
- for (int i = 0; i < LINES_PER_FILE; i++) {
- String line = fileIdx +": "+ sampleLine + " " + i +"\n";
- str.append(line);
- stream.write(line.getBytes());
- }
- stream.close();
-
- FileUtils.moveFile(tmp, file);
-
- Assert.assertTrue("No result file present", file.exists());
- return new Tuple2<>(file, str.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
new file mode 100644
index 0000000..89776eb
--- /dev/null
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hdfstests;
+
+import java.io.FileOutputStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+import org.apache.flink.streaming.util.migration.MigrationTestUtil;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Tests that verify the migration from previous Flink version snapshots.
+ */
+@RunWith(Parameterized.class)
+public class ContinuousFileProcessingMigrationTest {
+
+ private static final int LINES_PER_FILE = 10;
+
+ private static final long INTERVAL = 100;
+
+ @Parameterized.Parameters(name = "Migration Savepoint / Mod Time: {0}")
+ public static Collection<Tuple2<MigrationVersion, Long>> parameters () {
+ return Arrays.asList(
+ Tuple2.of(MigrationVersion.v1_1, 1482144479339L),
+ Tuple2.of(MigrationVersion.v1_2, 1493116191000L),
+ Tuple2.of(MigrationVersion.v1_3, 1496532000000L));
+ }
+
+ /**
+ * TODO change this to the corresponding savepoint version to be written (e.g. {@link MigrationVersion#v1_3} for 1.3)
+ * TODO and remove all @Ignore annotations on write*Snapshot() methods to generate savepoints
+ */
+ private final MigrationVersion flinkGenerateSavepointVersion = null;
+
+ private final MigrationVersion testMigrateVersion;
+ private final Long expectedModTime;
+
+ public ContinuousFileProcessingMigrationTest(Tuple2<MigrationVersion, Long> migrationVersionAndModTime) {
+ this.testMigrateVersion = migrationVersionAndModTime.f0;
+ this.expectedModTime = migrationVersionAndModTime.f1;
+ }
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ /**
+ * Manually run this to write binary snapshot data. Remove @Ignore to run.
+ */
+ @Ignore
+ @Test
+ public void writeReaderSnapshot() throws Exception {
+
+ File testFolder = tempFolder.newFolder();
+
+ TimestampedFileInputSplit split1 =
+ new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
+
+ TimestampedFileInputSplit split2 =
+ new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
+
+ TimestampedFileInputSplit split3 =
+ new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+
+ TimestampedFileInputSplit split4 =
+ new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+
+ // this always blocks to ensure that the reader doesn't to any actual processing so that
+ // we keep the state for the four splits
+ final OneShotLatch blockingLatch = new OneShotLatch();
+ BlockingFileInputFormat format = new BlockingFileInputFormat(blockingLatch, new Path(testFolder.getAbsolutePath()));
+
+ TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
+ ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(
+ format);
+ initReader.setOutputType(typeInfo, new ExecutionConfig());
+ OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness =
+ new OneInputStreamOperatorTestHarness<>(initReader);
+ testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+ testHarness.open();
+ // create some state in the reader
+ testHarness.processElement(new StreamRecord<>(split1));
+ testHarness.processElement(new StreamRecord<>(split2));
+ testHarness.processElement(new StreamRecord<>(split3));
+ testHarness.processElement(new StreamRecord<>(split4));
+ // take a snapshot of the operator's state. This will be used
+ // to initialize another reader and compare the results of the
+ // two operators.
+
+ final OperatorStateHandles snapshot;
+ synchronized (testHarness.getCheckpointLock()) {
+ snapshot = testHarness.snapshot(0L, 0L);
+ }
+
+ OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/reader-migration-test-flink" + flinkGenerateSavepointVersion + "-snapshot");
+ }
+
+ @Test
+ public void testReaderRestore() throws Exception {
+ File testFolder = tempFolder.newFolder();
+
+ final OneShotLatch latch = new OneShotLatch();
+
+ BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath()));
+ TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+ ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
+ initReader.setOutputType(typeInfo, new ExecutionConfig());
+
+ OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness =
+ new OneInputStreamOperatorTestHarness<>(initReader);
+ testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ testHarness.setup();
+
+ MigrationTestUtil.restoreFromSnapshot(
+ testHarness,
+ OperatorSnapshotUtil.getResourceFilename(
+ "reader-migration-test-flink" + testMigrateVersion + "-snapshot"),
+ testMigrateVersion);
+
+ testHarness.open();
+
+ latch.trigger();
+
+ // ... and wait for the operators to close gracefully
+
+ synchronized (testHarness.getCheckpointLock()) {
+ testHarness.close();
+ }
+
+ TimestampedFileInputSplit split1 =
+ new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
+
+ TimestampedFileInputSplit split2 =
+ new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null);
+
+ TimestampedFileInputSplit split3 =
+ new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null);
+
+ TimestampedFileInputSplit split4 =
+ new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null);
+
+ // compare if the results contain what they should contain and also if
+ // they are the same, as they should.
+
+ if (testMigrateVersion == MigrationVersion.v1_1) {
+ Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(createSplitFromTimestampedSplit(split1))));
+ Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(createSplitFromTimestampedSplit(split2))));
+ Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(createSplitFromTimestampedSplit(split3))));
+ Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(createSplitFromTimestampedSplit(split4))));
+ } else {
+ Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split1)));
+ Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split2)));
+ Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split3)));
+ Assert.assertTrue(testHarness.getOutput().contains(new StreamRecord<>(split4)));
+ }
+ }
+
+ /**
+ * Manually run this to write binary snapshot data. Remove @Ignore to run.
+ */
+ @Ignore
+ @Test
+ public void writeMonitoringSourceSnapshot() throws Exception {
+
+ File testFolder = tempFolder.newFolder();
+
+ long fileModTime = Long.MIN_VALUE;
+ for (int i = 0; i < 1; i++) {
+ Tuple2<File, String> file = createFileAndFillWithData(testFolder, "file", i, "This is test line.");
+ fileModTime = file.f0.lastModified();
+ }
+
+ TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));
+
+ final ContinuousFileMonitoringFunction<String> monitoringFunction =
+ new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+ StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
+ new StreamSource<>(monitoringFunction);
+
+ final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
+ new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+
+ testHarness.open();
+
+ final Throwable[] error = new Throwable[1];
+
+ final OneShotLatch latch = new OneShotLatch();
+
+ // run the source asynchronously
+ Thread runner = new Thread() {
+ @Override
+ public void run() {
+ try {
+ monitoringFunction.run(new DummySourceContext() {
+ @Override
+ public void collect(TimestampedFileInputSplit element) {
+ latch.trigger();
+ }
+
+ @Override
+ public void markAsTemporarilyIdle() {
+
+ }
+ });
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ error[0] = t;
+ }
+ }
+ };
+ runner.start();
+
+ if (!latch.isTriggered()) {
+ latch.await();
+ }
+
+ final OperatorStateHandles snapshot;
+ synchronized (testHarness.getCheckpointLock()) {
+ snapshot = testHarness.snapshot(0L, 0L);
+ }
+
+ OperatorSnapshotUtil.writeStateHandle(
+ snapshot,
+ "src/test/resources/monitoring-function-migration-test-" + fileModTime + "-flink" + flinkGenerateSavepointVersion + "-snapshot");
+
+ monitoringFunction.cancel();
+ runner.join();
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testMonitoringSourceRestore() throws Exception {
+
+ File testFolder = tempFolder.newFolder();
+
+ TextInputFormat format = new TextInputFormat(new Path(testFolder.getAbsolutePath()));
+
+ final ContinuousFileMonitoringFunction<String> monitoringFunction =
+ new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+ StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
+ new StreamSource<>(monitoringFunction);
+
+ final AbstractStreamOperatorTestHarness<TimestampedFileInputSplit> testHarness =
+ new AbstractStreamOperatorTestHarness<>(src, 1, 1, 0);
+
+ testHarness.setup();
+
+ MigrationTestUtil.restoreFromSnapshot(
+ testHarness,
+ OperatorSnapshotUtil.getResourceFilename(
+ "monitoring-function-migration-test-" + expectedModTime + "-flink" + testMigrateVersion + "-snapshot"),
+ testMigrateVersion);
+
+ testHarness.open();
+
+ Assert.assertEquals((long) expectedModTime, monitoringFunction.getGlobalModificationTime());
+
+ }
+
+ private static class BlockingFileInputFormat extends FileInputFormat<FileInputSplit> {
+
+ private static final long serialVersionUID = -6727603565381560267L;
+
+ private final OneShotLatch latch;
+
+ private FileInputSplit split;
+
+ private boolean reachedEnd;
+
+ BlockingFileInputFormat(OneShotLatch latch, Path filePath) {
+ super(filePath);
+ this.latch = latch;
+ this.reachedEnd = false;
+ }
+
+ @Override
+ public void open(FileInputSplit fileSplit) throws IOException {
+ this.split = fileSplit;
+ this.reachedEnd = false;
+ }
+
+ @Override
+ public boolean reachedEnd() throws IOException {
+ if (!latch.isTriggered()) {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ return reachedEnd;
+ }
+
+ @Override
+ public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException {
+ this.reachedEnd = true;
+ return split;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
+
+ private static abstract class DummySourceContext
+ implements SourceFunction.SourceContext<TimestampedFileInputSplit> {
+
+ private final Object lock = new Object();
+
+ @Override
+ public void collectWithTimestamp(TimestampedFileInputSplit element, long timestamp) {
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return lock;
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+
+ /**
+ * Create a file with pre-determined String format of the form:
+ * {@code fileIdx +": "+ sampleLine +" "+ lineNo}.
+ * */
+ private Tuple2<File, String> createFileAndFillWithData(
+ File base, String fileName, int fileIdx, String sampleLine) throws IOException {
+
+ File file = new File(base, fileName + fileIdx);
+ Assert.assertFalse(file.exists());
+
+ File tmp = new File(base, "." + fileName + fileIdx);
+ FileOutputStream stream = new FileOutputStream(tmp);
+ StringBuilder str = new StringBuilder();
+ for (int i = 0; i < LINES_PER_FILE; i++) {
+ String line = fileIdx +": "+ sampleLine + " " + i +"\n";
+ str.append(line);
+ stream.write(line.getBytes());
+ }
+ stream.close();
+
+ FileUtils.moveFile(tmp, file);
+
+ Assert.assertTrue("No result file present", file.exists());
+ return new Tuple2<>(file, str.toString());
+ }
+
+ private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) {
+ checkNotNull(split);
+
+ return new FileInputSplit(
+ split.getSplitNumber(),
+ split.getPath(),
+ split.getStart(),
+ split.getLength(),
+ split.getHostnames()
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot
new file mode 100644
index 0000000..7ed677b
Binary files /dev/null and b/flink-fs-tests/src/test/resources/monitoring-function-migration-test-1496532000000-flink1.3-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/d4a646a0/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot
new file mode 100644
index 0000000..bb612bd
Binary files /dev/null and b/flink-fs-tests/src/test/resources/reader-migration-test-flink1.3-snapshot differ