You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/24 21:12:53 UTC

[4/6] flink git commit: [FLINK-4437] [checkpoints] Properly lock the triggerCheckpoint() method

[FLINK-4437] [checkpoints] Properly lock the triggerCheckpoint() method

This introduces a trigger-lock that makes sure checkpoint trigger attemps to not overtake
each other (as may otherwise be the case for periodic checkpoints and manual savepoints).

This also fixes the evaluation of the min-delay-between-checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4da40bcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4da40bcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4da40bcb

Branch: refs/heads/master
Commit: 4da40bcb9ea01cb0c5e6fd0d7472dc09397f648e
Parents: 4e45659
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 24 14:02:47 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 24 19:56:17 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 219 +++++++++++--------
 .../checkpoint/CoordinatorShutdownTest.java     |   3 +-
 2 files changed, 133 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4da40bcb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index ff54bad..2c0e63b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -80,6 +80,13 @@ public class CheckpointCoordinator {
 	/** Coordinator-wide lock to safeguard the checkpoint updates */
 	private final Object lock = new Object();
 
+	/** Lock specially to make sure that trigger requests do not overtake each other.
+	 * This is not done with the coordinator-wide lock, because as part of triggering,
+	 * blocking operations may happen (distributed atomic counters).
+	 * Using a dedicated lock, we avoid blocking the processing of 'acknowledge/decline'
+	 * messages during that phase. */
+	private final Object triggerLock = new Object();
+
 	/** The job whose checkpoint this coordinator coordinates */
 	private final JobID job;
 
@@ -179,6 +186,12 @@ public class CheckpointCoordinator {
 		checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
 		checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
 
+		// it does not make sense to schedule checkpoints more often then the desired
+		// time between checkpoints
+		if (baseInterval < minPauseBetweenCheckpoints) {
+			baseInterval = minPauseBetweenCheckpoints;
+		}
+
 		this.job = checkNotNull(job);
 		this.baseInterval = baseInterval;
 		this.checkpointTimeout = checkpointTimeout;
@@ -202,8 +215,8 @@ public class CheckpointCoordinator {
 			// Make sure the checkpoint ID enumerator is running. Possibly
 			// issues a blocking call to ZooKeeper.
 			checkpointIDCounter.start();
-		} catch (Exception e) {
-			throw new Exception("Failed to start checkpoint ID counter: " + e.getMessage(), e);
+		} catch (Throwable t) {
+			throw new Exception("Failed to start checkpoint ID counter: " + t.getMessage(), t);
 		}
 	}
 
@@ -335,7 +348,13 @@ public class CheckpointCoordinator {
 				}
 
 				// make sure the minimum interval between checkpoints has passed
-				if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
+				long nextCheckpointEarliest = lastTriggeredCheckpoint + minPauseBetweenCheckpoints;
+				if (nextCheckpointEarliest < 0) {
+					// overflow
+					nextCheckpointEarliest = Long.MAX_VALUE;
+				}
+
+				if (nextCheckpointEarliest > timestamp) {
 					if (currentPeriodicTrigger != null) {
 						currentPeriodicTrigger.cancel();
 						currentPeriodicTrigger = null;
@@ -343,7 +362,8 @@ public class CheckpointCoordinator {
 					ScheduledTrigger trigger = new ScheduledTrigger();
 					// Reassign the new trigger to the currentPeriodicTrigger
 					currentPeriodicTrigger = trigger;
-					timer.scheduleAtFixedRate(trigger, minPauseBetweenCheckpoints, baseInterval);
+					long delay = nextCheckpointEarliest - timestamp;
+					timer.scheduleAtFixedRate(trigger, delay, baseInterval);
 					return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 				}
 			}
@@ -380,105 +400,130 @@ public class CheckpointCoordinator {
 
 		// we will actually trigger this checkpoint!
 
-		lastTriggeredCheckpoint = timestamp;
-		final long checkpointID;
-		try {
-			// this must happen outside the locked scope, because it communicates
-			// with external services (in HA mode) and may block for a while.
-			checkpointID = checkpointIdCounter.getAndIncrement();
-		}
-		catch (Throwable t) {
-			int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
-			LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
-			return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
-		}
+		// we lock with a special lock to make sure that trigger requests do not overtake each other.
+		// this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
+		// may issue blocking operations. Using a different lock than teh coordinator-wide lock,
+		// we avoid blocking the processing of 'acknowledge/decline' messages during that time.
+		synchronized (triggerLock) {
+			final long checkpointID;
+			try {
+				// this must happen outside the coordinator-wide lock, because it communicates
+				// with external services (in HA mode) and may block for a while.
+				checkpointID = checkpointIdCounter.getAndIncrement();
+			}
+			catch (Throwable t) {
+				int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
+				LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
+			}
 
-		LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+			final PendingCheckpoint checkpoint = props.isSavepoint() ?
+				new PendingSavepoint(job, checkpointID, timestamp, ackTasks, userClassLoader, savepointStore) :
+				new PendingCheckpoint(job, checkpointID, timestamp, ackTasks, userClassLoader);
+
+			// schedule the timer that will clean up the expired checkpoints
+			TimerTask canceller = new TimerTask() {
+				@Override
+				public void run() {
+					try {
+						synchronized (lock) {
+							// only do the work if the checkpoint is not discarded anyways
+							// note that checkpoint completion discards the pending checkpoint object
+							if (!checkpoint.isDiscarded()) {
+								LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+	
+								checkpoint.abortExpired();
+								pendingCheckpoints.remove(checkpointID);
+								rememberRecentCheckpointId(checkpointID);
+	
+								triggerQueuedRequests();
+							}
+						}
+					}
+					catch (Throwable t) {
+						LOG.error("Exception while handling checkpoint timeout", t);
+					}
+				}
+			};
 
-		final PendingCheckpoint checkpoint = props.isSavepoint() ?
-			new PendingSavepoint(job, checkpointID, timestamp, ackTasks, userClassLoader, savepointStore) :
-			new PendingCheckpoint(job, checkpointID, timestamp, ackTasks, userClassLoader);
+			try {
+				// re-acquire the coordinator-wide lock
+				synchronized (lock) {
+					// since we released the lock in the meantime, we need to re-check
+					// that the conditions still hold.
+					if (shutdown) {
+						return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
+					}
+					else if (!props.isSavepoint()) {
+						if (triggerRequestQueued) {
+							LOG.warn("Trying to trigger another checkpoint while one was queued already");
+							return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
+						}
 
-		// schedule the timer that will clean up the expired checkpoints
-		TimerTask canceller = new TimerTask() {
-			@Override
-			public void run() {
-				try {
-					synchronized (lock) {
-						// only do the work if the checkpoint is not discarded anyways
-						// note that checkpoint completion discards the pending checkpoint object
-						if (!checkpoint.isDiscarded()) {
-							LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+						if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
+							triggerRequestQueued = true;
+							if (currentPeriodicTrigger != null) {
+								currentPeriodicTrigger.cancel();
+								currentPeriodicTrigger = null;
+							}
+							return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
+						}
 
-							checkpoint.abortExpired();
-							pendingCheckpoints.remove(checkpointID);
-							rememberRecentCheckpointId(checkpointID);
+						// make sure the minimum interval between checkpoints has passed
+						long nextCheckpointEarliest = lastTriggeredCheckpoint + minPauseBetweenCheckpoints;
+						if (nextCheckpointEarliest < 0) {
+							// overflow
+							nextCheckpointEarliest = Long.MAX_VALUE;
+						}
 
-							triggerQueuedRequests();
+						if (nextCheckpointEarliest > timestamp) {
+							if (currentPeriodicTrigger != null) {
+								currentPeriodicTrigger.cancel();
+								currentPeriodicTrigger = null;
+							}
+							ScheduledTrigger trigger = new ScheduledTrigger();
+							// Reassign the new trigger to the currentPeriodicTrigger
+							currentPeriodicTrigger = trigger;
+							long delay = nextCheckpointEarliest - timestamp;
+							timer.scheduleAtFixedRate(trigger, delay, baseInterval);
+							return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 						}
 					}
-				}
-				catch (Throwable t) {
-					LOG.error("Exception while handling checkpoint timeout", t);
-				}
-			}
-		};
 
-		try {
-			// re-acquire the lock
-			synchronized (lock) {
-				// since we released the lock in the meantime, we need to re-check
-				// that the conditions still hold. this is clumsy, but it allows us to
-				// release the lock in the meantime while calls to external services are
-				// blocking progress, and still gives us early checks that skip work
-				// if no checkpoint can happen anyways
-				if (shutdown) {
-					return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
+					LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+
+					lastTriggeredCheckpoint = Math.max(timestamp, lastTriggeredCheckpoint);
+					pendingCheckpoints.put(checkpointID, checkpoint);
+					timer.schedule(canceller, checkpointTimeout);
 				}
-				else if (!props.isSavepoint()) {
-					if (triggerRequestQueued) {
-						LOG.warn("Trying to trigger another checkpoint while one was queued already");
-						return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
-					}
-					else if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
-						triggerRequestQueued = true;
-						if (currentPeriodicTrigger != null) {
-							currentPeriodicTrigger.cancel();
-							currentPeriodicTrigger = null;
-						}
-						return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
-					}
+				// end of lock scope
+
+				// send the messages to the tasks that trigger their checkpoint
+				for (int i = 0; i < tasksToTrigger.length; i++) {
+					ExecutionAttemptID id = triggerIDs[i];
+					TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
+					tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
 				}
 
-				pendingCheckpoints.put(checkpointID, checkpoint);
-				timer.schedule(canceller, checkpointTimeout);
+				numUnsuccessfulCheckpointsTriggers = 0;
+				return new CheckpointTriggerResult(checkpoint);
 			}
-			// end of lock scope
+			catch (Throwable t) {
+				// guard the map against concurrent modifications
+				synchronized (lock) {
+					pendingCheckpoints.remove(checkpointID);
+				}
 
-			// send the messages to the tasks that trigger their checkpoint
-			for (int i = 0; i < tasksToTrigger.length; i++) {
-				ExecutionAttemptID id = triggerIDs[i];
-				TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
-				tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
-			}
+				int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
+				LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
 
-			numUnsuccessfulCheckpointsTriggers = 0;
-			return new CheckpointTriggerResult(checkpoint);
-		}
-		catch (Throwable t) {
-			// guard the map against concurrent modifications
-			synchronized (lock) {
-				pendingCheckpoints.remove(checkpointID);
+				if (!checkpoint.isDiscarded()) {
+					checkpoint.abortError(new Exception("Failed to trigger checkpoint"));
+				}
+				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 			}
 
-			int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
-			LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
-
-			if (!checkpoint.isDiscarded()) {
-				checkpoint.abortError(new Exception("Failed to trigger checkpoint"));
-			}
-			return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
-		}
+		} // end trigger lock
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/4da40bcb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 91a83b2..c43cf2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
@@ -29,10 +28,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.Tasks;
-
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+
 import org.junit.Test;
 
 import scala.concurrent.Await;