You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/11/29 11:24:23 UTC

incubator-flink git commit: [FLINK-1154] Quickfix to kill TaskManagers in YARN mode.

Repository: incubator-flink
Updated Branches:
  refs/heads/master 84d6da8fd -> e46d14b4c


[FLINK-1154] Quickfix to kill TaskManagers in YARN mode.

This closes #233


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e46d14b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e46d14b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e46d14b4

Branch: refs/heads/master
Commit: e46d14b4c7507b23f2c80e0d893cfff6f6c49bae
Parents: 84d6da8
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Nov 13 15:00:01 2014 +0100
Committer: Robert Metzger <me...@web.de>
Committed: Sat Nov 29 11:23:21 2014 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/yarn/Client.java |  5 ++--
 .../main/java/org/apache/flink/yarn/Utils.java  |  2 +-
 .../flink/yarn/appMaster/ApplicationMaster.java |  1 +
 .../src/main/flink-bin/yarn-bin/yarn-session.sh |  2 +-
 .../apache/flink/runtime/instance/Instance.java | 25 ++++++++++++++++++++
 .../flink/runtime/instance/InstanceManager.java | 10 ++++++++
 .../protocols/TaskOperationProtocol.java        |  2 +-
 .../flink/runtime/taskmanager/TaskManager.java  |  6 +++++
 8 files changed, 48 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
index 92c69b6..161aa8a 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
@@ -343,7 +343,9 @@ public class Client {
 		// Create a local resource to point to the destination jar path
 		final FileSystem fs = FileSystem.get(conf);
 
-		if(fs.getScheme().startsWith("file")) {
+		// hard-coded check for the GoogleHDFS client because it does not override the getScheme() method.
+		if( !fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
+				fs.getScheme().startsWith("file")) {
 			LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
 					+ "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
 					+ "The Flink YARN client needs to store its files in a distributed file system");
@@ -628,7 +630,6 @@ public class Client {
 					+ "\tyarn logs -applicationId "+appReport.getApplicationId()+"\n"
 					+ "(It sometimes takes a few seconds until the logs are aggregated)");
 		}
-
 	}
 
 	private void printHelp() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 42a4592..6b541bf 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -212,7 +212,7 @@ public class Utils {
 		localResource.setSize(jarStat.getLen());
 		localResource.setTimestamp(jarStat.getModificationTime());
 		localResource.setType(LocalResourceType.FILE);
-		localResource.setVisibility(LocalResourceVisibility.PUBLIC);
+		localResource.setVisibility(LocalResourceVisibility.APPLICATION);
 	}
 
 	public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
index 3264918..a285381 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
@@ -521,6 +521,7 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 	
 	private void close() throws Exception {
 		if(!isClosed) {
+			jobManager.getInstanceManager().killTaskManagers();
 			jobManager.shutdown();
 			nmClient.close();
 			rmClient.close();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
index 9ff4ed3..21da505 100644
--- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
+++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
@@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
 
 export FLINK_CONF_DIR
 
-$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH $log_setting org.apache.flink.yarn.Client -ship $bin/../ship/ -confDir $FLINK_CONF_DIR -j $FLINK_LIB_DIR/*yarn-uberjar.jar $*
+$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH $log_setting org.apache.flink.yarn.Client -ship $bin/../ship/ -confDir $FLINK_CONF_DIR -j $FLINK_LIB_DIR/*yarn-uberjar.jar $*
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index ff9a87d..89453bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -130,6 +130,31 @@ public class Instance {
 		return !isDead;
 	}
 	
+	public void stopInstance() { // asks this instance's TaskManager to kill itself; does not wait for the call to finish
+		try {
+			final TaskOperationProtocol tmProxy = this.getTaskManagerProxy();
+			// issue the kill call from a separate thread so that a hanging call cannot block the caller indefinitely.
+			Runnable r = new Runnable() {
+				@Override
+				public void run() {
+					try {
+						tmProxy.killTaskManager();
+					} catch (IOException e) {
+						if (Log.isDebugEnabled()) { // NOTE(review): `Log` (not `LOG`) is declared outside this hunk - confirm it is the intended logger
+							Log.debug("Error while stopping TaskManager", e);
+						}
+					}
+				}
+			};
+			Thread t = new Thread(r);
+			t.setDaemon(true); // daemon thread: does not prevent the JVM from stopping
+			t.start();
+		} catch (Exception e) { // best effort: failures (e.g. while obtaining the proxy) are only logged at debug level
+			if (Log.isDebugEnabled()) {
+				Log.debug("Error while stopping TaskManager", e);
+			}
+		}
+	}
 	public void markDead() {
 		if (isDead) {
 			return;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index ced1afe..f779048 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -102,6 +102,16 @@ public class InstanceManager {
 	public long getHeartbeatTimeout() {
 		return heartbeatTimeout;
 	}
+	
+	/**
+	 * Calls {@link Instance#stopInstance()} on every instance in {@code registeredHostsById}. Intended for
+	 * the Flink YARN mode, to self-destruct a Flink cluster by stopping the JVMs of the TaskManagers.
+	 */
+	public void killTaskManagers() {
+		for (Instance i : this.registeredHostsById.values()) { // NOTE(review): this.lock is not held here, unlike in shutdown() - confirm this is safe
+			i.stopInstance();
+		}
+	}
 
 	public void shutdown() {
 		synchronized (this.lock) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
index 7e39047..d2ad119 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
@@ -36,5 +36,5 @@ public interface TaskOperationProtocol extends VersionedProtocol {
 
 	TaskOperationResult cancelTask(ExecutionAttemptID executionId) throws IOException;
 
-	
+	void killTaskManager() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 58529b8..cfb9213 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -1297,4 +1297,10 @@ public class TaskManager implements TaskOperationProtocol {
 			return timeout;
 		}
 	}
+
+	@Override
+	public void killTaskManager() throws IOException { // TaskOperationProtocol call: terminates the JVM running this TaskManager
+		LOG.info("Killing TaskManager");
+		System.exit(0); // exit code 0 because this is a requested stop, not an error condition.
+	}
 }