You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/10 13:21:17 UTC

[2/2] flink git commit: [FLINK-5027] FileSource finishes successfully with a wrong path

[FLINK-5027] FileSource finishes successfully with a wrong path

Integrated comments

This closes #2765.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6516938b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6516938b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6516938b

Branch: refs/heads/master
Commit: 6516938b93c08882587d32e8c39e7ebd9ed2fc7b
Parents: 029fda2
Author: kl0u <kk...@gmail.com>
Authored: Mon Nov 7 16:35:38 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 10 14:20:06 2016 +0100

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileProcessingTest.java | 27 ++++++++++++++++++++
 .../ContinuousFileMonitoringFunction.java       |  4 +++
 .../source/ContinuousFileReaderOperator.java    |  2 +-
 3 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6516938b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 0283f68..5b14251 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -46,6 +46,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -110,6 +111,32 @@ public class ContinuousFileProcessingTest {
 	//						TESTS
 
 	@Test
+	public void testInvalidPathSpecification() throws Exception {
+
+		String invalidPath = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/invalid/";
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+
+		ContinuousFileMonitoringFunction<String> monitoringFunction =
+			new ContinuousFileMonitoringFunction<>(format, invalidPath,
+				FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+		try {
+			monitoringFunction.run(new DummySourceContext() {
+				@Override
+				public void collect(TimestampedFileInputSplit element) {
+					// we should never arrive here with an invalid path
+					Assert.fail("Test passes with an invalid path.");
+				}
+			});
+
+			// we should never arrive here with an invalid path
+			Assert.fail("Test passed with an invalid path.");
+
+		} catch (FileNotFoundException e) {
+			Assert.assertEquals("The provided file path " + invalidPath + " does not exist.", e.getMessage());
+		}
+	}
+
+	@Test
 	public void testFileReadingOperatorWithIngestionTime() throws Exception {
 		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
 		Map<Integer, String> expectedFileContents = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/6516938b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index a6c5e49..10068a6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -124,6 +125,9 @@ public class ContinuousFileMonitoringFunction<OUT>
 	@Override
 	public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws Exception {
 		FileSystem fileSystem = FileSystem.get(new URI(path));
+		if (!fileSystem.exists(new Path(path))) {
+			throw new FileNotFoundException("The provided file path " + path + " does not exist.");
+		}
 
 		checkpointLock = context.getCheckpointLock();
 		switch (watchType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6516938b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index c8e9846..19e4737 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -311,7 +311,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
 
 			} finally {
 				synchronized (checkpointLock) {
-					LOG.info("Reader terminated, and exiting...");
+					LOG.debug("Reader terminated, and exiting...");
 
 					try {
 						this.format.closeInputFormat();