You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/15 19:28:45 UTC
flink git commit: [FLINK-5962] [checkpoints] Remove scheduled cancel-task from timer queue to prevent memory leaks
Repository: flink
Updated Branches:
refs/heads/release-1.2 9206df666 -> 9d59e008d
[FLINK-5962] [checkpoints] Remove scheduled cancel-task from timer queue to prevent memory leaks
This closes #3548
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d59e008
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d59e008
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d59e008
Branch: refs/heads/release-1.2
Commit: 9d59e008d8849cdfe2daf302e251454435bb997f
Parents: 9206df6
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 15 16:44:41 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 15 20:28:15 2017 +0100
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 82 +++++++++++++-------
.../runtime/checkpoint/PendingCheckpoint.java | 38 +++++++++
.../checkpoint/CheckpointCoordinatorTest.java | 16 +++-
.../checkpoint/PendingCheckpointTest.java | 20 ++++-
4 files changed, 126 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 78cad91..cb8417a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -36,6 +36,8 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,9 +48,10 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -123,7 +126,7 @@ public class CheckpointCoordinator {
private final int maxConcurrentCheckpointAttempts;
/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
- private final Timer timer;
+ private final ScheduledThreadPoolExecutor timer;
/** Actor that receives status updates from the execution graph this coordinator works for */
private JobStatusListener jobStatusListener;
@@ -131,7 +134,8 @@ public class CheckpointCoordinator {
/** The number of consecutive failed trigger attempts */
private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
- private ScheduledTrigger currentPeriodicTrigger;
+ /** A handle to the current periodic trigger, to cancel it when necessary */
+ private ScheduledFuture<?> currentPeriodicTrigger;
/** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed */
private long lastCheckpointCompletionNanos;
@@ -210,7 +214,13 @@ public class CheckpointCoordinator {
this.checkpointDirectory = checkpointDirectory;
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
- this.timer = new Timer("Checkpoint Timer", true);
+ this.timer = new ScheduledThreadPoolExecutor(1,
+ new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
+
+ // make sure the timer internally cleans up and does not hold onto stale scheduled tasks
+ this.timer.setRemoveOnCancelPolicy(true);
+ this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
if (externalizeSettings.externalizeCheckpoints()) {
LOG.info("Persisting periodic checkpoints externally at {}.", checkpointDirectory);
@@ -259,7 +269,7 @@ public class CheckpointCoordinator {
triggerRequestQueued = false;
// shut down the thread that handles the timeouts and pending triggers
- timer.cancel();
+ timer.shutdownNow();
// clear and discard all pending checkpoints
for (PendingCheckpoint pending : pendingCheckpoints.values()) {
@@ -359,7 +369,7 @@ public class CheckpointCoordinator {
if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
triggerRequestQueued = true;
if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -371,13 +381,14 @@ public class CheckpointCoordinator {
if (durationTillNextMillis > 0) {
if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
- ScheduledTrigger trigger = new ScheduledTrigger();
// Reassign the new trigger to the currentPeriodicTrigger
- currentPeriodicTrigger = trigger;
- timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
+ currentPeriodicTrigger = timer.scheduleAtFixedRate(
+ new ScheduledTrigger(),
+ durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
+
return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
}
@@ -450,7 +461,7 @@ public class CheckpointCoordinator {
}
// schedule the timer that will clean up the expired checkpoints
- TimerTask canceller = new TimerTask() {
+ final Runnable canceller = new Runnable() {
@Override
public void run() {
synchronized (lock) {
@@ -486,7 +497,7 @@ public class CheckpointCoordinator {
if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
triggerRequestQueued = true;
if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
@@ -498,14 +509,15 @@ public class CheckpointCoordinator {
if (durationTillNextMillis > 0) {
if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
- ScheduledTrigger trigger = new ScheduledTrigger();
// Reassign the new trigger to the currentPeriodicTrigger
- currentPeriodicTrigger = trigger;
- timer.scheduleAtFixedRate(trigger, durationTillNextMillis, baseInterval);
+ currentPeriodicTrigger = timer.scheduleAtFixedRate(
+ new ScheduledTrigger(),
+ durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
+
return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
}
}
@@ -513,7 +525,15 @@ public class CheckpointCoordinator {
LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
pendingCheckpoints.put(checkpointID, checkpoint);
- timer.schedule(canceller, checkpointTimeout);
+
+ ScheduledFuture<?> cancellerHandle = timer.schedule(
+ canceller,
+ checkpointTimeout, TimeUnit.MILLISECONDS);
+
+ if (!checkpoint.setCancellerHandle(cancellerHandle)) {
+ // checkpoint is already disposed!
+ cancellerHandle.cancel(false);
+ }
}
// end of lock scope
@@ -623,7 +643,7 @@ public class CheckpointCoordinator {
* @return Flag indicating whether the ack'd checkpoint was associated
* with a pending checkpoint.
*
- * @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
+ * @throws CheckpointException If the checkpoint cannot be added to the completed checkpoint store.
*/
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
if (shutdown || message == null) {
@@ -819,20 +839,25 @@ public class CheckpointCoordinator {
// trigger the checkpoint from the trigger timer, to finish the work of this thread before
// starting with the next checkpoint
- ScheduledTrigger trigger = new ScheduledTrigger();
if (periodicScheduling) {
if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger.cancel(false);
}
- currentPeriodicTrigger = trigger;
- timer.scheduleAtFixedRate(trigger, 0L, baseInterval);
+ currentPeriodicTrigger = timer.scheduleAtFixedRate(
+ new ScheduledTrigger(),
+ 0L, baseInterval, TimeUnit.MILLISECONDS);
}
else {
- timer.schedule(trigger, 0L);
+ timer.execute(new ScheduledTrigger());
}
}
}
+ @VisibleForTesting
+ int getNumScheduledTasks() { // tasks still queued in the timer; cancelled ones are dropped because of setRemoveOnCancelPolicy(true)
+ return timer.getQueue().size();
+ }
+
// --------------------------------------------------------------------------------------------
// Checkpoint State Restoring
// --------------------------------------------------------------------------------------------
@@ -959,8 +984,9 @@ public class CheckpointCoordinator {
stopCheckpointScheduler();
periodicScheduling = true;
- currentPeriodicTrigger = new ScheduledTrigger();
- timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
+ currentPeriodicTrigger = timer.scheduleAtFixedRate(
+ new ScheduledTrigger(),
+ baseInterval, baseInterval, TimeUnit.MILLISECONDS);
}
}
@@ -970,7 +996,7 @@ public class CheckpointCoordinator {
periodicScheduling = false;
if (currentPeriodicTrigger != null) {
- currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger = null;
}
@@ -1003,7 +1029,7 @@ public class CheckpointCoordinator {
// ------------------------------------------------------------------------
- private class ScheduledTrigger extends TimerTask {
+ private final class ScheduledTrigger implements Runnable {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1531f0f..9859e01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -42,6 +42,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -95,6 +96,8 @@ public class PendingCheckpoint {
@Nullable
private PendingCheckpointStats statsCallback;
+ private volatile ScheduledFuture<?> cancellerHandle;
+
// --------------------------------------------------------------------------------------------
public PendingCheckpoint(
@@ -191,6 +194,27 @@ public class PendingCheckpoint {
this.statsCallback = checkNotNull(trackerCallback);
}
+ /**
+ * Sets the handle for the canceller (timeout task) of this pending checkpoint. Throws an {@link IllegalStateException} if a handle was already set.
+ * @param cancellerHandle the scheduled canceller; it is cancelled when this checkpoint is discarded
+ * @return true, if the handle was set; false, if the checkpoint is already discarded (the caller should then cancel the handle itself)
+ */
+ public boolean setCancellerHandle(ScheduledFuture<?> cancellerHandle) {
+ synchronized (lock) {
+ if (this.cancellerHandle == null) {
+ if (!discarded) { // after discard, cancelCanceller() has already run and would never cancel this handle
+ this.cancellerHandle = cancellerHandle;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ else { // a handle may only be set once
+ throw new IllegalStateException("A canceller handle was already set");
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
// Progress and Completion
// ------------------------------------------------------------------------
@@ -427,10 +451,24 @@ public class PendingCheckpoint {
discarded = true;
notYetAcknowledgedTasks.clear();
acknowledgedTasks.clear();
+ cancelCanceller();
}
}
}
+ private void cancelCanceller() { // best-effort: cancels the timeout task, if a handle was set
+ try {
+ final ScheduledFuture<?> canceller = this.cancellerHandle; // read the volatile field once
+ if (canceller != null) {
+ canceller.cancel(false); // false: do not interrupt the task if it is already running
+ }
+ }
+ catch (Exception e) {
+ // this code should not throw exceptions; failures are only logged, never propagated
+ LOG.warn("Error while cancelling checkpoint timeout task", e);
+ }
+ }
+
/**
* Reports a failed checkpoint with the given optional cause.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 6ba557b..eeab445 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -299,6 +299,9 @@ public class CheckpointCoordinatorTest {
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ // we have one task scheduled that will cancel after timeout
+ assertEquals(1, coord.getNumScheduledTasks());
+
long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
@@ -336,6 +339,9 @@ public class CheckpointCoordinatorTest {
coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
assertTrue(checkpoint.isDiscarded());
+ // the canceler is also removed
+ assertEquals(0, coord.getNumScheduledTasks());
+
// validate that we have no new pending checkpoint
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -389,6 +395,7 @@ public class CheckpointCoordinatorTest {
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, coord.getNumScheduledTasks());
// trigger the first checkpoint. this should succeed
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -399,6 +406,7 @@ public class CheckpointCoordinatorTest {
// validate that we have a pending checkpoint
assertEquals(2, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(2, coord.getNumScheduledTasks());
Iterator<Map.Entry<Long, PendingCheckpoint>> it = coord.getPendingCheckpoints().entrySet().iterator();
long checkpoint1Id = it.next().getKey();
@@ -439,13 +447,13 @@ public class CheckpointCoordinatorTest {
}
// decline checkpoint from one of the tasks, this should cancel the checkpoint
- // and trigger a new one
coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
assertTrue(checkpoint1.isDiscarded());
// validate that we have only one pending checkpoint left
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1, coord.getNumScheduledTasks());
// validate that it is the same second checkpoint from earlier
long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
@@ -506,6 +514,7 @@ public class CheckpointCoordinatorTest {
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, coord.getNumScheduledTasks());
// trigger the first checkpoint. this should succeed
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -513,6 +522,7 @@ public class CheckpointCoordinatorTest {
// validate that we have a pending checkpoint
assertEquals(1, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(1, coord.getNumScheduledTasks());
long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);
@@ -558,6 +568,9 @@ public class CheckpointCoordinatorTest {
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
+ // the canceler should be removed now
+ assertEquals(0, coord.getNumScheduledTasks());
+
// validate that the relevant tasks got a confirmation message
{
verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp));
@@ -583,6 +596,7 @@ public class CheckpointCoordinatorTest {
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+ assertEquals(0, coord.getNumScheduledTasks());
CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
assertEquals(jid, successNew.getJobId());
http://git-wip-us.apache.org/repos/asf/flink/blob/9d59e008/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 4358526..cee7dd5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -36,6 +37,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -44,7 +46,6 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -279,6 +280,23 @@ public class PendingCheckpointTest {
}
}
+ @Test
+ public void testSetCanceller() { // verifies setCancellerHandle() both after and before a discard
+ final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true, true, true);
+
+ PendingCheckpoint aborted = createPendingCheckpoint(props, null);
+ aborted.abortDeclined();
+ assertTrue(aborted.isDiscarded());
+ assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class))); // rejected: checkpoint already discarded
+
+ PendingCheckpoint pending = createPendingCheckpoint(props, null);
+ ScheduledFuture<?> canceller = mock(ScheduledFuture.class);
+
+ assertTrue(pending.setCancellerHandle(canceller));
+ pending.abortDeclined();
+ verify(canceller).cancel(false); // aborting cancels the registered handle without interrupting
+ }
+
// ------------------------------------------------------------------------
private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {