You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/08/26 19:57:24 UTC

flink git commit: [FLINK-2356] Add shutdown hook to CheckpointCoordinator to prevent resource leaks

Repository: flink
Updated Branches:
  refs/heads/release-0.9 eba62a6dc -> 3cdbb8014


[FLINK-2356] Add shutdown hook to CheckpointCoordinator to prevent resource leaks


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3cdbb801
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3cdbb801
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3cdbb801

Branch: refs/heads/release-0.9
Commit: 3cdbb8014bd835f8f6d7ddb4b8a9451e1557cb3a
Parents: eba62a6
Author: Ufuk Celebi <uc...@apache.org>
Authored: Wed Aug 26 18:03:28 2015 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Aug 26 19:56:45 2015 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 106 +++++++++++++------
 1 file changed, 75 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3cdbb801/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 424a9ed..b8a5263 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -22,7 +22,6 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
-
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -98,7 +97,10 @@ public class CheckpointCoordinator {
 	
 	private ClassLoader userClassLoader;
 	
-	private boolean shutdown;
+	private volatile boolean shutdown;
+
+	/** Shutdown hook thread to clean up state handles. */
+	private final Thread shutdownHook;
 	
 	// --------------------------------------------------------------------------------------------
 
@@ -132,6 +134,31 @@ public class CheckpointCoordinator {
 		this.userClassLoader = userClassLoader;
 
 		timer = new Timer("Checkpoint Timer", true);
+
+		// Add shutdown hook to clean up state handles
+		shutdownHook = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					CheckpointCoordinator.this.shutdown();
+				}
+				catch (Throwable t) {
+					LOG.error("Error during shutdown of blob service via JVM shutdown hook: " +
+							t.getMessage(), t);
+				}
+			}
+		});
+
+		try {
+			// Add JVM shutdown hook to call shutdown of service
+			Runtime.getRuntime().addShutdownHook(shutdownHook);
+		}
+		catch (IllegalStateException ignored) {
+			// JVM is already shutting down. No need to do anything.
+		}
+		catch (Throwable t) {
+			LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -146,38 +173,55 @@ public class CheckpointCoordinator {
 	 */
 	public void shutdown() {
 		synchronized (lock) {
-			if (shutdown) {
-				return;
-			}
-			shutdown = true;
-			LOG.info("Stopping checkpoint coordinator for job " + job);
-			
-			// shut down the thread that handles the timeouts
-			timer.cancel();
-			
-			// make sure that the actor does not linger
-			if (jobStatusListener != null) {
-				jobStatusListener.tell(PoisonPill.getInstance(), ActorRef.noSender());
-				jobStatusListener = null;
-			}
-			
-			// the scheduling thread needs also to go away
-			if (periodicScheduler != null) {
-				periodicScheduler.cancel();
-				periodicScheduler = null;
-			}
-			
-			// clear and discard all pending checkpoints
-			for (PendingCheckpoint pending : pendingCheckpoints.values()) {
+			try {
+				if (shutdown) {
+					return;
+				}
+				shutdown = true;
+				LOG.info("Stopping checkpoint coordinator for job " + job);
+
+				// shut down the thread that handles the timeouts
+				timer.cancel();
+
+				// make sure that the actor does not linger
+				if (jobStatusListener != null) {
+					jobStatusListener.tell(PoisonPill.getInstance(), ActorRef.noSender());
+					jobStatusListener = null;
+				}
+
+				// the scheduling thread needs also to go away
+				if (periodicScheduler != null) {
+					periodicScheduler.cancel();
+					periodicScheduler = null;
+				}
+
+				// clear and discard all pending checkpoints
+				for (PendingCheckpoint pending : pendingCheckpoints.values()) {
 					pending.discard(userClassLoader, true);
+				}
+				pendingCheckpoints.clear();
+
+				// clean and discard all successful checkpoints
+				for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
+					checkpoint.discard(userClassLoader);
+				}
+				completedCheckpoints.clear();
 			}
-			pendingCheckpoints.clear();
-			
-			// clean and discard all successful checkpoints
-			for (SuccessfulCheckpoint checkpoint : completedCheckpoints) {
-				checkpoint.discard(userClassLoader);
+			finally {
+				// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
+				// shutdown hook itself.
+				if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
+					try {
+						Runtime.getRuntime().removeShutdownHook(shutdownHook);
+					}
+					catch (IllegalStateException ignored) {
+						// race, JVM is in shutdown already, we can safely ignore this
+					}
+					catch (Throwable t) {
+						LOG.warn("Error unregistering checkpoint cooordniator shutdown hook.", t);
+					}
+				}
 			}
-			completedCheckpoints.clear();
 		}
 	}