You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/11 13:09:11 UTC
[2/3] flink git commit: [FLINK-5021] Guarantee PROCESS_ONCE works correctly after recovering.
[FLINK-5021] Guarantee PROCESS_ONCE works correctly after recovering.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5a90c6bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5a90c6bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5a90c6bd
Branch: refs/heads/master
Commit: 5a90c6bdd0a4c279fca7665532c4a34992ffbb24
Parents: 98a6176
Author: kl0u <kk...@gmail.com>
Authored: Thu Nov 3 11:08:45 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Nov 11 14:05:58 2016 +0100
----------------------------------------------------------------------
.../source/ContinuousFileMonitoringFunction.java | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5a90c6bd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index 10068a6..54ab0ab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -147,8 +147,15 @@ public class ContinuousFileMonitoringFunction<OUT>
break;
case PROCESS_ONCE:
synchronized (checkpointLock) {
- monitorDirAndForwardSplits(fileSystem, context);
- globalModificationTime = Long.MAX_VALUE;
+
+ // The following check guarantees that if we restart after a
+ // failure and a checkpoint had completed successfully, we
+ // will not reprocess the directory.
+
+ if (globalModificationTime == Long.MIN_VALUE) {
+ monitorDirAndForwardSplits(fileSystem, context);
+ globalModificationTime = Long.MAX_VALUE;
+ }
isRunning = false;
}
break;