You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/11 13:09:11 UTC

[2/3] flink git commit: [FLINK-5021] Guarantee PROCESS_ONCE works correctly after recovering.

[FLINK-5021] Guarantee PROCESS_ONCE works correctly after recovering.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a90c6bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a90c6bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a90c6bd

Branch: refs/heads/master
Commit: 5a90c6bdd0a4c279fca7665532c4a34992ffbb24
Parents: 98a6176
Author: kl0u <kk...@gmail.com>
Authored: Thu Nov 3 11:08:45 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Nov 11 14:05:58 2016 +0100

----------------------------------------------------------------------
 .../source/ContinuousFileMonitoringFunction.java         | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5a90c6bd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 10068a6..54ab0ab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -147,8 +147,15 @@ public class ContinuousFileMonitoringFunction<OUT>
 				break;
 			case PROCESS_ONCE:
 				synchronized (checkpointLock) {
-					monitorDirAndForwardSplits(fileSystem, context);
-					globalModificationTime = Long.MAX_VALUE;
+
+					// the following check guarantees that if we restart
+					// after a failure and we managed to have a successful
+					// checkpoint, we will not reprocess the directory.
+
+					if (globalModificationTime == Long.MIN_VALUE) {
+						monitorDirAndForwardSplits(fileSystem, context);
+						globalModificationTime = Long.MAX_VALUE;
+					}
 					isRunning = false;
 				}
 				break;