You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/10 13:21:17 UTC
[2/2] flink git commit: [FLINK-5027] FileSource finishes successfully
with a wrong path
[FLINK-5027] FileSource finishes successfully with a wrong path
Integrated comments
This closes #2765.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6516938b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6516938b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6516938b
Branch: refs/heads/master
Commit: 6516938b93c08882587d32e8c39e7ebd9ed2fc7b
Parents: 029fda2
Author: kl0u <kk...@gmail.com>
Authored: Mon Nov 7 16:35:38 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 10 14:20:06 2016 +0100
----------------------------------------------------------------------
.../hdfstests/ContinuousFileProcessingTest.java | 27 ++++++++++++++++++++
.../ContinuousFileMonitoringFunction.java | 4 +++
.../source/ContinuousFileReaderOperator.java | 2 +-
3 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6516938b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 0283f68..5b14251 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -46,6 +46,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -110,6 +111,32 @@ public class ContinuousFileProcessingTest {
// TESTS
@Test
+ public void testInvalidPathSpecification() throws Exception {
+
+ String invalidPath = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/invalid/";
+ TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+
+ ContinuousFileMonitoringFunction<String> monitoringFunction =
+ new ContinuousFileMonitoringFunction<>(format, invalidPath,
+ FileProcessingMode.PROCESS_ONCE, 1, INTERVAL);
+ try {
+ monitoringFunction.run(new DummySourceContext() {
+ @Override
+ public void collect(TimestampedFileInputSplit element) {
+ // we should never arrive here with an invalid path
+ Assert.fail("Test passes with an invalid path.");
+ }
+ });
+
+ // we should never arrive here with an invalid path
+ Assert.fail("Test passed with an invalid path.");
+
+ } catch (FileNotFoundException e) {
+ Assert.assertEquals("The provided file path " + invalidPath + " does not exist.", e.getMessage());
+ }
+ }
+
+ @Test
public void testFileReadingOperatorWithIngestionTime() throws Exception {
Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
Map<Integer, String> expectedFileContents = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/6516938b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index a6c5e49..10068a6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -124,6 +125,9 @@ public class ContinuousFileMonitoringFunction<OUT>
@Override
public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws Exception {
FileSystem fileSystem = FileSystem.get(new URI(path));
+ if (!fileSystem.exists(new Path(path))) {
+ throw new FileNotFoundException("The provided file path " + path + " does not exist.");
+ }
checkpointLock = context.getCheckpointLock();
switch (watchType) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6516938b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index c8e9846..19e4737 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -311,7 +311,7 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU
} finally {
synchronized (checkpointLock) {
- LOG.info("Reader terminated, and exiting...");
+ LOG.debug("Reader terminated, and exiting...");
try {
this.format.closeInputFormat();