You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/09/01 08:10:31 UTC
flink git commit: [hotfix] Fixes unstable ContinuousFileMonitoringTest.
Repository: flink
Updated Branches:
refs/heads/master bdf9f86c5 -> 4a100fa4c
[hotfix] Fixes unstable ContinuousFileMonitoringTest.
This closes #2446
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a100fa4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a100fa4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a100fa4
Branch: refs/heads/master
Commit: 4a100fa4c9742aaee9593a4a21ce3882e81d3f68
Parents: bdf9f86
Author: kl0u <kk...@gmail.com>
Authored: Wed Aug 31 17:32:53 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Sep 1 10:10:01 2016 +0200
----------------------------------------------------------------------
.../hdfstests/ContinuousFileMonitoringTest.java | 60 ++++++++++++--------
1 file changed, 36 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4a100fa4/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 4aadaec..8a700f5 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -237,26 +237,36 @@ public class ContinuousFileMonitoringTest {
@Test
public void testFileSplitMonitoringReprocessWithAppended() throws Exception {
- Set<String> uniqFilesFound = new HashSet<>();
+ final Set<String> uniqFilesFound = new HashSet<>();
FileCreator fc = new FileCreator(INTERVAL, NO_OF_FILES);
fc.start();
- TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
- format.setFilesFilter(FilePathFilter.createDefaultFilter());
- ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, hdfsURI,
- FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
-
- monitoringFunction.open(new Configuration());
- monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+ format.setFilesFilter(FilePathFilter.createDefaultFilter());
+ ContinuousFileMonitoringFunction<String> monitoringFunction =
+ new ContinuousFileMonitoringFunction<>(format, hdfsURI,
+ FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+
+ try {
+ monitoringFunction.open(new Configuration());
+ monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));
+ } catch (Exception e) {
+ // do nothing as we interrupted the thread.
+ }
+ }
+ });
+ t.start();
// wait until the sink also sees all the splits.
synchronized (uniqFilesFound) {
- while (uniqFilesFound.size() < NO_OF_FILES) {
- uniqFilesFound.wait(7 * INTERVAL);
- }
+ uniqFilesFound.wait();
}
+ t.interrupt();
+ fc.join();
Assert.assertTrue(fc.getFilesCreated().size() == NO_OF_FILES);
Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
@@ -281,13 +291,15 @@ public class ContinuousFileMonitoringTest {
Set<String> uniqFilesFound = new HashSet<>();
FileCreator fc = new FileCreator(INTERVAL, 1);
+ Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
fc.start();
// to make sure that at least one file is created
- Set<org.apache.hadoop.fs.Path> filesCreated = fc.getFilesCreated();
- synchronized (filesCreated) {
- if (filesCreated.size() == 0) {
- filesCreated.wait();
+ if (filesCreated.size() == 0) {
+ synchronized (filesCreated) {
+ if (filesCreated.size() == 0) {
+ filesCreated.wait();
+ }
}
}
Assert.assertTrue(fc.getFilesCreated().size() >= 1);
@@ -391,17 +403,17 @@ public class ContinuousFileMonitoringTest {
Assert.fail("Duplicate file: " + filePath);
}
- filesFound.add(filePath);
- try {
- if (filesFound.size() == NO_OF_FILES) {
- this.src.cancel();
- this.src.close();
- synchronized (filesFound) {
+ synchronized (filesFound) {
+ filesFound.add(filePath);
+ try {
+ if (filesFound.size() == NO_OF_FILES) {
+ this.src.cancel();
+ this.src.close();
filesFound.notifyAll();
}
+ } catch (Exception e) {
+ e.printStackTrace();
}
- } catch (Exception e) {
- e.printStackTrace();
}
}