Posted to commits@flink.apache.org by al...@apache.org on 2017/02/10 16:21:42 UTC

flink git commit: [FLINK-5415] Harden ContinuousFileProcessingTest

Repository: flink
Updated Branches:
  refs/heads/master e29dfb840 -> f6709b4a4


[FLINK-5415] Harden ContinuousFileProcessingTest

- Use TemporaryFolder @ClassRule instead of manually managing HDFS base dir.
- Place files for each test in own sub-directory
- Harden completeness condition in testFunctionRestore()
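
The first two points amount to the pattern visible in the hunks below. As a rough, condensed sketch only (class and test names here are placeholders, not the actual Flink test; it assumes JUnit 4 and the Hadoop minicluster test dependency):

import java.io.File;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class MiniDfsTestSketch {

	private static FileSystem hdfs;
	private static String hdfsURI;
	private static MiniDFSCluster hdfsCluster;

	// JUnit creates the folder before the test class runs and deletes it
	// afterwards, so there is no manual FileUtil.fullyDelete() bookkeeping.
	@ClassRule
	public static TemporaryFolder tempFolder = new TemporaryFolder();

	@BeforeClass
	public static void createHDFS() throws Exception {
		File hdfsDir = tempFolder.newFolder();

		Configuration hdConf = new Configuration();
		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsDir.getAbsolutePath());

		hdfsCluster = new MiniDFSCluster.Builder(hdConf).build();
		hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":"
			+ hdfsCluster.getNameNodePort() + "/";
		hdfs = hdfsCluster.getFileSystem();
	}

	@AfterClass
	public static void destroyHDFS() {
		hdfsCluster.shutdown();
	}

	@Test
	public void someTest() throws Exception {
		// each test works in its own random sub-directory, so files left
		// behind by one test cannot be picked up by the directory scan of another
		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
		// ... create input files under testBasePath and point the input format at it
	}
}

The third point shows up further down in testFunctionRestore(): besides waiting on the latch (at least one element emitted), the test now also synchronizes on the source's checkpoint lock before taking the snapshot, so state is only captured once the monitoring function has finished processing the splits.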


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6709b4a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6709b4a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6709b4a

Branch: refs/heads/master
Commit: f6709b4a48a843a0a1818fd59b98d32f82d6184f
Parents: e29dfb8
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Feb 10 15:14:34 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Feb 10 17:07:39 2017 +0100

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileProcessingTest.java | 100 +++++++++++--------
 1 file changed, 59 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f6709b4a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 79331df..cc5cb8e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -44,12 +44,13 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -73,22 +74,20 @@ public class ContinuousFileProcessingTest {
 
 	private static final long INTERVAL = 100;
 
-	private static File baseDir;
-
 	private static FileSystem hdfs;
 	private static String hdfsURI;
 	private static MiniDFSCluster hdfsCluster;
 
-	//						PREPARING FOR THE TESTS
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
 
 	@BeforeClass
 	public static void createHDFS() {
 		try {
-			baseDir = new File("./target/hdfs/hdfsTesting").getAbsoluteFile();
-			FileUtil.fullyDelete(baseDir);
+			File hdfsDir = tempFolder.newFolder();
 
 			org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
-			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsDir.getAbsolutePath());
 			hdConf.set("dfs.block.size", String.valueOf(1048576)); // this is the minimum we can set.
 
 			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
@@ -106,17 +105,12 @@ public class ContinuousFileProcessingTest {
 	@AfterClass
 	public static void destroyHDFS() {
 		try {
-			FileUtil.fullyDelete(baseDir);
 			hdfsCluster.shutdown();
 		} catch (Throwable t) {
 			throw new RuntimeException(t);
 		}
 	}
 
-	//						END OF PREPARATIONS
-
-	//						TESTS
-
 	@Test
 	public void testInvalidPathSpecification() throws Exception {
 
@@ -145,17 +139,19 @@ public class ContinuousFileProcessingTest {
 
 	@Test
 	public void testFileReadingOperatorWithIngestionTime() throws Exception {
+		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
 		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
 		Map<Integer, String> expectedFileContents = new HashMap<>();
 		Map<String, Long> modTimes = new HashMap<>();
 		for (int i = 0; i < NO_OF_FILES; i++) {
-			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
 			filesCreated.add(file.f0);
 			modTimes.put(file.f0.getName(), hdfs.getFileStatus(file.f0).getModificationTime());
 			expectedFileContents.put(i, file.f1);
 		}
 
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TextInputFormat format = new TextInputFormat(new Path(testBasePath));
 		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
 
 		final long watermarkInterval = 10;
@@ -303,17 +299,19 @@ public class ContinuousFileProcessingTest {
 
 	@Test
 	public void testFileReadingOperatorWithEventTime() throws Exception {
+		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
 		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
 		Map<String, Long> modTimes = new HashMap<>();
 		Map<Integer, String> expectedFileContents = new HashMap<>();
 		for (int i = 0; i < NO_OF_FILES; i++) {
-			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
 			modTimes.put(file.f0.getName(), hdfs.getFileStatus(file.f0).getModificationTime());
 			filesCreated.add(file.f0);
 			expectedFileContents.put(i, file.f1);
 		}
 
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TextInputFormat format = new TextInputFormat(new Path(testBasePath));
 		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
 
 		ContinuousFileReaderOperator<String> reader = new ContinuousFileReaderOperator<>(format);
@@ -396,6 +394,7 @@ public class ContinuousFileProcessingTest {
 
 	@Test
 	public void testReaderSnapshotRestore() throws Exception {
+		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
 
 		TimestampedFileInputSplit split1 =
 			new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null);
@@ -412,7 +411,7 @@ public class ContinuousFileProcessingTest {
 
 		final OneShotLatch latch = new OneShotLatch();
 
-		BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI));
+		BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testBasePath));
 		TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format);
 
 		ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format);
@@ -439,7 +438,7 @@ public class ContinuousFileProcessingTest {
 		}
 
 		ContinuousFileReaderOperator<FileInputSplit> restoredReader = new ContinuousFileReaderOperator<>(
-			new BlockingFileInputFormat(latch, new Path(hdfsURI)));
+			new BlockingFileInputFormat(latch, new Path(testBasePath)));
 		restoredReader.setOutputType(typeInfo, new ExecutionConfig());
 
 		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> restoredTestInstance  =
@@ -543,24 +542,26 @@ public class ContinuousFileProcessingTest {
 
 	@Test
 	public void testFilePathFiltering() throws Exception {
+		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
 		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
 		Set<String> filesKept = new TreeSet<>();
 
 		// create the files to be discarded
 		for (int i = 0; i < NO_OF_FILES; i++) {
-			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "**file", i, "This is test line.");
+			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "**file", i, "This is test line.");
 			filesCreated.add(file.f0);
 		}
 
 		// create the files to be kept
 		for (int i = 0; i < NO_OF_FILES; i++) {
 			Tuple2<org.apache.hadoop.fs.Path, String> file =
-				createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+				createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
 			filesCreated.add(file.f0);
 			filesKept.add(file.f0.getName());
 		}
 
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TextInputFormat format = new TextInputFormat(new Path(testBasePath));
 		format.setFilesFilter(new FilePathFilter() {
 
 			private static final long serialVersionUID = 2611449927338589804L;
@@ -591,19 +592,21 @@ public class ContinuousFileProcessingTest {
 
 	@Test
 	public void testNestedFilesProcessing() throws Exception {
+		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
 		final Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
 		final Set<String> filesToBeRead = new TreeSet<>();
 
 		// create two nested directories
-		org.apache.hadoop.fs.Path firstLevelDir = new org.apache.hadoop.fs.Path(hdfsURI + "/" + "firstLevelDir");
-		org.apache.hadoop.fs.Path secondLevelDir = new org.apache.hadoop.fs.Path(hdfsURI + "/" + "firstLevelDir" + "/" + "secondLevelDir");
+		org.apache.hadoop.fs.Path firstLevelDir = new org.apache.hadoop.fs.Path(testBasePath + "/" + "firstLevelDir");
+		org.apache.hadoop.fs.Path secondLevelDir = new org.apache.hadoop.fs.Path(testBasePath + "/" + "firstLevelDir" + "/" + "secondLevelDir");
 		Assert.assertFalse(hdfs.exists(firstLevelDir));
 		hdfs.mkdirs(firstLevelDir);
 		hdfs.mkdirs(secondLevelDir);
 
 		// create files in the base dir, the first level dir and the second level dir
 		for (int i = 0; i < NO_OF_FILES; i++) {
-			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "firstLevelFile", i, "This is test line.");
+			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "firstLevelFile", i, "This is test line.");
 			filesCreated.add(file.f0);
 			filesToBeRead.add(file.f0.getName());
 		}
@@ -618,7 +621,7 @@ public class ContinuousFileProcessingTest {
 			filesToBeRead.add(file.f0.getName());
 		}
 
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TextInputFormat format = new TextInputFormat(new Path(testBasePath));
 		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 		format.setNestedFileEnumeration(true);
 
@@ -644,19 +647,21 @@ public class ContinuousFileProcessingTest {
 
 	@Test
 	public void testSortingOnModTime() throws Exception {
+		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
 		final long[] modTimes = new long[NO_OF_FILES];
 		final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
 
 		for (int i = 0; i < NO_OF_FILES; i++) {
 			Tuple2<org.apache.hadoop.fs.Path, String> file =
-				createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+				createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
 			Thread.sleep(400);
 
 			filesCreated[i] = file.f0;
 			modTimes[i] = hdfs.getFileStatus(file.f0).getModificationTime();
 		}
 
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TextInputFormat format = new TextInputFormat(new Path(testBasePath));
 		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 
 		// this is just to verify that all splits have been forwarded later.
@@ -680,18 +685,20 @@ public class ContinuousFileProcessingTest {
 
 	@Test
 	public void testProcessOnce() throws Exception {
+		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
 		final OneShotLatch latch = new OneShotLatch();
 
 		// create a single file in the directory
 		Tuple2<org.apache.hadoop.fs.Path, String> bootstrap =
-			createFileAndFillWithData(hdfsURI, "file", NO_OF_FILES + 1, "This is test line.");
+			createFileAndFillWithData(testBasePath, "file", NO_OF_FILES + 1, "This is test line.");
 		Assert.assertTrue(hdfs.exists(bootstrap.f0));
 
 		// the source is supposed to read only this file.
 		final Set<String> filesToBeRead = new TreeSet<>();
 		filesToBeRead.add(bootstrap.f0.getName());
 
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TextInputFormat format = new TextInputFormat(new Path(testBasePath));
 		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 
 		final ContinuousFileMonitoringFunction<String> monitoringFunction =
@@ -728,7 +735,7 @@ public class ContinuousFileProcessingTest {
 		final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
 		for (int i = 0; i < NO_OF_FILES; i++) {
 			Tuple2<org.apache.hadoop.fs.Path, String> ignoredFile =
-				createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+				createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
 			filesCreated[i] = ignoredFile.f0;
 		}
 
@@ -746,16 +753,18 @@ public class ContinuousFileProcessingTest {
 
 	@Test
 	public void testFunctionRestore() throws Exception {
+		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
 
 		org.apache.hadoop.fs.Path path = null;
 		long fileModTime = Long.MIN_VALUE;
 		for (int i = 0; i < 1; i++) {
-			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+			Tuple2<org.apache.hadoop.fs.Path, String> file = createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
 			path = file.f0;
 			fileModTime = hdfs.getFileStatus(file.f0).getModificationTime();
 		}
 
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TextInputFormat format = new TextInputFormat(new Path(testBasePath));
 
 		final ContinuousFileMonitoringFunction<String> monitoringFunction =
 			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
@@ -771,17 +780,19 @@ public class ContinuousFileProcessingTest {
 
 		final OneShotLatch latch = new OneShotLatch();
 
+		final DummySourceContext sourceContext = new DummySourceContext() {
+			@Override
+			public void collect(TimestampedFileInputSplit element) {
+				latch.trigger();
+			}
+		};
+
 		// run the source asynchronously
 		Thread runner = new Thread() {
 			@Override
 			public void run() {
 				try {
-					monitoringFunction.run(new DummySourceContext() {
-						@Override
-						public void collect(TimestampedFileInputSplit element) {
-							latch.trigger();
-						}
-					});
+					monitoringFunction.run(sourceContext);
 				}
 				catch (Throwable t) {
 					t.printStackTrace();
@@ -791,10 +802,15 @@ public class ContinuousFileProcessingTest {
 		};
 		runner.start();
 
+		// first condition for the source to have updated its state: emit at least one element
 		if (!latch.isTriggered()) {
 			latch.await();
 		}
 
+		// second condition for the source to have updated its state: it's not on the lock anymore,
+		// this means it has processed all the splits and updated its state.
+		synchronized (sourceContext.getCheckpointLock()) {}
+
 		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
 		monitoringFunction.cancel();
 		runner.join();
@@ -820,17 +836,19 @@ public class ContinuousFileProcessingTest {
 
 	@Test
 	public void testProcessContinuously() throws Exception {
+		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
+
 		final OneShotLatch latch = new OneShotLatch();
 
 		// create a single file in the directory
 		Tuple2<org.apache.hadoop.fs.Path, String> bootstrap =
-			createFileAndFillWithData(hdfsURI, "file", NO_OF_FILES + 1, "This is test line.");
+			createFileAndFillWithData(testBasePath, "file", NO_OF_FILES + 1, "This is test line.");
 		Assert.assertTrue(hdfs.exists(bootstrap.f0));
 
 		final Set<String> filesToBeRead = new TreeSet<>();
 		filesToBeRead.add(bootstrap.f0.getName());
 
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TextInputFormat format = new TextInputFormat(new Path(testBasePath));
 		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 
 		final ContinuousFileMonitoringFunction<String> monitoringFunction =
@@ -863,7 +881,7 @@ public class ContinuousFileProcessingTest {
 		final org.apache.hadoop.fs.Path[] filesCreated = new org.apache.hadoop.fs.Path[NO_OF_FILES];
 		for (int i = 0; i < NO_OF_FILES; i++) {
 			Tuple2<org.apache.hadoop.fs.Path, String> file =
-				createFileAndFillWithData(hdfsURI, "file", i, "This is test line.");
+				createFileAndFillWithData(testBasePath, "file", i, "This is test line.");
 			filesCreated[i] = file.f0;
 			filesToBeRead.add(file.f0.getName());
 		}