Posted to commits@flink.apache.org by al...@apache.org on 2017/02/10 16:21:42 UTC
flink git commit: [FLINK-5415] Harden ContinuousFileProcessingTest
Repository: flink
Updated Branches:
refs/heads/master e29dfb840 -> f6709b4a4
[FLINK-5415] Harden ContinuousFileProcessingTest
- Use TemporaryFolder @ClassRule instead of manually managing the HDFS base
  dir (a minimal sketch of the pattern follows below).
- Place the files for each test in their own sub-directory.
- Harden the completeness condition in testFunctionRestore() (sketched after
  the diff).
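
For reference, a minimal sketch of the first two points, assuming the same
JUnit 4 and Hadoop test dependencies used by the test below; the class name
and the body of someTest() are illustrative, not part of the actual change:

import java.io.File;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TemporaryFolderHdfsSketch {

    // JUnit creates the folder before the first test and deletes it
    // recursively after the last one, so no manual cleanup is needed.
    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();

    private static MiniDFSCluster hdfsCluster;
    private static String hdfsURI;

    @BeforeClass
    public static void createHDFS() throws Exception {
        File hdfsDir = tempFolder.newFolder();

        Configuration hdConf = new Configuration();
        hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsDir.getAbsolutePath());

        hdfsCluster = new MiniDFSCluster.Builder(hdConf).build();
        hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":"
            + hdfsCluster.getNameNodePort() + "/";
    }

    @AfterClass
    public static void destroyHDFS() {
        hdfsCluster.shutdown();
    }

    @Test
    public void someTest() {
        // A fresh, random sub-directory per test keeps the files of one
        // test from being picked up by another.
        String testBasePath = hdfsURI + UUID.randomUUID() + "/";
        // ... create files under testBasePath and run the test against it ...
    }
}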
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6709b4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6709b4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6709b4a
Branch: refs/heads/master
Commit: f6709b4a48a843a0a1818fd59b98d32f82d6184f
Parents: e29dfb8
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Feb 10 15:14:34 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 10 17:07:39 2017 +0100
----------------------------------------------------------------------
.../hdfstests/ContinuousFileProcessingTest.java | 100 +++++++++++--------
1 file changed, 59 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f6709b4a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 79331df..cc5cb8e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -44,12 +44,13 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileNotFoundException;
@@ -73,22 +74,20 @@ public class ContinuousFileProcessingTest {
private static final long INTERVAL = 100;
- private static File baseDir;
-
private static FileSystem hdfs;
private static String hdfsURI;
private static MiniDFSCluster hdfsCluster;
- // PREPARING FOR THE TESTS
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
@BeforeClass
public static void createHDFS() {
try {
- baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
- FileUtil.fullyDelete(baseDir);
+ File hdfsDir = tempFolder.newFolder();
org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
- hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsDir.getAbsolutePath());
hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
@@ -106,17 +105,12 @@ public class ContinuousFileProcessingTest {
@AfterClass
public static void destroyHDFS() {
try {
- FileUtil.fullyDelete(baseDir);
hdfsCluster.shutdown();
} catch (Throwable t) {
throw new RuntimeException(t);
}
}
- // END OF PREPARATIONS
-
- // TESTS
-
@Test
public void testInvalidPathSpecification() throws Exception {
@@ -145,17 +139,19 @@ public class ContinuousFileProcessingTest {
@Test
public void testFileReadingOperatorWithIngestionTime() throws Exception {
+ String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
Map<Integer, String> expectedFileContents = new HashMap<>();
Map<String, Long> modTimes = new HashMap<>();
for (int i = 0; i < NO_OF_FILES; i++) {
- Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+ Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
filesCreated.add(file.f0);
modTimes.put(file.f0.getName(), hdfs.getFileStatus(file.f0).getModificationTime());
expectedFileContents.put(i, file.f1);
}
- TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ TextInputFormat format = new TextInputFormat(new Path(testBasePath));
TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
final long watermarkInterval = 10;
@@ -303,17 +299,19 @@ public class ContinuousFileProcessingTest {
@Test
public void testFileReadingOperatorWithEventTime() throws Exception {
+ String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
Map<String, Long> modTimes = new HashMap<>();
Map<Integer, String> expectedFileContents = new HashMap<>();
for (int i = 0; i < NO_OF_FILES; i++) {
- Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+ Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
modTimes.put(file.f0.getName(), hdfs.getFileStatus(file.f0).getModificationTime());
filesCreated.add(file.f0);
expectedFileContents.put(i, file.f1);
}
- TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ TextInputFormat format = new TextInputFormat(new Path(testBasePath));
TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(format);
@@ -396,6 +394,7 @@ public class ContinuousFileProcessingTest {
@Test
public void testReaderSnapshotRestore() throws Exception {
+ String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
TimestampedFileInputSplit split1 =
new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
@@ -412,7 +411,7 @@ public class ContinuousFileProcessingTest {
final OneShotLatch latch = new OneShotLatch();
- BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
+ BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testBasePath));
TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
@@ -439,7 +438,7 @@ public class ContinuousFileProcessingTest {
}
ContinuousFileReaderOperator<FileInputSplit> restoredReader = new ContinuousFileReaderOperator<>(
- new BlockingFileInputFormat(latch, new Path(hdfsURI)));
+ new BlockingFileInputFormat(latch, new Path(testBasePath)));
restoredReader.setOutputType(typeInfo, new ExecutionConfig());
OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> restoredTestInstance =
@@ -543,24 +542,26 @@ public class ContinuousFileProcessingTest {
@Test
public void testFilePathFiltering() throws Exception {
+ String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
Set<String> filesKept = new TreeSet<>();
// create the files to be discarded
for (int i = 0; i < NO_OF_FILES; i++) {
- Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "**file", i, "This is test line.");
+ Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "**file", i, "This is test line.");
filesCreated.add(file.f0);
}
// create the files to be kept
for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file =
- createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+ createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
filesCreated.add(file.f0);
filesKept.add(file.f0.getName());
}
- TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ TextInputFormat format = new TextInputFormat(new Path(testBasePath));
format.setFilesFilter(new FilePathFilter() {
private static final long serialVersionUID = 2611449927338589804L;
@@ -591,19 +592,21 @@ public class ContinuousFileProcessingTest {
@Test
public void testNestedFilesProcessing() throws Exception {
+ String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
final Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
final Set<String> filesToBeRead = new TreeSet<>();
// create two nested directories
- org.apache.hadoop.fs.Path firstLevelDir = new org.apache.hadoop.fs.Path(hdfsURI + "/" + "firstLevelDir");
- org.apache.hadoop.fs.Path secondLevelDir = new org.apache.hadoop.fs.Path(hdfsURI + "/" + "firstLevelDir" + "/" + "secondLevelDir");
+ org.apache.hadoop.fs.Path firstLevelDir = new org.apache.hadoop.fs.Path(testBasePath + "/" + "firstLevelDir");
+ org.apache.hadoop.fs.Path secondLevelDir = new org.apache.hadoop.fs.Path(testBasePath + "/" + "firstLevelDir" + "/" + "secondLevelDir");
Assert.assertFalse(hdfs.exists(firstLevelDir));
hdfs.mkdirs(firstLevelDir);
hdfs.mkdirs(secondLevelDir);
// create files in the base dir, the first level dir and the second level dir
for (int i = 0; i < NO_OF_FILES; i++) {
- Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "firstLevelFile", i, "This is test line.");
+ Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "firstLevelFile", i, "This is test line.");
filesCreated.add(file.f0);
filesToBeRead.add(file.f0.getName());
}
@@ -618,7 +621,7 @@ public class ContinuousFileProcessingTest {
filesToBeRead.add(file.f0.getName());
}
- TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ TextInputFormat format = new TextInputFormat(new Path(testBasePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
format.setNestedFileEnumeration(true);
@@ -644,19 +647,21 @@ public class ContinuousFileProcessingTest {
@Test
public void testSortingOnModTime() throws Exception {
+ String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
final long[] modTimes = new long[NO_OF_FILES];
final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file =
- createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+ createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
Thread.sleep(400);
filesCreated[i] = file.f0;
modTimes[i] = hdfs.getFileStatus(file.f0).getModificationTime();
}
- TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ TextInputFormat format = new TextInputFormat(new Path(testBasePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
// this is just to verify that all splits have been forwarded later.
@@ -680,18 +685,20 @@ public class ContinuousFileProcessingTest {
@Test
public void testProcessOnce() throws Exception {
+ String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
final OneShotLatch latch = new OneShotLatch();
// create a single file in the directory
Tuple2<org.apache.hadoop.fs.Path, String> bootstrap =
- createFileAndFillWithData(hdfsURI, "file", NO_OF_FILES + 1, "This is test line.");
+ createFileAndFillWithData(testBasePath, "file", NO_OF_FILES + 1, "This is test line.");
Assert.assertTrue(hdfs.exists(bootstrap.f0));
// the source is supposed to read only this file.
final Set<String> filesToBeRead = new TreeSet<>();
filesToBeRead.add(bootstrap.f0.getName());
- TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ TextInputFormat format = new TextInputFormat(new Path(testBasePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
final ContinuousFileMonitoringFunction<String> monitoringFunction =
@@ -728,7 +735,7 @@ public class ContinuousFileProcessingTest {
final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> ignoredFile =
- createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+ createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
filesCreated[i] = ignoredFile.f0;
}
@@ -746,16 +753,18 @@ public class ContinuousFileProcessingTest {
@Test
public void testFunctionRestore() throws Exception {
+ String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
org.apache.hadoop.fs.Path path = null;
long fileModTime = Long.MIN_VALUE;
for (int i = 0; i < 1; i++) {
- Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+ Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
path = file.f0;
fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
}
- TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ TextInputFormat format = new TextInputFormat(new Path(testBasePath));
final ContinuousFileMonitoringFunction<String> monitoringFunction =
new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
@@ -771,17 +780,19 @@ public class ContinuousFileProcessingTest {
final OneShotLatch latch = new OneShotLatch();
+ final DummySourceContext sourceContext = new DummySourceContext() {
+ @Override
+ public void collect(TimestampedFileInputSplit element) {
+ latch.trigger();
+ }
+ };
+
// run the source asynchronously
Thread runner = new Thread() {
@Override
public void run() {
try {
- monitoringFunction.run(new DummySourceContext() {
- @Override
- public void collect(TimestampedFileInputSplit element) {
- latch.trigger();
- }
- });
+ monitoringFunction.run(sourceContext);
}
catch (Throwable t) {
t.printStackTrace();
@@ -791,10 +802,15 @@ public class ContinuousFileProcessingTest {
};
runner.start();
+ // first condition for the source to have updated its state: emit at least one element
if (!latch.isTriggered()) {
latch.await();
}
+ // second condition for the source to have updated its state: it's not on the lock anymore,
+ // this means it has processed all the splits and updated its state.
+ synchronized (sourceContext.getCheckpointLock()) {}
+
OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
monitoringFunction.cancel();
runner.join();
@@ -820,17 +836,19 @@ public class ContinuousFileProcessingTest {
@Test
public void testProcessContinuously() throws Exception {
+ String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
final OneShotLatch latch = new OneShotLatch();
// create a single file in the directory
Tuple2<org.apache.hadoop.fs.Path, String> bootstrap =
- createFileAndFillWithData(hdfsURI, "file", NO_OF_FILES + 1, "This is test line.");
+ createFileAndFillWithData(testBasePath, "file", NO_OF_FILES + 1, "This is test line.");
Assert.assertTrue(hdfs.exists(bootstrap.f0));
final Set<String> filesToBeRead = new TreeSet<>();
filesToBeRead.add(bootstrap.f0.getName());
- TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ TextInputFormat format = new TextInputFormat(new Path(testBasePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
final ContinuousFileMonitoringFunction<String> monitoringFunction =
@@ -863,7 +881,7 @@ public class ContinuousFileProcessingTest {
final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
for (int i = 0; i < NO_OF_FILES; i++) {
Tuple2<org.apache.hadoop.fs.Path, String> file =
- createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+ createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
filesCreated[i] = file.f0;
filesToBeRead.add(file.f0.getName());
}
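----------------------------------------------------------------------
For reference, the completeness handshake hardened in testFunctionRestore(),
reduced to a self-contained sketch: the Flink source is replaced by a plain
thread and OneShotLatch by java.util.concurrent.CountDownLatch; all names
are illustrative:

import java.util.concurrent.CountDownLatch;

public class CheckpointLockHandshakeSketch {

    public static void main(String[] args) throws Exception {
        final Object checkpointLock = new Object();
        final CountDownLatch latch = new CountDownLatch(1);

        Thread runner = new Thread(() -> {
            synchronized (checkpointLock) {
                latch.countDown();          // condition 1: at least one element was emitted
                processRemainingSplits();   // state is updated while holding the lock
            }
        });
        runner.start();

        // Wait for the first emission ...
        latch.await();

        // ... then block until the worker has released the checkpoint lock,
        // i.e. it has processed all its work and finished updating its state.
        // Only now is it safe to snapshot.
        synchronized (checkpointLock) {}

        runner.join();
    }

    private static void processRemainingSplits() {
        // stand-in for processing the remaining splits
    }
}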