You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/23 16:49:24 UTC

flink git commit: [FLINK-4482] [checkpoints] Make numUnsuccessfulCheckpointsTriggers an atomic integer

Repository: flink
Updated Branches:
  refs/heads/master 9a1bc021a -> 5d0358af4


[FLINK-4482] [checkpoints] Make numUnsuccessfulCheckpointsTriggers an atomic integer

This closes #2421


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d0358af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d0358af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d0358af

Branch: refs/heads/master
Commit: 5d0358af46e0f3683224986a7718adca88f504db
Parents: 9a1bc02
Author: tedyu <yu...@gmail.com>
Authored: Thu Aug 25 11:11:44 2016 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 23 18:48:32 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/checkpoint/CheckpointCoordinator.java  | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5d0358af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3586d98..fc40911 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -53,6 +53,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -135,7 +136,7 @@ public class CheckpointCoordinator {
 	private JobStatusListener jobStatusListener;
 
 	/** The number of consecutive failed trigger attempts */
-	private int numUnsuccessfulCheckpointsTriggers;
+	private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
 
 	private ScheduledTrigger currentPeriodicTrigger;
 
@@ -397,7 +398,7 @@ public class CheckpointCoordinator {
 				checkpointID = checkpointIdCounter.getAndIncrement();
 			}
 			catch (Throwable t) {
-				int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
+				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
 				LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
 				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 			}
@@ -490,7 +491,7 @@ public class CheckpointCoordinator {
 					tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
 				}
 
-				numUnsuccessfulCheckpointsTriggers = 0;
+				numUnsuccessfulCheckpointsTriggers.set(0);
 				return new CheckpointTriggerResult(checkpoint);
 			}
 			catch (Throwable t) {
@@ -499,7 +500,7 @@ public class CheckpointCoordinator {
 					pendingCheckpoints.remove(checkpointID);
 				}
 
-				int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
+				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
 				LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
 
 				if (!checkpoint.isDiscarded()) {
@@ -964,7 +965,7 @@ public class CheckpointCoordinator {
 			}
 
 			pendingCheckpoints.clear();
-			numUnsuccessfulCheckpointsTriggers = 0;
+			numUnsuccessfulCheckpointsTriggers.set(0);
 		}
 	}