You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/23 16:49:24 UTC
flink git commit: [FLINK-4482] [checkpoints] Make numUnsuccessfulCheckpointsTriggers an atomic integer
Repository: flink
Updated Branches:
refs/heads/master 9a1bc021a -> 5d0358af4
[FLINK-4482] [checkpoints] Make numUnsuccessfulCheckpointsTriggers an atomic integer
This closes #2421
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d0358af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d0358af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d0358af
Branch: refs/heads/master
Commit: 5d0358af46e0f3683224986a7718adca88f504db
Parents: 9a1bc02
Author: tedyu <yu...@gmail.com>
Authored: Thu Aug 25 11:11:44 2016 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 23 18:48:32 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/checkpoint/CheckpointCoordinator.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5d0358af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3586d98..fc40911 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -53,6 +53,7 @@ import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -135,7 +136,7 @@ public class CheckpointCoordinator {
private JobStatusListener jobStatusListener;
/** The number of consecutive failed trigger attempts */
- private int numUnsuccessfulCheckpointsTriggers;
+ private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
private ScheduledTrigger currentPeriodicTrigger;
@@ -397,7 +398,7 @@ public class CheckpointCoordinator {
checkpointID = checkpointIdCounter.getAndIncrement();
}
catch (Throwable t) {
- int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
+ int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
}
@@ -490,7 +491,7 @@ public class CheckpointCoordinator {
tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
}
- numUnsuccessfulCheckpointsTriggers = 0;
+ numUnsuccessfulCheckpointsTriggers.set(0);
return new CheckpointTriggerResult(checkpoint);
}
catch (Throwable t) {
@@ -499,7 +500,7 @@ public class CheckpointCoordinator {
pendingCheckpoints.remove(checkpointID);
}
- int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
+ int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
if (!checkpoint.isDiscarded()) {
@@ -964,7 +965,7 @@ public class CheckpointCoordinator {
}
pendingCheckpoints.clear();
- numUnsuccessfulCheckpointsTriggers = 0;
+ numUnsuccessfulCheckpointsTriggers.set(0);
}
}