You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/09/01 08:10:31 UTC

flink git commit: [hotfix] Fixes unstable ContinuousFileMonitoringTest.

Repository: flink
Updated Branches:
  refs/heads/master bdf9f86c5 -> 4a100fa4c


[hotfix] Fixes unstable ContinuousFileMonitoringTest.

This closes #2446


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a100fa4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a100fa4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a100fa4

Branch: refs/heads/master
Commit: 4a100fa4c9742aaee9593a4a21ce3882e81d3f68
Parents: bdf9f86
Author: kl0u <kk...@gmail.com>
Authored: Wed Aug 31 17:32:53 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 1 10:10:01 2016 +0200

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileMonitoringTest.java | 60 ++++++++++++--------
 1 file changed, 36 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4a100fa4/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 4aadaec..8a700f5 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -237,26 +237,36 @@ public class ContinuousFileMonitoringTest {
 
 	@Test
 	public void testFileSplitMonitoringReprocessWithAppended() throws Exception {
-		Set<String> uniqFilesFound = new HashSet<>();
+		final Set<String> uniqFilesFound = new HashSet<>();
 
 		FileCreator fc = new FileCreator(INTERVAL, NO_OF_FILES);
 		fc.start();
 
-		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
-		format.setFilesFilter(FilePathFilter.createDefaultFilter());
-		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, hdfsURI,
-				FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-
-		monitoringFunction.open(new Configuration());
-		monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
+		Thread t = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+				format.setFilesFilter(FilePathFilter.createDefaultFilter());
+				ContinuousFileMonitoringFunction<String> monitoringFunction =
+					new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+						FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+				try {
+					monitoringFunction.open(new Configuration());
+					monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
+				} catch (Exception e) {
+					// do nothing as we interrupted the thread.
+				}
+			}
+		});
+		t.start();
 
 		// wait until the sink also sees all the splits.
 		synchronized (uniqFilesFound) {
-			while (uniqFilesFound.size() < NO_OF_FILES) {
-				uniqFilesFound.wait(7 * INTERVAL);
-			}
+			uniqFilesFound.wait();
 		}
+		t.interrupt();
+		fc.join();
 
 		Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES);
 		Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
@@ -281,13 +291,15 @@ public class ContinuousFileMonitoringTest {
 		Set<String> uniqFilesFound = new HashSet<>();
 
 		FileCreator fc = new FileCreator(INTERVAL, 1);
+		Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
 		fc.start();
 
 		// to make sure that at least one file is created
-		Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
-		synchronized (filesCreated) {
-			if (filesCreated.size() == 0) {
-				filesCreated.wait();
+		if (filesCreated.size() == 0) {
+			synchronized (filesCreated) {
+				if (filesCreated.size() == 0) {
+					filesCreated.wait();
+				}
 			}
 		}
 		Assert.assertTrue(fc.getFilesCreated().size() >= 1);
@@ -391,17 +403,17 @@ public class ContinuousFileMonitoringTest {
 				Assert.fail("Duplicate file: " + filePath);
 			}
 
-			filesFound.add(filePath);
-			try {
-				if (filesFound.size() == NO_OF_FILES) {
-					this.src.cancel();
-					this.src.close();
-					synchronized (filesFound) {
+			synchronized (filesFound) {
+				filesFound.add(filePath);
+				try {
+					if (filesFound.size() == NO_OF_FILES) {
+						this.src.cancel();
+						this.src.close();
 						filesFound.notifyAll();
 					}
+				} catch (Exception e) {
+					e.printStackTrace();
 				}
-			} catch (Exception e) {
-				e.printStackTrace();
 			}
 		}