You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/15 19:28:45 UTC

flink git commit: [FLINK-5962] [checkpoints] Remove scheduled cancel-task from timer queue to prevent memory leaks

Repository: flink
Updated Branches:
  refs/heads/release-1.2 9206df666 -> 9d59e008d


[FLINK-5962] [checkpoints] Remove scheduled cancel-task from timer queue to prevent memory leaks

This closes #3548


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d59e008
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d59e008
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d59e008

Branch: refs/heads/release-1.2
Commit: 9d59e008d8849cdfe2daf302e251454435bb997f
Parents: 9206df6
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 16:44:41 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 15 20:28:15 2017 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 82 +++++++++++++-------
 .../runtime/checkpoint/PendingCheckpoint.java   | 38 +++++++++
 .../checkpoint/CheckpointCoordinatorTest.java   | 16 +++-
 .../checkpoint/PendingCheckpointTest.java       | 20 ++++-
 4 files changed, 126 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 78cad91..cb8417a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -36,6 +36,8 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,9 +48,10 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -123,7 +126,7 @@ public class CheckpointCoordinator {
 	private final int maxConcurrentCheckpointAttempts;
 
 	/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
-	private final Timer timer;
+	private final ScheduledThreadPoolExecutor timer;
 
 	/** Actor that receives status updates from the execution graph this coordinator works for */
 	private JobStatusListener jobStatusListener;
@@ -131,7 +134,8 @@ public class CheckpointCoordinator {
 	/** The number of consecutive failed trigger attempts */
 	private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
 
-	private ScheduledTrigger currentPeriodicTrigger;
+	/** A handle to the current periodic trigger, to cancel it when necessary */
+	private ScheduledFuture<?> currentPeriodicTrigger;
 
 	/** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed */
 	private long lastCheckpointCompletionNanos;
@@ -210,7 +214,13 @@ public class CheckpointCoordinator {
 		this.checkpointDirectory = checkpointDirectory;
 		this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
 
-		this.timer = new Timer("Checkpoint Timer", true);
+		this.timer = new ScheduledThreadPoolExecutor(1,
+				new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
+
+		// make sure the timer internally cleans up and does not hold onto stale scheduled tasks
+		this.timer.setRemoveOnCancelPolicy(true);
+		this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+		this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
 
 		if (externalizeSettings.externalizeCheckpoints()) {
 			LOG.info("Persisting periodic checkpoints externally at {}.", checkpointDirectory);
@@ -259,7 +269,7 @@ public class CheckpointCoordinator {
 				triggerRequestQueued = false;
 
 				// shut down the thread that handles the timeouts and pending triggers
-				timer.cancel();
+				timer.shutdownNow();
 
 				// clear and discard all pending checkpoints
 				for (PendingCheckpoint pending : pendingCheckpoints.values()) {
@@ -359,7 +369,7 @@ public class CheckpointCoordinator {
 				if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
 					triggerRequestQueued = true;
 					if (currentPeriodicTrigger != null) {
-						currentPeriodicTrigger.cancel();
+						currentPeriodicTrigger.cancel(false);
 						currentPeriodicTrigger = null;
 					}
 					return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -371,13 +381,14 @@ public class CheckpointCoordinator {
 
 				if (durationTillNextMillis > 0) {
 					if (currentPeriodicTrigger != null) {
-						currentPeriodicTrigger.cancel();
+						currentPeriodicTrigger.cancel(false);
 						currentPeriodicTrigger = null;
 					}
-					ScheduledTrigger trigger = new ScheduledTrigger();
 					// Reassign the new trigger to the currentPeriodicTrigger
-					currentPeriodicTrigger = trigger;
-					timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
+					currentPeriodicTrigger = timer.scheduleAtFixedRate(
+							new ScheduledTrigger(),
+							durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
+
 					return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 				}
 			}
@@ -450,7 +461,7 @@ public class CheckpointCoordinator {
 			}
 
 			// schedule the timer that will clean up the expired checkpoints
-			TimerTask canceller = new TimerTask() {
+			final Runnable canceller = new Runnable() {
 				@Override
 				public void run() {
 					synchronized (lock) {
@@ -486,7 +497,7 @@ public class CheckpointCoordinator {
 						if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
 							triggerRequestQueued = true;
 							if (currentPeriodicTrigger != null) {
-								currentPeriodicTrigger.cancel();
+								currentPeriodicTrigger.cancel(false);
 								currentPeriodicTrigger = null;
 							}
 							return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -498,14 +509,15 @@ public class CheckpointCoordinator {
 
 						if (durationTillNextMillis > 0) {
 							if (currentPeriodicTrigger != null) {
-								currentPeriodicTrigger.cancel();
+								currentPeriodicTrigger.cancel(false);
 								currentPeriodicTrigger = null;
 							}
 
-							ScheduledTrigger trigger = new ScheduledTrigger();
 							// Reassign the new trigger to the currentPeriodicTrigger
-							currentPeriodicTrigger = trigger;
-							timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
+							currentPeriodicTrigger = timer.scheduleAtFixedRate(
+									new ScheduledTrigger(),
+									durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
+
 							return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 						}
 					}
@@ -513,7 +525,15 @@ public class CheckpointCoordinator {
 					LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
 
 					pendingCheckpoints.put(checkpointID, checkpoint);
-					timer.schedule(canceller, checkpointTimeout);
+
+					ScheduledFuture<?> cancellerHandle = timer.schedule(
+							canceller,
+							checkpointTimeout, TimeUnit.MILLISECONDS);
+
+					if (!checkpoint.setCancellerHandle(cancellerHandle)) {
+						// checkpoint is already disposed!
+						cancellerHandle.cancel(false);
+					}
 				}
 				// end of lock scope
 
@@ -623,7 +643,7 @@ public class CheckpointCoordinator {
 	 * @return Flag indicating whether the ack'd checkpoint was associated
 	 * with a pending checkpoint.
 	 *
-	 * @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
+	 * @throws CheckpointException If the checkpoint cannot be added to the completed checkpoint store.
 	 */
 	public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
 		if (shutdown || message == null) {
@@ -819,20 +839,25 @@ public class CheckpointCoordinator {
 
 			// trigger the checkpoint from the trigger timer, to finish the work of this thread before
 			// starting with the next checkpoint
-			ScheduledTrigger trigger = new ScheduledTrigger();
 			if (periodicScheduling) {
 				if (currentPeriodicTrigger != null) {
-					currentPeriodicTrigger.cancel();
+					currentPeriodicTrigger.cancel(false);
 				}
-				currentPeriodicTrigger = trigger;
-				timer.scheduleAtFixedRate(trigger, 0L, baseInterval);
+				currentPeriodicTrigger = timer.scheduleAtFixedRate(
+						new ScheduledTrigger(),
+						0L, baseInterval, TimeUnit.MILLISECONDS);
 			}
 			else {
-				timer.schedule(trigger, 0L);
+				timer.execute(new ScheduledTrigger());
 			}
 		}
 	}
 
+	@VisibleForTesting
+	int getNumScheduledTasks() {
+		return timer.getQueue().size();
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Checkpoint State Restoring
 	// --------------------------------------------------------------------------------------------
@@ -959,8 +984,9 @@ public class CheckpointCoordinator {
 			stopCheckpointScheduler();
 
 			periodicScheduling = true;
-			currentPeriodicTrigger = new ScheduledTrigger();
-			timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
+			currentPeriodicTrigger = timer.scheduleAtFixedRate(
+					new ScheduledTrigger(), 
+					baseInterval, baseInterval, TimeUnit.MILLISECONDS);
 		}
 	}
 
@@ -970,7 +996,7 @@ public class CheckpointCoordinator {
 			periodicScheduling = false;
 
 			if (currentPeriodicTrigger != null) {
-				currentPeriodicTrigger.cancel();
+				currentPeriodicTrigger.cancel(false);
 				currentPeriodicTrigger = null;
 			}
 
@@ -1003,7 +1029,7 @@ public class CheckpointCoordinator {
 
 	// ------------------------------------------------------------------------
 
-	private class ScheduledTrigger extends TimerTask {
+	private final class ScheduledTrigger implements Runnable {
 
 		@Override
 		public void run() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1531f0f..9859e01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -42,6 +42,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -95,6 +96,8 @@ public class PendingCheckpoint {
 	@Nullable
 	private PendingCheckpointStats statsCallback;
 
+	private volatile ScheduledFuture<?> cancellerHandle;
+
 	// --------------------------------------------------------------------------------------------
 
 	public PendingCheckpoint(
@@ -191,6 +194,27 @@ public class PendingCheckpoint {
 		this.statsCallback = checkNotNull(trackerCallback);
 	}
 
+	/**
+	 * Sets the handle for the canceller to this pending checkpoint.
+	 * 
+	 * @return true if the handle was set, false if the checkpoint is already disposed
+	 */
+	public boolean setCancellerHandle(ScheduledFuture<?> cancellerHandle) {
+		synchronized (lock) {
+			if (this.cancellerHandle == null) {
+				if (!discarded) {
+					this.cancellerHandle = cancellerHandle;
+					return true;
+				} else {
+					return false;
+				}
+			}
+			else {
+				throw new IllegalStateException("A canceller handle was already set");
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Progress and Completion
 	// ------------------------------------------------------------------------
@@ -427,10 +451,24 @@ public class PendingCheckpoint {
 				discarded = true;
 				notYetAcknowledgedTasks.clear();
 				acknowledgedTasks.clear();
+				cancelCanceller();
 			}
 		}
 	}
 
+	private void cancelCanceller() {
+		try {
+			final ScheduledFuture<?> canceller = this.cancellerHandle;
+			if (canceller != null) {
+				canceller.cancel(false);
+			}
+		}
+		catch (Exception e) {
+			// this code should not throw exceptions
+			LOG.warn("Error while cancelling checkpoint timeout task", e);
+		}
+	}
+
 	/**
 	 * Reports a failed checkpoint with the given optional cause.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 6ba557b..eeab445 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -299,6 +299,9 @@ public class CheckpointCoordinatorTest {
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
+			// we have one task scheduled that will cancel after timeout
+			assertEquals(1, coord.getNumScheduledTasks());
+
 			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
 			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
 
@@ -336,6 +339,9 @@ public class CheckpointCoordinatorTest {
 			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
 			assertTrue(checkpoint.isDiscarded());
 
+			// the canceler is also removed
+			assertEquals(0, coord.getNumScheduledTasks());
+
 			// validate that we have no new pending checkpoint
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -389,6 +395,7 @@ public class CheckpointCoordinatorTest {
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, coord.getNumScheduledTasks());
 
 			// trigger the first checkpoint. this should succeed
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -399,6 +406,7 @@ public class CheckpointCoordinatorTest {
 			// validate that we have a pending checkpoint
 			assertEquals(2, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(2, coord.getNumScheduledTasks());
 
 			Iterator<Map.Entry<Long, PendingCheckpoint>> it = coord.getPendingCheckpoints().entrySet().iterator();
 			long checkpoint1Id = it.next().getKey();
@@ -439,13 +447,13 @@ public class CheckpointCoordinatorTest {
 			}
 
 			// decline checkpoint from one of the tasks, this should cancel the checkpoint
-			// and trigger a new one
 			coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
 			assertTrue(checkpoint1.isDiscarded());
 
 			// validate that we have only one pending checkpoint left
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, coord.getNumScheduledTasks());
 
 			// validate that it is the same second checkpoint from earlier
 			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
@@ -506,6 +514,7 @@ public class CheckpointCoordinatorTest {
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, coord.getNumScheduledTasks());
 
 			// trigger the first checkpoint. this should succeed
 			assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -513,6 +522,7 @@ public class CheckpointCoordinatorTest {
 			// validate that we have a pending checkpoint
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(1, coord.getNumScheduledTasks());
 
 			long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
 			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
@@ -558,6 +568,9 @@ public class CheckpointCoordinatorTest {
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 
+			// the canceler should be removed now
+			assertEquals(0, coord.getNumScheduledTasks());
+
 			// validate that the relevant tasks got a confirmation message
 			{
 				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
@@ -583,6 +596,7 @@ public class CheckpointCoordinatorTest {
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, coord.getNumScheduledTasks());
 
 			CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
 			assertEquals(jid, successNew.getJobId());

http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 4358526..cee7dd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -36,6 +37,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -44,7 +46,6 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -279,6 +280,23 @@ public class PendingCheckpointTest {
 		}
 	}
 
+	@Test
+	public void testSetCanceller() {
+		final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true);
+
+		PendingCheckpoint aborted = createPendingCheckpoint(props, null);
+		aborted.abortDeclined();
+		assertTrue(aborted.isDiscarded());
+		assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class)));
+
+		PendingCheckpoint pending = createPendingCheckpoint(props, null);
+		ScheduledFuture<?> canceller = mock(ScheduledFuture.class);
+
+		assertTrue(pending.setCancellerHandle(canceller));
+		pending.abortDeclined();
+		verify(canceller).cancel(false);
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {