You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/11/29 11:24:23 UTC
incubator-flink git commit: [FLINK-1154] Quickfix to kill TaskManagers in YARN mode.
Repository: incubator-flink
Updated Branches:
refs/heads/master 84d6da8fd -> e46d14b4c
[FLINK-1154] Quickfix to kill TaskManagers in YARN mode.
This closes #233
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e46d14b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e46d14b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e46d14b4
Branch: refs/heads/master
Commit: e46d14b4c7507b23f2c80e0d893cfff6f6c49bae
Parents: 84d6da8
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Nov 13 15:00:01 2014 +0100
Committer: Robert Metzger <me...@web.de>
Committed: Sat Nov 29 11:23:21 2014 +0100
----------------------------------------------------------------------
.../main/java/org/apache/flink/yarn/Client.java | 5 ++--
.../main/java/org/apache/flink/yarn/Utils.java | 2 +-
.../flink/yarn/appMaster/ApplicationMaster.java | 1 +
.../src/main/flink-bin/yarn-bin/yarn-session.sh | 2 +-
.../apache/flink/runtime/instance/Instance.java | 25 ++++++++++++++++++++
.../flink/runtime/instance/InstanceManager.java | 10 ++++++++
.../protocols/TaskOperationProtocol.java | 2 +-
.../flink/runtime/taskmanager/TaskManager.java | 6 +++++
8 files changed, 48 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
index 92c69b6..161aa8a 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
@@ -343,7 +343,9 @@ public class Client {
// Create a local resource to point to the destination jar path
final FileSystem fs = FileSystem.get(conf);
- if(fs.getScheme().startsWith("file")) {
+ // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
+ if( !fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
+ fs.getScheme().startsWith("file")) {
LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+ "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
+ "The Flink YARN client needs to store its files in a distributed file system");
@@ -628,7 +630,6 @@ public class Client {
+ "\tyarn logs -applicationId "+appReport.getApplicationId()+"\n"
+ "(It sometimes takes a few seconds until the logs are aggregated)");
}
-
}
private void printHelp() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 42a4592..6b541bf 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -212,7 +212,7 @@ public class Utils {
localResource.setSize(jarStat.getLen());
localResource.setTimestamp(jarStat.getModificationTime());
localResource.setType(LocalResourceType.FILE);
- localResource.setVisibility(LocalResourceVisibility.PUBLIC);
+ localResource.setVisibility(LocalResourceVisibility.APPLICATION);
}
public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
index 3264918..a285381 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
@@ -521,6 +521,7 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
private void close() throws Exception {
if(!isClosed) {
+ jobManager.getInstanceManager().killTaskManagers();
jobManager.shutdown();
nmClient.close();
rmClient.close();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
index 9ff4ed3..21da505 100644
--- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
+++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
@@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
export FLINK_CONF_DIR
-$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH $log_setting org.apache.flink.yarn.Client -ship $bin/../ship/ -confDir $FLINK_CONF_DIR -j $FLINK_LIB_DIR/*yarn-uberjar.jar $*
+$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH $log_setting org.apache.flink.yarn.Client -ship $bin/../ship/ -confDir $FLINK_CONF_DIR -j $FLINK_LIB_DIR/*yarn-uberjar.jar $*
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index ff9a87d..89453bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -130,6 +130,31 @@ public class Instance {
return !isDead;
}
+ public void stopInstance() { // Best-effort: asks this instance's TaskManager to kill its JVM via an RPC issued from a background daemon thread.
+ try {
+ final TaskOperationProtocol tmProxy = this.getTaskManagerProxy();
+ // Run the kill RPC on a separate thread so the caller cannot block indefinitely on it.
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ tmProxy.killTaskManager(); // NOTE(review): the remote side calls System.exit(0), so this call may fail or hang instead of returning — confirm
+ } catch (IOException e) { // only IOException is handled; an unchecked exception escapes run() and ends this thread
+ if (Log.isDebugEnabled()) { // NOTE(review): 'Log' is not declared in this hunk (TaskManager uses 'LOG') — confirm it is the intended logger
+ Log.debug("Error while stopping TaskManager", e);
+ }
+ }
+ }
+ };
+ Thread t = new Thread(r); // NOTE(review): thread is unnamed; a descriptive name would ease debugging
+ t.setDaemon(true); // do not prevent the JVM from stopping
+ t.start(); // returns immediately; this method does not wait for the TaskManager to exit
+ } catch (Exception e) { // failures obtaining the proxy or starting the thread are only logged at debug level
+ if (Log.isDebugEnabled()) {
+ Log.debug("Error while stopping TaskManager", e);
+ }
+ }
+ }
public void markDead() {
if (isDead) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index ced1afe..f779048 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -102,6 +102,16 @@ public class InstanceManager {
public long getHeartbeatTimeout() {
return heartbeatTimeout;
}
+
+ /**
+ * Asks every registered TaskManager to kill its JVM; intended for YARN mode, where the ApplicationMaster calls it from close() to self-destruct the Flink cluster.
+ * Each Instance#stopInstance() call only starts a background request, so this method does not wait for the TaskManagers to actually exit.
+ */
+ public void killTaskManagers() {
+ for (Instance i : this.registeredHostsById.values()) { // NOTE(review): unlike shutdown(), this iterates without holding 'lock' — confirm concurrent (un)registration cannot happen here
+ i.stopInstance();
+ }
+ }
public void shutdown() {
synchronized (this.lock) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
index 7e39047..d2ad119 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/TaskOperationProtocol.java
@@ -36,5 +36,5 @@ public interface TaskOperationProtocol extends VersionedProtocol {
TaskOperationResult cancelTask(ExecutionAttemptID executionId) throws IOException;
-
+ void killTaskManager() throws IOException; // Requests that the TaskManager terminate its own JVM (the TaskManager implementation calls System.exit(0)).
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e46d14b4/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 58529b8..cfb9213 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -1297,4 +1297,10 @@ public class TaskManager implements TaskOperationProtocol {
return timeout;
}
}
+
+ @Override
+ public void killTaskManager() throws IOException { // Terminates this TaskManager's JVM via System.exit(0); does not return normally, so callers should not expect a reply. IOException is declared only to match the interface.
+ LOG.info("Killing TaskManager");
+ System.exit(0); // exit status 0: this is a requested shutdown, not an error condition.
+ }
}