You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/10/10 08:48:21 UTC

flink git commit: [FLINK-4777] catch IOException in ContinuousFileMonitoringFunction

Repository: flink
Updated Branches:
  refs/heads/release-1.1 bab59dfa7 -> 7267562bb


[FLINK-4777] catch IOException in ContinuousFileMonitoringFunction

FileSystem.listStatus(path) may throw an IOException when it lists files
and then retrieves their file status. This is quite common, e.g. editors
which create temporary files and move them. The
ContinuousFileMonitoringFunction can only apply a file path filter
afterwards.

The solution is to defer file checks until no exception is caught anymore.

This closes #2610.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7267562b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7267562b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7267562b

Branch: refs/heads/release-1.1
Commit: 7267562bb8110e3f7300007e996ce96355d37c59
Parents: bab59df
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Oct 7 20:06:18 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Oct 10 10:46:41 2016 +0200

----------------------------------------------------------------------
 .../source/ContinuousFileMonitoringFunction.java     | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7267562b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 8ff4a2a..4b2fbe1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -246,12 +246,21 @@ public class ContinuousFileMonitoringFunction<OUT>
 	 * method to decide which parts of the file to be processed, and forward them downstream.
 	 */
 	private List<FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
-		List<FileStatus> files = new ArrayList<>();
 
-		FileStatus[] statuses = fileSystem.listStatus(new Path(path));
+		final FileStatus[] statuses;
+		try {
+			statuses = fileSystem.listStatus(new Path(path));
+		} catch (IOException e) {
+			// we may run into an IOException if files are moved while listing their status
+			// delay the check for eligible files in this case
+			return Collections.emptyList();
+		}
+
 		if (statuses == null) {
 			LOG.warn("Path does not exist: {}", path);
+			return Collections.emptyList();
 		} else {
+			List<FileStatus> files = new ArrayList<>();
 			// handle the new files
 			for (FileStatus status : statuses) {
 				Path filePath = status.getPath();
@@ -260,8 +269,8 @@ public class ContinuousFileMonitoringFunction<OUT>
 					files.add(status);
 				}
 			}
+			return files;
 		}
-		return files;
 	}
 
 	/**