You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2017/02/08 17:40:54 UTC

[2/2] flink git commit: [FLINK-5153] Support YARN application tags

[FLINK-5153] Support YARN application tags

Adds a new config option `yarn.tags`, a comma-separated list of strings
passed to YARN as application tags.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9c116e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9c116e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9c116e5

Branch: refs/heads/master
Commit: d9c116e542889b6ad00485529b97652dc2d59cad
Parents: e24a866
Author: Patrick Lucas <me...@patricklucas.com>
Authored: Fri Feb 3 19:17:55 2017 -0500
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Feb 8 18:39:47 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   2 +
 .../flink/configuration/ConfigConstants.java    |   5 +
 .../yarn/AbstractYarnClusterDescriptor.java     | 114 ++++++++++++++-----
 3 files changed, 93 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 2872cfa..2accdc2 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -431,6 +431,8 @@ use the `env.java.opts` setting, which is the `%jvmopts%` variable in the String
 
   For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
 
+- `yarn.tags`: A comma-separated list of tags to apply to the Flink YARN application.
+
 ### Mesos
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 14ba9dd..c608fde 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -442,6 +442,11 @@ public final class ConfigConstants {
 	 */
 	public static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port";
 
+	/**
+	 * A comma-separated list of strings to use as YARN application tags.
+	 */
+	public static final String YARN_APPLICATION_TAGS = "yarn.tags";
+
 
 	// ------------------------ Mesos Configuration ------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index b537e09..21599c1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -855,7 +855,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			name = customName;
 		}
 
-		appContext.setApplicationName(name); // application name
+		appContext.setApplicationName(name);
 		appContext.setApplicationType("Apache Flink");
 		appContext.setAMContainerSpec(amContainer);
 		appContext.setResource(capability);
@@ -863,6 +863,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			appContext.setQueue(yarnQueue);
 		}
 
+		setApplicationTags(appContext);
+
 		// add a hook to clean up in case deployment fails
 		Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication);
 		Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
@@ -1024,75 +1026,117 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		customName = name;
 	}
 
-	private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
+	private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws
+		InvocationTargetException, IllegalAccessException {
+
 		ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
 
 		reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
 		reflector.setAttemptFailuresValidityInterval(appContext, AkkaUtils.getTimeout(flinkConfiguration).toMillis());
 	}
 
+	private void setApplicationTags(final ApplicationSubmissionContext appContext) throws InvocationTargetException,
+		IllegalAccessException {
+
+		final ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
+		final String tagsString = flinkConfiguration.getString(ConfigConstants.YARN_APPLICATION_TAGS, "");
+
+		final Set<String> applicationTags = new HashSet<>();
+
+		// Trim whitespace and cull empty tags
+		for (final String tag : tagsString.split(",")) {
+			final String trimmedTag = tag.trim();
+			if (!trimmedTag.isEmpty()) {
+				applicationTags.add(trimmedTag);
+			}
+		}
+
+		reflector.setApplicationTags(appContext, applicationTags);
+	}
+
 	/**
 	 * Singleton object which uses reflection to determine whether the {@link ApplicationSubmissionContext}
-	 * supports the setKeepContainersAcrossApplicationAttempts and the setAttemptFailuresValidityInterval
-	 * methods. Depending on the Hadoop version these methods are supported or not. If the methods
-	 * are not supported, then nothing happens when setKeepContainersAcrossApplicationAttempts or
-	 * setAttemptFailuresValidityInterval are called.
+	 * supports certain methods that are only available in some Hadoop versions.
+	 *
+	 * If an unsupported method is invoked, nothing happens.
+	 *
+	 * Currently three methods are proxied:
+	 * - setApplicationTags (>= 2.4.0)
+	 * - setAttemptFailuresValidityInterval (>= 2.6.0)
+	 * - setKeepContainersAcrossApplicationAttempts (>= 2.4.0)
 	 */
 	private static class ApplicationSubmissionContextReflector {
 		private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
 
-		private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
+		private static final ApplicationSubmissionContextReflector instance =
+			new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
 
 		public static ApplicationSubmissionContextReflector getInstance() {
 			return instance;
 		}
 
-		private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts";
-		private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval";
+		private static final String APPLICATION_TAGS_METHOD_NAME = "setApplicationTags";
+		private static final String ATTEMPT_FAILURES_METHOD_NAME = "setAttemptFailuresValidityInterval";
+		private static final String KEEP_CONTAINERS_METHOD_NAME = "setKeepContainersAcrossApplicationAttempts";
 
-		private final Method keepContainersMethod;
+		private final Method applicationTagsMethod;
 		private final Method attemptFailuresValidityIntervalMethod;
+		private final Method keepContainersMethod;
 
 		private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
-			Method keepContainersMethod;
+			Method applicationTagsMethod;
 			Method attemptFailuresValidityIntervalMethod;
+			Method keepContainersMethod;
 
 			try {
 				// this method is only supported by Hadoop 2.4.0 onwards
-				keepContainersMethod = clazz.getMethod(keepContainersMethodName, boolean.class);
-				LOG.debug("{} supports method {}.", clazz.getCanonicalName(), keepContainersMethodName);
+				applicationTagsMethod = clazz.getMethod(APPLICATION_TAGS_METHOD_NAME, Set.class);
+				LOG.debug("{} supports method {}.", clazz.getCanonicalName(), APPLICATION_TAGS_METHOD_NAME);
 			} catch (NoSuchMethodException e) {
-				LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), keepContainersMethodName);
+				LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), APPLICATION_TAGS_METHOD_NAME);
 				// assign null because the Hadoop version apparently does not support this call.
-				keepContainersMethod = null;
+				applicationTagsMethod = null;
 			}
 
-			this.keepContainersMethod = keepContainersMethod;
+			this.applicationTagsMethod = applicationTagsMethod;
 
 			try {
 				// this method is only supported by Hadoop 2.6.0 onwards
-				attemptFailuresValidityIntervalMethod = clazz.getMethod(attemptsFailuresValidityIntervalMethodName, long.class);
-				LOG.debug("{} supports method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
+				attemptFailuresValidityIntervalMethod = clazz.getMethod(ATTEMPT_FAILURES_METHOD_NAME, long.class);
+				LOG.debug("{} supports method {}.", clazz.getCanonicalName(), ATTEMPT_FAILURES_METHOD_NAME);
 			} catch (NoSuchMethodException e) {
-				LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
+				LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), ATTEMPT_FAILURES_METHOD_NAME);
 				// assign null because the Hadoop version apparently does not support this call.
 				attemptFailuresValidityIntervalMethod = null;
 			}
 
 			this.attemptFailuresValidityIntervalMethod = attemptFailuresValidityIntervalMethod;
-		}
 
-		public void setKeepContainersAcrossApplicationAttempts(
-				ApplicationSubmissionContext appContext,
-				boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
+			try {
+				// this method is only supported by Hadoop 2.4.0 onwards
+				keepContainersMethod = clazz.getMethod(KEEP_CONTAINERS_METHOD_NAME, boolean.class);
+				LOG.debug("{} supports method {}.", clazz.getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME);
+			} catch (NoSuchMethodException e) {
+				LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME);
+				// assign null because the Hadoop version apparently does not support this call.
+				keepContainersMethod = null;
+			}
 
-			if (keepContainersMethod != null) {
-				LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(),
+			this.keepContainersMethod = keepContainersMethod;
+		}
+
+		public void setApplicationTags(
+			ApplicationSubmissionContext appContext,
+			Set<String> applicationTags) throws InvocationTargetException, IllegalAccessException {
+			if (applicationTagsMethod != null) {
+				LOG.debug("Calling method {} of {}.",
+					applicationTagsMethod.getName(),
 					appContext.getClass().getCanonicalName());
-				keepContainersMethod.invoke(appContext, keepContainers);
+				applicationTagsMethod.invoke(appContext, applicationTags);
 			} else {
 				LOG.debug("{} does not support method {}. Doing nothing.",
-					appContext.getClass().getCanonicalName(), keepContainersMethodName);
+					appContext.getClass().getCanonicalName(),
+					APPLICATION_TAGS_METHOD_NAME);
 			}
 		}
 
@@ -1107,7 +1151,21 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			} else {
 				LOG.debug("{} does not support method {}. Doing nothing.",
 					appContext.getClass().getCanonicalName(),
-					attemptsFailuresValidityIntervalMethodName);
+					ATTEMPT_FAILURES_METHOD_NAME);
+			}
+		}
+
+		public void setKeepContainersAcrossApplicationAttempts(
+			ApplicationSubmissionContext appContext,
+			boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
+
+			if (keepContainersMethod != null) {
+				LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(),
+					appContext.getClass().getCanonicalName());
+				keepContainersMethod.invoke(appContext, keepContainers);
+			} else {
+				LOG.debug("{} does not support method {}. Doing nothing.",
+					appContext.getClass().getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME);
 			}
 		}
 	}