You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/07/15 18:40:22 UTC
[3/3] flink git commit: [FLINK-9758] Fix ContinuousFileProcessingTest
failure due to not setting runtimeContext
[FLINK-9758] Fix ContinuousFileProcessingTest failure due to not setting runtimeContext
This closes #6260
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7be2e18
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7be2e18
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7be2e18
Branch: refs/heads/master
Commit: a7be2e188b7eebe825a8608950b0c1addbfa536c
Parents: 10ddfca
Author: Yun Tang <my...@live.com>
Authored: Thu Jul 5 12:39:23 2018 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Sun Jul 15 20:38:07 2018 +0200
----------------------------------------------------------------------
.../hdfstests/ContinuousFileProcessingTest.java | 31 ++++++++++++--------
1 file changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a7be2e18/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index d880f4f..cbea871f 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.hdfstests;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -55,6 +56,7 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
import java.io.File;
import java.io.FileNotFoundException;
@@ -579,8 +581,7 @@ public class ContinuousFileProcessingTest {
});
ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format,
- FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+ createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_ONCE);
final FileVerifyingSourceContext context =
new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction);
@@ -632,8 +633,7 @@ public class ContinuousFileProcessingTest {
format.setNestedFileEnumeration(true);
ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format,
- FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+ createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_ONCE);
final FileVerifyingSourceContext context =
new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction);
@@ -674,8 +674,7 @@ public class ContinuousFileProcessingTest {
FileInputSplit[] splits = format.createInputSplits(1);
ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format,
- FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+ createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_ONCE);
ModTimeVerifyingSourceContext context = new ModTimeVerifyingSourceContext(modTimes);
@@ -708,8 +707,7 @@ public class ContinuousFileProcessingTest {
format.setFilesFilter(FilePathFilter.createDefaultFilter());
final ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format,
- FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+ createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_ONCE);
final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch, monitoringFunction);
@@ -772,7 +770,7 @@ public class ContinuousFileProcessingTest {
TextInputFormat format = new TextInputFormat(new Path(testBasePath));
final ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+ createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_CONTINUOUSLY);
StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
new StreamSource<>(monitoringFunction);
@@ -823,7 +821,7 @@ public class ContinuousFileProcessingTest {
testHarness.close();
final ContinuousFileMonitoringFunction<String> monitoringFunctionCopy =
- new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+ createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_CONTINUOUSLY);
StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> srcCopy =
new StreamSource<>(monitoringFunctionCopy);
@@ -857,8 +855,7 @@ public class ContinuousFileProcessingTest {
format.setFilesFilter(FilePathFilter.createDefaultFilter());
final ContinuousFileMonitoringFunction<String> monitoringFunction =
- new ContinuousFileMonitoringFunction<>(format,
- FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+ createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_CONTINUOUSLY);
final int totalNoOfFilesToBeRead = NO_OF_FILES + 1; // 1 for the bootstrap + NO_OF_FILES
final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch,
@@ -1055,4 +1052,14 @@ public class ContinuousFileProcessingTest {
Assert.assertTrue("No result file present", hdfs.exists(file));
return new Tuple2<>(file, str.toString());
}
+
+ /**
+ * Create continuous monitoring function with 1 reader-parallelism and interval: {@link #INTERVAL}.
+ */
+ private <OUT> ContinuousFileMonitoringFunction<OUT> createTestContinuousFileMonitoringFunction(FileInputFormat<OUT> format, FileProcessingMode fileProcessingMode) {
+ ContinuousFileMonitoringFunction<OUT> monitoringFunction =
+ new ContinuousFileMonitoringFunction<>(format, fileProcessingMode, 1, INTERVAL);
+ monitoringFunction.setRuntimeContext(Mockito.mock(RuntimeContext.class));
+ return monitoringFunction;
+ }
}