You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/07/15 18:40:22 UTC

[3/3] flink git commit: [FLINK-9758] Fix ContinuousFileProcessingTest failure due to not setting runtimeContext

[FLINK-9758] Fix ContinuousFileProcessingTest failure due to not setting runtimeContext

This closes #6260


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7be2e18
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7be2e18
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7be2e18

Branch: refs/heads/master
Commit: a7be2e188b7eebe825a8608950b0c1addbfa536c
Parents: 10ddfca
Author: Yun Tang <my...@live.com>
Authored: Thu Jul 5 12:39:23 2018 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Sun Jul 15 20:38:07 2018 +0200

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileProcessingTest.java | 31 ++++++++++++--------
 1 file changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7be2e18/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index d880f4f..cbea871f 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.hdfstests;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -55,6 +56,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -579,8 +581,7 @@ public class ContinuousFileProcessingTest {
 		});
 
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format,
-				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+			createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_ONCE);
 
 		final FileVerifyingSourceContext context =
 			new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction);
@@ -632,8 +633,7 @@ public class ContinuousFileProcessingTest {
 		format.setNestedFileEnumeration(true);
 
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format,
-				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+			createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_ONCE);
 
 		final FileVerifyingSourceContext context =
 			new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction);
@@ -674,8 +674,7 @@ public class ContinuousFileProcessingTest {
 		FileInputSplit[] splits = format.createInputSplits(1);
 
 		ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format,
-				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+			createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_ONCE);
 
 		ModTimeVerifyingSourceContext context = new ModTimeVerifyingSourceContext(modTimes);
 
@@ -708,8 +707,7 @@ public class ContinuousFileProcessingTest {
 		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 
 		final ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format,
-				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+			createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_ONCE);
 
 		final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch, monitoringFunction);
 
@@ -772,7 +770,7 @@ public class ContinuousFileProcessingTest {
 		TextInputFormat format = new TextInputFormat(new Path(testBasePath));
 
 		final ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+			createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_CONTINUOUSLY);
 
 		StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> src =
 			new StreamSource<>(monitoringFunction);
@@ -823,7 +821,7 @@ public class ContinuousFileProcessingTest {
 		testHarness.close();
 
 		final ContinuousFileMonitoringFunction<String> monitoringFunctionCopy =
-			new ContinuousFileMonitoringFunction<>(format, FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+			createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_CONTINUOUSLY);
 
 		StreamSource<TimestampedFileInputSplit, ContinuousFileMonitoringFunction<String>> srcCopy =
 			new StreamSource<>(monitoringFunctionCopy);
@@ -857,8 +855,7 @@ public class ContinuousFileProcessingTest {
 		format.setFilesFilter(FilePathFilter.createDefaultFilter());
 
 		final ContinuousFileMonitoringFunction<String> monitoringFunction =
-			new ContinuousFileMonitoringFunction<>(format,
-				FileProcessingMode.PROCESS_CONTINUOUSLY, 1, INTERVAL);
+			createTestContinuousFileMonitoringFunction(format, FileProcessingMode.PROCESS_CONTINUOUSLY);
 
 		final int totalNoOfFilesToBeRead = NO_OF_FILES + 1; // 1 for the bootstrap + NO_OF_FILES
 		final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch,
@@ -1055,4 +1052,14 @@ public class ContinuousFileProcessingTest {
 		Assert.assertTrue("No result file present", hdfs.exists(file));
 		return new Tuple2<>(file, str.toString());
 	}
+
+	/**
+	 * Create continuous monitoring function with 1 reader-parallelism and interval: {@link #INTERVAL}.
+	 */
+	private <OUT> ContinuousFileMonitoringFunction<OUT> createTestContinuousFileMonitoringFunction(FileInputFormat<OUT> format, FileProcessingMode fileProcessingMode) {
+		ContinuousFileMonitoringFunction<OUT> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, fileProcessingMode, 1, INTERVAL);
+		monitoringFunction.setRuntimeContext(Mockito.mock(RuntimeContext.class));
+		return monitoringFunction;
+	}
 }