You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2017/02/08 17:40:54 UTC
[2/2] flink git commit: [FLINK-5153] Support YARN application tags
[FLINK-5153] Support YARN application tags
Adds a new config option `yarn.tags`, a comma-separated list of strings
passed to YARN as application tags.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9c116e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9c116e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9c116e5
Branch: refs/heads/master
Commit: d9c116e542889b6ad00485529b97652dc2d59cad
Parents: e24a866
Author: Patrick Lucas <me...@patricklucas.com>
Authored: Fri Feb 3 19:17:55 2017 -0500
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Feb 8 18:39:47 2017 +0100
----------------------------------------------------------------------
docs/setup/config.md | 2 +
.../flink/configuration/ConfigConstants.java | 5 +
.../yarn/AbstractYarnClusterDescriptor.java | 114 ++++++++++++++-----
3 files changed, 93 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 2872cfa..2accdc2 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -431,6 +431,8 @@ use the `env.java.opts` setting, which is the `%jvmopts%` variable in the String
For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
+- `yarn.tags` A comma-separated list of tags to apply to the Flink YARN application.
+
### Mesos
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 14ba9dd..c608fde 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -442,6 +442,11 @@ public final class ConfigConstants {
*/
public static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port";
+ /**
+ * A comma-separated list of strings to use as YARN application tags.
+ */
+ public static final String YARN_APPLICATION_TAGS = "yarn.tags";
+
// ------------------------ Mesos Configuration ------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index b537e09..21599c1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -855,7 +855,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
name = customName;
}
- appContext.setApplicationName(name); // application name
+ appContext.setApplicationName(name);
appContext.setApplicationType("Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
@@ -863,6 +863,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
appContext.setQueue(yarnQueue);
}
+ setApplicationTags(appContext);
+
// add a hook to clean up in case deployment fails
Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication);
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
@@ -1024,75 +1026,117 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
customName = name;
}
- private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
+ private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws
+ InvocationTargetException, IllegalAccessException {
+
ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
reflector.setAttemptFailuresValidityInterval(appContext, AkkaUtils.getTimeout(flinkConfiguration).toMillis());
}
+ private void setApplicationTags(final ApplicationSubmissionContext appContext) throws InvocationTargetException,
+ IllegalAccessException {
+
+ final ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
+ final String tagsString = flinkConfiguration.getString(ConfigConstants.YARN_APPLICATION_TAGS, "");
+
+ final Set<String> applicationTags = new HashSet<>();
+
+ // Trim whitespace and cull empty tags
+ for (final String tag : tagsString.split(",")) {
+ final String trimmedTag = tag.trim();
+ if (!trimmedTag.isEmpty()) {
+ applicationTags.add(trimmedTag);
+ }
+ }
+
+ reflector.setApplicationTags(appContext, applicationTags);
+ }
+
/**
* Singleton object which uses reflection to determine whether the {@link ApplicationSubmissionContext}
- * supports the setKeepContainersAcrossApplicationAttempts and the setAttemptFailuresValidityInterval
- * methods. Depending on the Hadoop version these methods are supported or not. If the methods
- * are not supported, then nothing happens when setKeepContainersAcrossApplicationAttempts or
- * setAttemptFailuresValidityInterval are called.
+ * supports various methods which, depending on the Hadoop version, may or may not be supported.
+ *
+ * If an unsupported method is invoked, nothing happens.
+ *
+ * Currently three methods are proxied:
+ * - setApplicationTags (>= 2.4.0)
+ * - setAttemptFailuresValidityInterval (>= 2.6.0)
+ * - setKeepContainersAcrossApplicationAttempts (>= 2.4.0)
*/
private static class ApplicationSubmissionContextReflector {
private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
- private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
+ private static final ApplicationSubmissionContextReflector instance =
+ new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
public static ApplicationSubmissionContextReflector getInstance() {
return instance;
}
- private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts";
- private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval";
+ private static final String APPLICATION_TAGS_METHOD_NAME = "setApplicationTags";
+ private static final String ATTEMPT_FAILURES_METHOD_NAME = "setAttemptFailuresValidityInterval";
+ private static final String KEEP_CONTAINERS_METHOD_NAME = "setKeepContainersAcrossApplicationAttempts";
- private final Method keepContainersMethod;
+ private final Method applicationTagsMethod;
private final Method attemptFailuresValidityIntervalMethod;
+ private final Method keepContainersMethod;
private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
- Method keepContainersMethod;
+ Method applicationTagsMethod;
Method attemptFailuresValidityIntervalMethod;
+ Method keepContainersMethod;
try {
// this method is only supported by Hadoop 2.4.0 onwards
- keepContainersMethod = clazz.getMethod(keepContainersMethodName, boolean.class);
- LOG.debug("{} supports method {}.", clazz.getCanonicalName(), keepContainersMethodName);
+ applicationTagsMethod = clazz.getMethod(APPLICATION_TAGS_METHOD_NAME, Set.class);
+ LOG.debug("{} supports method {}.", clazz.getCanonicalName(), APPLICATION_TAGS_METHOD_NAME);
} catch (NoSuchMethodException e) {
- LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), keepContainersMethodName);
+ LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), APPLICATION_TAGS_METHOD_NAME);
// assign null because the Hadoop version apparently does not support this call.
- keepContainersMethod = null;
+ applicationTagsMethod = null;
}
- this.keepContainersMethod = keepContainersMethod;
+ this.applicationTagsMethod = applicationTagsMethod;
try {
// this method is only supported by Hadoop 2.6.0 onwards
- attemptFailuresValidityIntervalMethod = clazz.getMethod(attemptsFailuresValidityIntervalMethodName, long.class);
- LOG.debug("{} supports method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
+ attemptFailuresValidityIntervalMethod = clazz.getMethod(ATTEMPT_FAILURES_METHOD_NAME, long.class);
+ LOG.debug("{} supports method {}.", clazz.getCanonicalName(), ATTEMPT_FAILURES_METHOD_NAME);
} catch (NoSuchMethodException e) {
- LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
+ LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), ATTEMPT_FAILURES_METHOD_NAME);
// assign null because the Hadoop version apparently does not support this call.
attemptFailuresValidityIntervalMethod = null;
}
this.attemptFailuresValidityIntervalMethod = attemptFailuresValidityIntervalMethod;
- }
- public void setKeepContainersAcrossApplicationAttempts(
- ApplicationSubmissionContext appContext,
- boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
+ try {
+ // this method is only supported by Hadoop 2.4.0 onwards
+ keepContainersMethod = clazz.getMethod(KEEP_CONTAINERS_METHOD_NAME, boolean.class);
+ LOG.debug("{} supports method {}.", clazz.getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME);
+ } catch (NoSuchMethodException e) {
+ LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME);
+ // assign null because the Hadoop version apparently does not support this call.
+ keepContainersMethod = null;
+ }
- if (keepContainersMethod != null) {
- LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(),
+ this.keepContainersMethod = keepContainersMethod;
+ }
+
+ public void setApplicationTags(
+ ApplicationSubmissionContext appContext,
+ Set<String> applicationTags) throws InvocationTargetException, IllegalAccessException {
+ if (applicationTagsMethod != null) {
+ LOG.debug("Calling method {} of {}.",
+ applicationTagsMethod.getName(),
appContext.getClass().getCanonicalName());
- keepContainersMethod.invoke(appContext, keepContainers);
+ applicationTagsMethod.invoke(appContext, applicationTags);
} else {
LOG.debug("{} does not support method {}. Doing nothing.",
- appContext.getClass().getCanonicalName(), keepContainersMethodName);
+ appContext.getClass().getCanonicalName(),
+ APPLICATION_TAGS_METHOD_NAME);
}
}
@@ -1107,7 +1151,21 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
} else {
LOG.debug("{} does not support method {}. Doing nothing.",
appContext.getClass().getCanonicalName(),
- attemptsFailuresValidityIntervalMethodName);
+ ATTEMPT_FAILURES_METHOD_NAME);
+ }
+ }
+
+ public void setKeepContainersAcrossApplicationAttempts(
+ ApplicationSubmissionContext appContext,
+ boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
+
+ if (keepContainersMethod != null) {
+ LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(),
+ appContext.getClass().getCanonicalName());
+ keepContainersMethod.invoke(appContext, keepContainers);
+ } else {
+ LOG.debug("{} does not support method {}. Doing nothing.",
+ appContext.getClass().getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME);
}
}
}