You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/08/26 19:57:24 UTC
flink git commit: [FLINK-2356] Add shutdown hook to CheckpointCoordinator to prevent resource leaks
Repository: flink
Updated Branches:
refs/heads/release-0.9 eba62a6dc -> 3cdbb8014
[FLINK-2356] Add shutdown hook to CheckpointCoordinator to prevent resource leaks
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3cdbb801
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3cdbb801
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3cdbb801
Branch: refs/heads/release-0.9
Commit: 3cdbb8014bd835f8f6d7ddb4b8a9451e1557cb3a
Parents: eba62a6
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Aug 26 18:03:28 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Aug 26 19:56:45 2015 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 106 +++++++++++++------
1 file changed, 75 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3cdbb801/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 424a9ed..b8a5263 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -22,7 +22,6 @@ import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
-
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -98,7 +97,10 @@ public class CheckpointCoordinator {
private ClassLoader userClassLoader;
- private boolean shutdown;
+ private volatile boolean shutdown;
+
+ /** Shutdown hook thread to clean up state handles. */
+ private final Thread shutdownHook;
// --------------------------------------------------------------------------------------------
@@ -132,6 +134,31 @@ public class CheckpointCoordinator {
this.userClassLoader = userClassLoader;
timer = new Timer("Checkpoint Timer", true);
+
+ // Add shutdown hook to clean up state handles
+ shutdownHook = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ CheckpointCoordinator.this.shutdown();
+ }
+ catch (Throwable t) {
+ LOG.error("Error during shutdown of checkpoint coordinator via JVM shutdown hook: " +
+ t.getMessage(), t);
+ }
+ }
+ }, "CheckpointCoordinator shutdown hook");
+
+ try {
+ // Register the hook so checkpoints are discarded even if the JVM exits without an explicit shutdown()
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ }
+ catch (IllegalStateException ignored) {
+ // JVM is already shutting down. No need to do anything.
+ }
+ catch (Throwable t) {
+ LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
+ }
}
// --------------------------------------------------------------------------------------------
@@ -146,38 +173,55 @@ public class CheckpointCoordinator {
*/
public void shutdown() {
synchronized (lock) {
- if (shutdown) {
- return;
- }
- shutdown = true;
- LOG.info("Stopping checkpoint coordinator for job " + job);
-
- // shut down the thread that handles the timeouts
- timer.cancel();
-
- // make sure that the actor does not linger
- if (jobStatusListener != null) {
- jobStatusListener.tell(PoisonPill.getInstance(), ActorRef.noSender());
- jobStatusListener = null;
- }
-
- // the scheduling thread needs also to go away
- if (periodicScheduler != null) {
- periodicScheduler.cancel();
- periodicScheduler = null;
- }
-
- // clear and discard all pending checkpoints
- for (PendingCheckpoint pending : pendingCheckpoints.values()) {
+ try {
+ if (shutdown) {
+ return;
+ }
+ shutdown = true;
+ LOG.info("Stopping checkpoint coordinator for job " + job);
+
+ // shut down the thread that handles the timeouts
+ timer.cancel();
+
+ // make sure that the actor does not linger
+ if (jobStatusListener != null) {
+ jobStatusListener.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ jobStatusListener = null;
+ }
+
+ // the scheduling thread needs also to go away
+ if (periodicScheduler != null) {
+ periodicScheduler.cancel();
+ periodicScheduler = null;
+ }
+
+ // clear and discard all pending checkpoints
+ for (PendingCheckpoint pending : pendingCheckpoints.values()) {
pending.discard(userClassLoader, true);
+ }
+ pendingCheckpoints.clear();
+
+ // clean and discard all successful checkpoints
+ for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
+ checkpoint.discard(userClassLoader);
+ }
+ completedCheckpoints.clear();
}
- pendingCheckpoints.clear();
-
- // clean and discard all successful checkpoints
- for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
- checkpoint.discard(userClassLoader);
+ finally {
+ // Remove shutdown hook to prevent resource leaks, unless this is invoked by the
+ // shutdown hook itself.
+ if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+ try {
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
+ }
+ catch (IllegalStateException ignored) {
+ // race, JVM is in shutdown already, we can safely ignore this
+ }
+ catch (Throwable t) {
+ LOG.warn("Error unregistering checkpoint coordinator shutdown hook.", t);
+ }
+ }
}
- completedCheckpoints.clear();
}
}