Posted to commits@flink.apache.org by rm...@apache.org on 2017/02/08 17:40:53 UTC

[1/2] flink git commit: [FLINK-5153] Add test for YARN application tags

Repository: flink
Updated Branches:
  refs/heads/master e24a866bf -> 95765b6d8


[FLINK-5153] Add test for YARN application tags


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95765b6d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95765b6d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95765b6d

Branch: refs/heads/master
Commit: 95765b6d8fdda9c601ad61dc39ce02043ecefa05
Parents: d9c116e
Author: Patrick Lucas <me...@patricklucas.com>
Authored: Tue Feb 7 12:47:21 2017 -0500
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Feb 8 18:39:47 2017 +0100

----------------------------------------------------------------------
 .../YARNSessionCapacitySchedulerITCase.java     | 43 ++++++++++++++++----
 1 file changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95765b6d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index ec66eb2..2a3b6c6 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.base.Joiner;
+import com.google.common.collect.Sets;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -53,13 +54,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.util.EnumSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Arrays;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -413,11 +410,14 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			throw new RuntimeException(e);
 		}
 
-		Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
+		Runner runner = startWithArgs(new String[]{
+				"run", "-m", "yarn-cluster",
+				"-yj", flinkUberjar.getAbsolutePath(),
 				"-yt", flinkLibFolder.getAbsolutePath(),
 				"-yn", "1",
 				"-yjm", "768",
 				"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
+				"-yD", "yarn.tags=test-tag",
 				"-ytm", "1024",
 				"-ys", "2", // test requesting slots from YARN.
 				"--yarndetached", job,
@@ -516,6 +516,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				LOG.info("Got report {}", rep);
 			} while(rep.getYarnApplicationState() == YarnApplicationState.RUNNING);
 
+			verifyApplicationTags(rep);
 		} catch(Throwable t) {
 			LOG.warn("Error while detached yarn session was running", t);
 			Assert.fail(t.getMessage());
@@ -543,6 +544,32 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		}
 	}
 
+	/**
+	 * Ensures that the YARN application tags were set properly.
+	 *
+	 * YARN application tags were only added in Hadoop 2.4, but Flink still supports Hadoop 2.3, so reflection is
+	 * required to invoke the method. If the method does not exist, this test passes.
+	 */
+	private void verifyApplicationTags(final ApplicationReport report) throws InvocationTargetException,
+		IllegalAccessException {
+
+		final Method applicationTagsMethod;
+
+		Class<ApplicationReport> clazz = ApplicationReport.class;
+		try {
+			// this method is only supported by Hadoop 2.4.0 onwards
+			applicationTagsMethod = clazz.getMethod("getApplicationTags");
+		} catch (NoSuchMethodException e) {
+			// only verify the tags if the method exists
+			return;
+		}
+
+		@SuppressWarnings("unchecked")
+		Set<String> applicationTags = (Set<String>) applicationTagsMethod.invoke(report);
+
+		Assert.assertEquals(Sets.newHashSet("test-tag"), applicationTags);
+	}
+
 	@After
 	public void checkForProhibitedLogContents() {
 		ensureNoProhibitedStringInLogFiles(PROHIBITED_STRINGS, WHITELISTED_STRINGS);


[2/2] flink git commit: [FLINK-5153] Support YARN application tags

Posted by rm...@apache.org.
[FLINK-5153] Support YARN application tags

Adds a new config option `yarn.tags`, a comma-separated list of strings
passed to YARN as application tags.
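
For illustration, here is a sketch of how the option might be supplied, either statically in flink-conf.yaml or per submission as a -yD dynamic property (the form the integration test in [1/2] uses). The tag values and the job jar path below are placeholders:

    # flink-conf.yaml
    yarn.tags: my-team,my-project

    # or per job submission on the CLI
    ./bin/flink run -m yarn-cluster -yD yarn.tags=my-team,my-project <path/to/job.jar>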


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9c116e5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9c116e5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9c116e5

Branch: refs/heads/master
Commit: d9c116e542889b6ad00485529b97652dc2d59cad
Parents: e24a866
Author: Patrick Lucas <me...@patricklucas.com>
Authored: Fri Feb 3 19:17:55 2017 -0500
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Feb 8 18:39:47 2017 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   2 +
 .../flink/configuration/ConfigConstants.java    |   5 +
 .../yarn/AbstractYarnClusterDescriptor.java     | 114 ++++++++++++++-----
 3 files changed, 93 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 2872cfa..2accdc2 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -431,6 +431,8 @@ use the `env.java.opts` setting, which is the `%jvmopts%` variable in the String
 
   For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
 
+- `yarn.tags` A comma-separated list of tags to apply to the Flink YARN application.
+
 ### Mesos
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 14ba9dd..c608fde 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -442,6 +442,11 @@ public final class ConfigConstants {
 	 */
 	public static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port";
 
+	/**
+	 * A comma-separated list of strings to use as YARN application tags.
+	 */
+	public static final String YARN_APPLICATION_TAGS = "yarn.tags";
+
 
 	// ------------------------ Mesos Configuration ------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d9c116e5/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index b537e09..21599c1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -855,7 +855,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			name = customName;
 		}
 
-		appContext.setApplicationName(name); // application name
+		appContext.setApplicationName(name);
 		appContext.setApplicationType("Apache Flink");
 		appContext.setAMContainerSpec(amContainer);
 		appContext.setResource(capability);
@@ -863,6 +863,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			appContext.setQueue(yarnQueue);
 		}
 
+		setApplicationTags(appContext);
+
 		// add a hook to clean up in case deployment fails
 		Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication);
 		Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
@@ -1024,75 +1026,117 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		customName = name;
 	}
 
-	private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws InvocationTargetException, IllegalAccessException {
+	private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws
+		InvocationTargetException, IllegalAccessException {
+
 		ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
 
 		reflector.setKeepContainersAcrossApplicationAttempts(appContext, true);
 		reflector.setAttemptFailuresValidityInterval(appContext, AkkaUtils.getTimeout(flinkConfiguration).toMillis());
 	}
 
+	private void setApplicationTags(final ApplicationSubmissionContext appContext) throws InvocationTargetException,
+		IllegalAccessException {
+
+		final ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
+		final String tagsString = flinkConfiguration.getString(ConfigConstants.YARN_APPLICATION_TAGS, "");
+
+		final Set<String> applicationTags = new HashSet<>();
+
+		// Trim whitespace and cull empty tags
+		for (final String tag : tagsString.split(",")) {
+			final String trimmedTag = tag.trim();
+			if (!trimmedTag.isEmpty()) {
+				applicationTags.add(trimmedTag);
+			}
+		}
+
+		reflector.setApplicationTags(appContext, applicationTags);
+	}
+
 	/**
 	 * Singleton object which uses reflection to determine whether the {@link ApplicationSubmissionContext}
-	 * supports the setKeepContainersAcrossApplicationAttempts and the setAttemptFailuresValidityInterval
-	 * methods. Depending on the Hadoop version these methods are supported or not. If the methods
-	 * are not supported, then nothing happens when setKeepContainersAcrossApplicationAttempts or
-	 * setAttemptFailuresValidityInterval are called.
+	 * supports various methods that are only available in certain Hadoop versions.
+	 *
+	 * If an unsupported method is invoked, nothing happens.
+	 *
+	 * Currently three methods are proxied:
+	 * - setApplicationTags (>= 2.4.0)
+	 * - setAttemptFailuresValidityInterval (>= 2.6.0)
+	 * - setKeepContainersAcrossApplicationAttempts (>= 2.4.0)
 	 */
 	private static class ApplicationSubmissionContextReflector {
 		private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
 
-		private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
+		private static final ApplicationSubmissionContextReflector instance =
+			new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
 
 		public static ApplicationSubmissionContextReflector getInstance() {
 			return instance;
 		}
 
-		private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts";
-		private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval";
+		private static final String APPLICATION_TAGS_METHOD_NAME = "setApplicationTags";
+		private static final String ATTEMPT_FAILURES_METHOD_NAME = "setAttemptFailuresValidityInterval";
+		private static final String KEEP_CONTAINERS_METHOD_NAME = "setKeepContainersAcrossApplicationAttempts";
 
-		private final Method keepContainersMethod;
+		private final Method applicationTagsMethod;
 		private final Method attemptFailuresValidityIntervalMethod;
+		private final Method keepContainersMethod;
 
 		private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> clazz) {
-			Method keepContainersMethod;
+			Method applicationTagsMethod;
 			Method attemptFailuresValidityIntervalMethod;
+			Method keepContainersMethod;
 
 			try {
 				// this method is only supported by Hadoop 2.4.0 onwards
-				keepContainersMethod = clazz.getMethod(keepContainersMethodName, boolean.class);
-				LOG.debug("{} supports method {}.", clazz.getCanonicalName(), keepContainersMethodName);
+				applicationTagsMethod = clazz.getMethod(APPLICATION_TAGS_METHOD_NAME, Set.class);
+				LOG.debug("{} supports method {}.", clazz.getCanonicalName(), APPLICATION_TAGS_METHOD_NAME);
 			} catch (NoSuchMethodException e) {
-				LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), keepContainersMethodName);
+				LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), APPLICATION_TAGS_METHOD_NAME);
 				// assign null because the Hadoop version apparently does not support this call.
-				keepContainersMethod = null;
+				applicationTagsMethod = null;
 			}
 
-			this.keepContainersMethod = keepContainersMethod;
+			this.applicationTagsMethod = applicationTagsMethod;
 
 			try {
 				// this method is only supported by Hadoop 2.6.0 onwards
-				attemptFailuresValidityIntervalMethod = clazz.getMethod(attemptsFailuresValidityIntervalMethodName, long.class);
-				LOG.debug("{} supports method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
+				attemptFailuresValidityIntervalMethod = clazz.getMethod(ATTEMPT_FAILURES_METHOD_NAME, long.class);
+				LOG.debug("{} supports method {}.", clazz.getCanonicalName(), ATTEMPT_FAILURES_METHOD_NAME);
 			} catch (NoSuchMethodException e) {
-				LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), attemptsFailuresValidityIntervalMethodName);
+				LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), ATTEMPT_FAILURES_METHOD_NAME);
 				// assign null because the Hadoop version apparently does not support this call.
 				attemptFailuresValidityIntervalMethod = null;
 			}
 
 			this.attemptFailuresValidityIntervalMethod = attemptFailuresValidityIntervalMethod;
-		}
 
-		public void setKeepContainersAcrossApplicationAttempts(
-				ApplicationSubmissionContext appContext,
-				boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
+			try {
+				// this method is only supported by Hadoop 2.4.0 onwards
+				keepContainersMethod = clazz.getMethod(KEEP_CONTAINERS_METHOD_NAME, boolean.class);
+				LOG.debug("{} supports method {}.", clazz.getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME);
+			} catch (NoSuchMethodException e) {
+				LOG.debug("{} does not support method {}.", clazz.getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME);
+				// assign null because the Hadoop version apparently does not support this call.
+				keepContainersMethod = null;
+			}
 
-			if (keepContainersMethod != null) {
-				LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(),
+			this.keepContainersMethod = keepContainersMethod;
+		}
+
+		public void setApplicationTags(
+			ApplicationSubmissionContext appContext,
+			Set<String> applicationTags) throws InvocationTargetException, IllegalAccessException {
+			if (applicationTagsMethod != null) {
+				LOG.debug("Calling method {} of {}.",
+					applicationTagsMethod.getName(),
 					appContext.getClass().getCanonicalName());
-				keepContainersMethod.invoke(appContext, keepContainers);
+				applicationTagsMethod.invoke(appContext, applicationTags);
 			} else {
 				LOG.debug("{} does not support method {}. Doing nothing.",
-					appContext.getClass().getCanonicalName(), keepContainersMethodName);
+					appContext.getClass().getCanonicalName(),
+					APPLICATION_TAGS_METHOD_NAME);
 			}
 		}
 
@@ -1107,7 +1151,21 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			} else {
 				LOG.debug("{} does not support method {}. Doing nothing.",
 					appContext.getClass().getCanonicalName(),
-					attemptsFailuresValidityIntervalMethodName);
+					ATTEMPT_FAILURES_METHOD_NAME);
+			}
+		}
+
+		public void setKeepContainersAcrossApplicationAttempts(
+			ApplicationSubmissionContext appContext,
+			boolean keepContainers) throws InvocationTargetException, IllegalAccessException {
+
+			if (keepContainersMethod != null) {
+				LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(),
+					appContext.getClass().getCanonicalName());
+				keepContainersMethod.invoke(appContext, keepContainers);
+			} else {
+				LOG.debug("{} does not support method {}. Doing nothing.",
+					appContext.getClass().getCanonicalName(), KEEP_CONTAINERS_METHOD_NAME);
 			}
 		}
 	}
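
The javadoc on ApplicationSubmissionContextReflector above describes the underlying pattern: look the optional method up once with getMethod, keep null when the running Hadoop version does not provide it, and silently skip the call otherwise. A minimal, self-contained Java sketch of that pattern follows; the class and method names here are illustrative only and are not part of Flink or YARN:

    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;

    /** Invokes a method on a target object only if the target's class actually declares it. */
    final class OptionalMethodCaller {

        // null when the probed class does not offer the method
        private final Method method;

        OptionalMethodCaller(Class<?> clazz, String methodName, Class<?>... parameterTypes) {
            Method m;
            try {
                // probe once; newer versions of the class may have added the method
                m = clazz.getMethod(methodName, parameterTypes);
            } catch (NoSuchMethodException e) {
                // older version: remember that the call must be skipped
                m = null;
            }
            this.method = m;
        }

        /** Calls the method if it exists, otherwise does nothing. */
        void invokeIfPresent(Object target, Object... args)
                throws InvocationTargetException, IllegalAccessException {
            if (method != null) {
                method.invoke(target, args);
            }
        }
    }

Under this reading, setApplicationTags in the descriptor is essentially new OptionalMethodCaller(ApplicationSubmissionContext.class, "setApplicationTags", Set.class).invokeIfPresent(appContext, applicationTags), with debug logging added on both branches.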