You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/13 19:53:49 UTC
[1/2] flink git commit: [FLINK-5631] [yarn] Support downloading
additional jars from non-HDFS paths.
Repository: flink
Updated Branches:
refs/heads/master 6bc6b225e -> 186b12309
[FLINK-5631] [yarn] Support downloading additional jars from non-HDFS paths.
This closes #3202
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/186b1230
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/186b1230
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/186b1230
Branch: refs/heads/master
Commit: 186b12309b540f82a055be28f3f005dce4b8cf46
Parents: 30c5b77
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Jan 31 12:11:01 2017 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 13 20:51:50 2017 +0100
----------------------------------------------------------------------
.../main/java/org/apache/flink/yarn/Utils.java | 223 +++++++++++++-
.../flink/yarn/YarnApplicationMasterRunner.java | 236 +--------------
.../apache/flink/yarn/YarnResourceManager.java | 211 +------------
.../java/org/apache/flink/yarn/UtilsTest.java | 298 +++++++++++++++++++
.../yarn/YarnApplicationMasterRunnerTest.java | 93 ++++++
.../yarn/YarnFlinkResourceManagerTest.java | 298 -------------------
6 files changed, 617 insertions(+), 742 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 94d4582..60f7204 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -23,11 +23,16 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +58,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+
/**
* Utility class that provides helper methods to work with Apache Hadoop YARN.
*/
@@ -107,7 +114,7 @@ public final class Utils {
addToEnvironment(
appMasterEnv,
Environment.CLASSPATH.name(),
- appMasterEnv.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
+ appMasterEnv.get(ENV_FLINK_CLASSPATH));
String[] applicationClassPathEntries = conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
@@ -264,4 +271,218 @@ public final class Utils {
}
return result;
}
+
+ /**
+ * Creates the launch context, which describes how to bring up a TaskExecutor / TaskManager process in
+ * an allocated YARN container.
+ *
+ * <p>This code is extremely YARN specific and registers all the resources that the TaskExecutor
+ * needs (such as JAR file, config file, ...) and all environment variables in a YARN
+ * container launch context. The launch context then ensures that those resources will be
+ * copied into the containers transient working directory.
+ *
+ * @param flinkConfig
+ * The Flink configuration object.
+ * @param yarnConfig
+ * The YARN configuration object.
+ * @param env
+ * The environment variables.
+ * @param tmParams
+ * The TaskExecutor container memory parameters.
+ * @param taskManagerConfig
+ * The configuration for the TaskExecutors.
+ * @param workingDirectory
+ * The current application master container's working directory.
+ * @param taskManagerMainClass
+ * The class with the main method.
+ * @param log
+ * The logger.
+ *
+ * @return The launch context for the TaskManager processes.
+ *
+ * @throws Exception Thrown if the launch context could not be created, for example if
+ * the resources could not be copied.
+ */
+ static ContainerLaunchContext createTaskExecutorContext(
+ org.apache.flink.configuration.Configuration flinkConfig,
+ YarnConfiguration yarnConfig,
+ Map<String, String> env,
+ ContaineredTaskManagerParameters tmParams,
+ org.apache.flink.configuration.Configuration taskManagerConfig,
+ String workingDirectory,
+ Class<?> taskManagerMainClass,
+ Logger log) throws Exception {
+
+ // get and validate all relevant variables
+
+ String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH);
+ require(remoteFlinkJarPath != null, "Environment variable %s not set", YarnConfigKeys.FLINK_JAR_PATH);
+
+ String appId = env.get(YarnConfigKeys.ENV_APP_ID);
+ require(appId != null, "Environment variable %s not set", YarnConfigKeys.ENV_APP_ID);
+
+ String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+ require(clientHomeDir != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+
+ String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+ require(shipListString != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+
+ String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+ require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+ final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
+ log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
+
+ final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+ log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal);
+
+ final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
+ log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
+
+ final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
+ log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+ String classPathString = env.get(ENV_FLINK_CLASSPATH);
+ require(classPathString != null, "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
+
+ //register keytab
+ LocalResource keytabResource = null;
+ if(remoteKeytabPath != null) {
+ log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
+ keytabResource = Records.newRecord(LocalResource.class);
+ Path keytabPath = new Path(remoteKeytabPath);
+ FileSystem fs = keytabPath.getFileSystem(yarnConfig);
+ registerLocalResource(fs, keytabPath, keytabResource);
+ }
+
+ //To support Yarn Secure Integration Test Scenario
+ LocalResource yarnConfResource = null;
+ LocalResource krb5ConfResource = null;
+ boolean hasKrb5 = false;
+ if(remoteYarnConfPath != null && remoteKrb5Path != null) {
+ log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
+ yarnConfResource = Records.newRecord(LocalResource.class);
+ Path yarnConfPath = new Path(remoteYarnConfPath);
+ FileSystem fs = yarnConfPath.getFileSystem(yarnConfig);
+ registerLocalResource(fs, yarnConfPath, yarnConfResource);
+
+ log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
+ krb5ConfResource = Records.newRecord(LocalResource.class);
+ Path krb5ConfPath = new Path(remoteKrb5Path);
+ fs = krb5ConfPath.getFileSystem(yarnConfig);
+ registerLocalResource(fs, krb5ConfPath, krb5ConfResource);
+
+ hasKrb5 = true;
+ }
+
+ // register Flink Jar with remote HDFS
+ LocalResource flinkJar = Records.newRecord(LocalResource.class);
+ {
+ Path remoteJarPath = new Path(remoteFlinkJarPath);
+ FileSystem fs = remoteJarPath.getFileSystem(yarnConfig);
+ registerLocalResource(fs, remoteJarPath, flinkJar);
+ }
+
+ // register conf with local fs
+ LocalResource flinkConf = Records.newRecord(LocalResource.class);
+ {
+ // write the TaskManager configuration to a local file
+ final File taskManagerConfigFile =
+ new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
+ log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
+ BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
+
+ Path homeDirPath = new Path(clientHomeDir);
+ FileSystem fs = homeDirPath.getFileSystem(yarnConfig);
+ setupLocalResource(fs, appId,
+ new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
+
+ log.info("Prepared local resource for modified yaml: {}", flinkConf);
+ }
+
+ Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();
+ taskManagerLocalResources.put("flink.jar", flinkJar);
+ taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
+
+ //To support Yarn Secure Integration Test Scenario
+ if(yarnConfResource != null && krb5ConfResource != null) {
+ taskManagerLocalResources.put(YARN_SITE_FILE_NAME, yarnConfResource);
+ taskManagerLocalResources.put(KRB5_FILE_NAME, krb5ConfResource);
+ }
+
+ if(keytabResource != null) {
+ taskManagerLocalResources.put(KEYTAB_FILE_NAME, keytabResource);
+ }
+
+ // prepare additional files to be shipped
+ for (String pathStr : shipListString.split(",")) {
+ if (!pathStr.isEmpty()) {
+ LocalResource resource = Records.newRecord(LocalResource.class);
+ Path path = new Path(pathStr);
+ registerLocalResource(path.getFileSystem(yarnConfig), path, resource);
+ taskManagerLocalResources.put(path.getName(), resource);
+ }
+ }
+
+ // now that all resources are prepared, we can create the launch context
+
+ log.info("Creating container launch context for TaskManagers");
+
+ boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
+ boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
+
+ String launchCommand = BootstrapTools.getTaskManagerShellCommand(
+ flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+ hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
+
+ log.info("Starting TaskManagers with command: " + launchCommand);
+
+ ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+ ctx.setCommands(Collections.singletonList(launchCommand));
+ ctx.setLocalResources(taskManagerLocalResources);
+
+ Map<String, String> containerEnv = new HashMap<>();
+ containerEnv.putAll(tmParams.taskManagerEnv());
+
+ // add YARN classpath, etc to the container environment
+ containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
+ setupYarnClassPath(yarnConfig, containerEnv);
+
+ containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
+
+ if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
+ containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
+ containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
+ }
+
+ ctx.setEnvironment(containerEnv);
+
+ try (DataOutputBuffer dob = new DataOutputBuffer()) {
+ log.debug("Adding security tokens to Task Executor Container launch Context....");
+ UserGroupInformation user = UserGroupInformation.getCurrentUser();
+ Credentials credentials = user.getCredentials();
+ credentials.writeTokenStorageToStream(dob);
+ ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ ctx.setTokens(securityTokens);
+ }
+ catch (Throwable t) {
+ log.error("Getting current user info failed when trying to launch the container", t);
+ }
+
+ return ctx;
+ }
+
+ /**
+ * Validates a condition, throwing a RuntimeException if the condition is violated.
+ *
+ * @param condition The condition.
+ * @param message The message for the runtime exception, with format variables as defined by
+ * {@link String#format(String, Object...)}.
+ * @param values The format arguments.
+ */
+ static void require(boolean condition, String message, Object... values) {
+ if (!condition) {
+ throw new RuntimeException(String.format(message, values));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 5cc51e4..9d5673c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -47,16 +47,10 @@ import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,19 +60,14 @@ import scala.Some;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Map;
-import java.util.HashMap;
-import java.util.UUID;
-import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.Utils.require;
/**
* This class is the executable entry point for the YARN application master.
@@ -329,7 +318,7 @@ public class YarnApplicationMasterRunner {
config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
LOG.debug("TaskManager configuration: {}", taskManagerConfig);
- final ContainerLaunchContext taskManagerContext = createTaskManagerContext(
+ final ContainerLaunchContext taskManagerContext = Utils.createTaskExecutorContext(
config, yarnConfig, ENV,
taskManagerParameters, taskManagerConfig,
currDir, getTaskManagerClass(), LOG);
@@ -483,20 +472,6 @@ public class YarnApplicationMasterRunner {
// ------------------------------------------------------------------------
/**
- * Validates a condition, throwing a RuntimeException if the condition is violated.
- *
- * @param condition The condition.
- * @param message The message for the runtime exception, with format variables as defined by
- * {@link String#format(String, Object...)}.
- * @param values The format arguments.
- */
- private static void require(boolean condition, String message, Object... values) {
- if (!condition) {
- throw new RuntimeException(String.format(message, values));
- }
- }
-
- /**
*
* @param baseDirectory
* @param additional
@@ -549,211 +524,4 @@ public class YarnApplicationMasterRunner {
return configuration;
}
-
- /**
- * Creates the launch context, which describes how to bring up a TaskManager process in
- * an allocated YARN container.
- *
- * <p>This code is extremely YARN specific and registers all the resources that the TaskManager
- * needs (such as JAR file, config file, ...) and all environment variables in a YARN
- * container launch context. The launch context then ensures that those resources will be
- * copied into the containers transient working directory.
- *
- * <p>We do this work before we start the ResourceManager actor in order to fail early if
- * any of the operations here fail.
- *
- * @param flinkConfig
- * The Flink configuration object.
- * @param yarnConfig
- * The YARN configuration object.
- * @param env
- * The environment variables.
- * @param tmParams
- * The TaskManager container memory parameters.
- * @param taskManagerConfig
- * The configuration for the TaskManagers.
- * @param workingDirectory
- * The current application master container's working directory.
- * @param taskManagerMainClass
- * The class with the main method.
- * @param log
- * The logger.
- *
- * @return The launch context for the TaskManager processes.
- *
- * @throws Exception Thrown if teh launch context could not be created, for example if
- * the resources could not be copied.
- */
- public static ContainerLaunchContext createTaskManagerContext(
- Configuration flinkConfig,
- YarnConfiguration yarnConfig,
- Map<String, String> env,
- ContaineredTaskManagerParameters tmParams,
- Configuration taskManagerConfig,
- String workingDirectory,
- Class<?> taskManagerMainClass,
- Logger log) throws Exception {
-
- log.info("Setting up resources for TaskManagers");
-
- // get and validate all relevant variables
-
- String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH);
- require(remoteFlinkJarPath != null, "Environment variable %s not set", YarnConfigKeys.FLINK_JAR_PATH);
-
- String appId = env.get(YarnConfigKeys.ENV_APP_ID);
- require(appId != null, "Environment variable %s not set", YarnConfigKeys.ENV_APP_ID);
-
- String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
- require(clientHomeDir != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_HOME_DIR);
-
- String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
- require(shipListString != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
-
- String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
- require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
-
- final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
- LOG.info("TM:remoteKeytabPath obtained {}", remoteKeytabPath);
-
- final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
- LOG.info("TM:remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
-
- final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
- LOG.info("TM:remoteYarnConfPath obtained {}", remoteYarnConfPath);
-
- final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
- LOG.info("TM:remotekrb5Path obtained {}", remoteKrb5Path);
-
- String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
- require(classPathString != null, "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
-
- // obtain a handle to the file system used by YARN
- final org.apache.hadoop.fs.FileSystem yarnFileSystem;
- try {
- yarnFileSystem = org.apache.hadoop.fs.FileSystem.get(yarnConfig);
- } catch (IOException e) {
- throw new Exception("Could not access YARN's default file system", e);
- }
-
- //register keytab
- LocalResource keytabResource = null;
- if(remoteKeytabPath != null) {
- LOG.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
- keytabResource = Records.newRecord(LocalResource.class);
- Path keytabPath = new Path(remoteKeytabPath);
- Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource);
- }
-
- //To support Yarn Secure Integration Test Scenario
- LocalResource yarnConfResource = null;
- LocalResource krb5ConfResource = null;
- boolean hasKrb5 = false;
- if(remoteYarnConfPath != null && remoteKrb5Path != null) {
- LOG.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
- yarnConfResource = Records.newRecord(LocalResource.class);
- Path yarnConfPath = new Path(remoteYarnConfPath);
- Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource);
-
- LOG.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
- krb5ConfResource = Records.newRecord(LocalResource.class);
- Path krb5ConfPath = new Path(remoteKrb5Path);
- Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource);
-
- hasKrb5 = true;
- }
-
- // register Flink Jar with remote HDFS
- LocalResource flinkJar = Records.newRecord(LocalResource.class);
- {
- Path remoteJarPath = new Path(remoteFlinkJarPath);
- Utils.registerLocalResource(yarnFileSystem, remoteJarPath, flinkJar);
- }
-
- // register conf with local fs
- LocalResource flinkConf = Records.newRecord(LocalResource.class);
- {
- // write the TaskManager configuration to a local file
- final File taskManagerConfigFile =
- new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
- LOG.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
- BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
-
- Utils.setupLocalResource(yarnFileSystem, appId,
- new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
-
- log.info("Prepared local resource for modified yaml: {}", flinkConf);
- }
-
- Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();
- taskManagerLocalResources.put("flink.jar", flinkJar);
- taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
-
- //To support Yarn Secure Integration Test Scenario
- if(yarnConfResource != null && krb5ConfResource != null) {
- taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
- taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
- }
-
- if(keytabResource != null) {
- taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
- }
-
- // prepare additional files to be shipped
- for (String pathStr : shipListString.split(",")) {
- if (!pathStr.isEmpty()) {
- LocalResource resource = Records.newRecord(LocalResource.class);
- Path path = new Path(pathStr);
- Utils.registerLocalResource(yarnFileSystem, path, resource);
- taskManagerLocalResources.put(path.getName(), resource);
- }
- }
-
- // now that all resources are prepared, we can create the launch context
-
- log.info("Creating container launch context for TaskManagers");
-
- boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
- boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
-
- String launchCommand = BootstrapTools.getTaskManagerShellCommand(
- flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
- hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
-
- log.info("Starting TaskManagers with command: " + launchCommand);
-
- ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
- ctx.setCommands(Collections.singletonList(launchCommand));
- ctx.setLocalResources(taskManagerLocalResources);
-
- Map<String, String> containerEnv = new HashMap<>();
- containerEnv.putAll(tmParams.taskManagerEnv());
-
- // add YARN classpath, etc to the container environment
- containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
- Utils.setupYarnClassPath(yarnConfig, containerEnv);
-
- containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
-
- if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
- containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
- containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
- }
-
- ctx.setEnvironment(containerEnv);
-
- try (DataOutputBuffer dob = new DataOutputBuffer()) {
- LOG.debug("Adding security tokens to Task Manager Container launch Context....");
- UserGroupInformation user = UserGroupInformation.getCurrentUser();
- Credentials credentials = user.getCredentials();
- credentials.writeTokenStorageToStream(dob);
- ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
- ctx.setTokens(securityTokens);
- }
- catch (Throwable t) {
- log.error("Getting current user info failed when trying to launch the container", t);
- }
-
- return ctx;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 9b9ea39..ab96441 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -35,10 +35,6 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -47,29 +43,20 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
import org.apache.flink.util.ExceptionUtils;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
-import java.util.Collections;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
-
/**
* The yarn implementation of the resource manager. Used when the system is started
* via the resource framework YARN.
@@ -357,7 +344,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
flinkConfig, "", 0, 1, teRegistrationTimeout);
LOG.debug("TaskManager configuration: {}", taskManagerConfig);
- ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorContext(
+ ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
flinkConfig, yarnConfig, ENV,
taskManagerParameters, taskManagerConfig,
currDir, YarnTaskExecutorRunner.class, LOG);
@@ -371,204 +358,10 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
}
- /**
- * Creates the launch context, which describes how to bring up a TaskExecutor process in
- * an allocated YARN container.
- *
- * <p>This code is extremely YARN specific and registers all the resources that the TaskExecutor
- * needs (such as JAR file, config file, ...) and all environment variables in a YARN
- * container launch context. The launch context then ensures that those resources will be
- * copied into the containers transient working directory.
- *
- * @param flinkConfig
- * The Flink configuration object.
- * @param yarnConfig
- * The YARN configuration object.
- * @param env
- * The environment variables.
- * @param tmParams
- * The TaskExecutor container memory parameters.
- * @param taskManagerConfig
- * The configuration for the TaskExecutors.
- * @param workingDirectory
- * The current application master container's working directory.
- * @param taskManagerMainClass
- * The class with the main method.
- * @param log
- * The logger.
- *
- * @return The launch context for the TaskManager processes.
- *
- * @throws Exception Thrown if teh launch context could not be created, for example if
- * the resources could not be copied.
- */
- private static ContainerLaunchContext createTaskExecutorContext(
- Configuration flinkConfig,
- YarnConfiguration yarnConfig,
- Map<String, String> env,
- ContaineredTaskManagerParameters tmParams,
- Configuration taskManagerConfig,
- String workingDirectory,
- Class<?> taskManagerMainClass,
- Logger log) throws Exception {
-
- // get and validate all relevant variables
-
- String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH);
-
- String appId = env.get(YarnConfigKeys.ENV_APP_ID);
-
- String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
-
- String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
-
- String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-
- final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
- log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
-
- final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
- log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal);
-
- final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
- log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
-
- final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
- log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
-
- String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
-
- // obtain a handle to the file system used by YARN
- final org.apache.hadoop.fs.FileSystem yarnFileSystem;
- try {
- yarnFileSystem = org.apache.hadoop.fs.FileSystem.get(yarnConfig);
- } catch (IOException e) {
- throw new Exception("Could not access YARN's default file system", e);
- }
-
- //register keytab
- LocalResource keytabResource = null;
- if(remoteKeytabPath != null) {
- log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
- keytabResource = Records.newRecord(LocalResource.class);
- Path keytabPath = new Path(remoteKeytabPath);
- Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource);
- }
-
- //To support Yarn Secure Integration Test Scenario
- LocalResource yarnConfResource = null;
- LocalResource krb5ConfResource = null;
- boolean hasKrb5 = false;
- if(remoteYarnConfPath != null && remoteKrb5Path != null) {
- log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
- yarnConfResource = Records.newRecord(LocalResource.class);
- Path yarnConfPath = new Path(remoteYarnConfPath);
- Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource);
-
- log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
- krb5ConfResource = Records.newRecord(LocalResource.class);
- Path krb5ConfPath = new Path(remoteKrb5Path);
- Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource);
-
- hasKrb5 = true;
- }
-
- // register Flink Jar with remote HDFS
- LocalResource flinkJar = Records.newRecord(LocalResource.class);
- {
- Path remoteJarPath = new Path(remoteFlinkJarPath);
- Utils.registerLocalResource(yarnFileSystem, remoteJarPath, flinkJar);
- }
-
- // register conf with local fs
- LocalResource flinkConf = Records.newRecord(LocalResource.class);
- {
- // write the TaskManager configuration to a local file
- final File taskManagerConfigFile =
- new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
- log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
- BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
-
- Utils.setupLocalResource(yarnFileSystem, appId,
- new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
- log.info("Prepared local resource for modified yaml: {}", flinkConf);
- }
-
- Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();
- taskManagerLocalResources.put("flink.jar", flinkJar);
- taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
-
- //To support Yarn Secure Integration Test Scenario
- if(yarnConfResource != null && krb5ConfResource != null) {
- taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
- taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
- }
-
- if(keytabResource != null) {
- taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
- }
-
- // prepare additional files to be shipped
- for (String pathStr : shipListString.split(",")) {
- if (!pathStr.isEmpty()) {
- LocalResource resource = Records.newRecord(LocalResource.class);
- Path path = new Path(pathStr);
- Utils.registerLocalResource(yarnFileSystem, path, resource);
- taskManagerLocalResources.put(path.getName(), resource);
- }
- }
-
- // now that all resources are prepared, we can create the launch context
-
- log.info("Creating container launch context for TaskManagers");
-
- boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
- boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
-
- String launchCommand = BootstrapTools.getTaskManagerShellCommand(
- flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
- hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
-
- log.info("Starting TaskManagers with command: " + launchCommand);
-
- ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
- ctx.setCommands(Collections.singletonList(launchCommand));
- ctx.setLocalResources(taskManagerLocalResources);
-
- Map<String, String> containerEnv = new HashMap<>();
- containerEnv.putAll(tmParams.taskManagerEnv());
-
- // add YARN classpath, etc to the container environment
- containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
- Utils.setupYarnClassPath(yarnConfig, containerEnv);
-
- containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
-
- if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
- containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
- containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
- }
-
- ctx.setEnvironment(containerEnv);
-
- try (DataOutputBuffer dob = new DataOutputBuffer()) {
- log.debug("Adding security tokens to Task Executor Container launch Context....");
- UserGroupInformation user = UserGroupInformation.getCurrentUser();
- Credentials credentials = user.getCredentials();
- credentials.writeTokenStorageToStream(dob);
- ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
- ctx.setTokens(securityTokens);
- }
- catch (Throwable t) {
- log.error("Getting current user info failed when trying to launch the container", t);
- }
-
- return ctx;
- }
/**
- * Generate priority by given resouce profile.
+ * Generate priority by given resource profile.
* Priority is only used for distinguishing request of different resource.
* @param resourceProfile The resource profile of a request
* @return The priority of this resource profile.
http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
new file mode 100644
index 0000000..8534ba8
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
+import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class UtilsTest extends TestLogger {
+
+ private static ActorSystem system;
+
+ @BeforeClass
+ public static void setup() {
+ system = AkkaUtils.createLocalActorSystem(new Configuration());
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
+ @Test
+ public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception {
+ new JavaTestKit(system) {{
+
+ final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow();
+
+ Configuration flinkConfig = new Configuration();
+ YarnConfiguration yarnConfig = new YarnConfiguration();
+ TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+ String applicationMasterHostName = "localhost";
+ String webInterfaceURL = "foobar";
+ ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters(
+ 1l, 1l, 1l, 1, new HashMap<String, String>());
+ ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class);
+ int yarnHeartbeatIntervalMillis = 1000;
+ int maxFailedContainers = 10;
+ int numInitialTaskManagers = 5;
+ final YarnResourceManagerCallbackHandler callbackHandler = new YarnResourceManagerCallbackHandler();
+ AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient = mock(AMRMClientAsync.class);
+ NMClient nodeManagerClient = mock(NMClient.class);
+ UUID leaderSessionID = UUID.randomUUID();
+
+ final List<Container> containerList = new ArrayList<>();
+
+ for (int i = 0; i < numInitialTaskManagers; i++) {
+ containerList.add(new TestingContainer("container_" + i, "localhost"));
+ }
+
+ doAnswer(new Answer() {
+ int counter = 0;
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ if (counter < containerList.size()) {
+ callbackHandler.onContainersAllocated(
+ Collections.singletonList(
+ containerList.get(counter++)
+ ));
+ }
+ return null;
+ }
+ }).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
+
+ ActorRef resourceManager = null;
+ ActorRef leader1;
+
+ try {
+ leader1 = system.actorOf(
+ Props.create(
+ TestingUtils.ForwardingActor.class,
+ getRef(),
+ Option.apply(leaderSessionID)
+ ));
+
+ resourceManager = system.actorOf(
+ Props.create(
+ TestingYarnFlinkResourceManager.class,
+ flinkConfig,
+ yarnConfig,
+ leaderRetrievalService,
+ applicationMasterHostName,
+ webInterfaceURL,
+ taskManagerParameters,
+ taskManagerLaunchContext,
+ yarnHeartbeatIntervalMillis,
+ maxFailedContainers,
+ numInitialTaskManagers,
+ callbackHandler,
+ resourceManagerClient,
+ nodeManagerClient
+ ));
+
+ leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
+
+ final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID);
+ final AkkaActorGateway resourceManagerGateway = new AkkaActorGateway(resourceManager, leaderSessionID);
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Container container = (Container) invocation.getArguments()[0];
+ resourceManagerGateway.tell(new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
+ leader1Gateway);
+ return null;
+ }
+ }).when(nodeManagerClient).startContainer(Matchers.any(Container.class), Matchers.any(ContainerLaunchContext.class));
+
+ expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
+
+ resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
+
+ for (int i = 0; i < containerList.size(); i++) {
+ expectMsgClass(deadline.timeLeft(), Acknowledge.class);
+ }
+
+ Future<Object> taskManagerRegisteredFuture = resourceManagerGateway.ask(new NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft());
+
+ Await.ready(taskManagerRegisteredFuture, deadline.timeLeft());
+
+ leaderRetrievalService.notifyListener(null, null);
+
+ leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
+
+ expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
+
+ resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
+
+ for (Container container: containerList) {
+ resourceManagerGateway.tell(
+ new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
+ leader1Gateway);
+ }
+
+ for (int i = 0; i < containerList.size(); i++) {
+ expectMsgClass(deadline.timeLeft(), Acknowledge.class);
+ }
+
+ Future<Object> numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.Instance, deadline.timeLeft());
+
+ int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
+
+ assertEquals(numInitialTaskManagers, numberOfRegisteredResources);
+ } finally {
+ if (resourceManager != null) {
+ resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+ }
+ }};
+ }
+
+ static class TestingContainer extends Container {
+
+ private final String id;
+ private final String host;
+
+ TestingContainer(String id, String host) {
+ this.id = id;
+ this.host = host;
+ }
+
+ @Override
+ public ContainerId getId() {
+ ContainerId containerId = mock(ContainerId.class);
+ when(containerId.toString()).thenReturn(id);
+
+ return containerId;
+ }
+
+ @Override
+ public void setId(ContainerId containerId) {
+
+ }
+
+ @Override
+ public NodeId getNodeId() {
+ NodeId nodeId = mock(NodeId.class);
+ when(nodeId.getHost()).thenReturn(host);
+
+ return nodeId;
+ }
+
+ @Override
+ public void setNodeId(NodeId nodeId) {
+
+ }
+
+ @Override
+ public String getNodeHttpAddress() {
+ return null;
+ }
+
+ @Override
+ public void setNodeHttpAddress(String s) {
+
+ }
+
+ @Override
+ public Resource getResource() {
+ return null;
+ }
+
+ @Override
+ public void setResource(Resource resource) {
+
+ }
+
+ @Override
+ public Priority getPriority() {
+ return null;
+ }
+
+ @Override
+ public void setPriority(Priority priority) {
+
+ }
+
+ @Override
+ public Token getContainerToken() {
+ return null;
+ }
+
+ @Override
+ public void setContainerToken(Token token) {
+
+ }
+
+ @Override
+ public int compareTo(Container o) {
+ return 0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
new file mode 100644
index 0000000..f874896
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+import static org.apache.flink.yarn.YarnConfigKeys.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+public class YarnApplicationMasterRunnerTest {
+ private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunnerTest.class);
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Test
+ public void testCreateTaskExecutorContext() throws Exception {
+ File root = folder.getRoot();
+ File home = new File(root, "home");
+ boolean created = home.mkdir();
+ assertTrue(created);
+
+ Answer<?> getDefault = new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return invocationOnMock.getArguments()[1];
+ }
+ };
+ Configuration flinkConf = new Configuration();
+ YarnConfiguration yarnConf = mock(YarnConfiguration.class);
+ doAnswer(getDefault).when(yarnConf).get(anyString(), anyString());
+ doAnswer(getDefault).when(yarnConf).getInt(anyString(), anyInt());
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return new String[] {(String) invocationOnMock.getArguments()[1]};
+ }
+ }).when(yarnConf).getStrings(anyString(), Mockito.<String> anyVararg());
+
+ Map<String, String> env = ImmutableMap. <String, String> builder()
+ .put(ENV_APP_ID, "foo")
+ .put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath())
+ .put(ENV_CLIENT_SHIP_FILES, "")
+ .put(ENV_FLINK_CLASSPATH, "")
+ .put(ENV_HADOOP_USER_NAME, "foo")
+ .put(FLINK_JAR_PATH, root.toURI().toString())
+ .build();
+ ContaineredTaskManagerParameters tmParams = mock(ContaineredTaskManagerParameters.class);
+ Configuration taskManagerConf = new Configuration();
+
+ String workingDirectory = root.getAbsolutePath();
+ Class<?> taskManagerMainClass = YarnApplicationMasterRunnerTest.class;
+ ContainerLaunchContext ctx = Utils.createTaskExecutorContext(flinkConf, yarnConf, env, tmParams,
+ taskManagerConf, workingDirectory, taskManagerMainClass, LOG);
+ assertEquals("file", ctx.getLocalResources().get("flink.jar").getResource().getScheme());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
deleted file mode 100644
index a3ff6c4..0000000
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
-import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class YarnFlinkResourceManagerTest extends TestLogger {
-
- private static ActorSystem system;
-
- @BeforeClass
- public static void setup() {
- system = AkkaUtils.createLocalActorSystem(new Configuration());
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- }
-
- @Test
- public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception {
- new JavaTestKit(system) {{
-
- final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow();
-
- Configuration flinkConfig = new Configuration();
- YarnConfiguration yarnConfig = new YarnConfiguration();
- TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
- String applicationMasterHostName = "localhost";
- String webInterfaceURL = "foobar";
- ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters(
- 1l, 1l, 1l, 1, new HashMap<String, String>());
- ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class);
- int yarnHeartbeatIntervalMillis = 1000;
- int maxFailedContainers = 10;
- int numInitialTaskManagers = 5;
- final YarnResourceManagerCallbackHandler callbackHandler = new YarnResourceManagerCallbackHandler();
- AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient = mock(AMRMClientAsync.class);
- NMClient nodeManagerClient = mock(NMClient.class);
- UUID leaderSessionID = UUID.randomUUID();
-
- final List<Container> containerList = new ArrayList<>();
-
- for (int i = 0; i < numInitialTaskManagers; i++) {
- containerList.add(new TestingContainer("container_" + i, "localhost"));
- }
-
- doAnswer(new Answer() {
- int counter = 0;
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- if (counter < containerList.size()) {
- callbackHandler.onContainersAllocated(
- Collections.singletonList(
- containerList.get(counter++)
- ));
- }
- return null;
- }
- }).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
-
- ActorRef resourceManager = null;
- ActorRef leader1;
-
- try {
- leader1 = system.actorOf(
- Props.create(
- TestingUtils.ForwardingActor.class,
- getRef(),
- Option.apply(leaderSessionID)
- ));
-
- resourceManager = system.actorOf(
- Props.create(
- TestingYarnFlinkResourceManager.class,
- flinkConfig,
- yarnConfig,
- leaderRetrievalService,
- applicationMasterHostName,
- webInterfaceURL,
- taskManagerParameters,
- taskManagerLaunchContext,
- yarnHeartbeatIntervalMillis,
- maxFailedContainers,
- numInitialTaskManagers,
- callbackHandler,
- resourceManagerClient,
- nodeManagerClient
- ));
-
- leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
-
- final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID);
- final AkkaActorGateway resourceManagerGateway = new AkkaActorGateway(resourceManager, leaderSessionID);
-
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- Container container = (Container) invocation.getArguments()[0];
- resourceManagerGateway.tell(new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
- leader1Gateway);
- return null;
- }
- }).when(nodeManagerClient).startContainer(Matchers.any(Container.class), Matchers.any(ContainerLaunchContext.class));
-
- expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
-
- resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
-
- for (int i = 0; i < containerList.size(); i++) {
- expectMsgClass(deadline.timeLeft(), Acknowledge.class);
- }
-
- Future<Object> taskManagerRegisteredFuture = resourceManagerGateway.ask(new NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft());
-
- Await.ready(taskManagerRegisteredFuture, deadline.timeLeft());
-
- leaderRetrievalService.notifyListener(null, null);
-
- leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
-
- expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
-
- resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
-
- for (Container container: containerList) {
- resourceManagerGateway.tell(
- new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
- leader1Gateway);
- }
-
- for (int i = 0; i < containerList.size(); i++) {
- expectMsgClass(deadline.timeLeft(), Acknowledge.class);
- }
-
- Future<Object> numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.Instance, deadline.timeLeft());
-
- int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
-
- assertEquals(numInitialTaskManagers, numberOfRegisteredResources);
- } finally {
- if (resourceManager != null) {
- resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
- }
- }};
- }
-
- static class TestingContainer extends Container {
-
- private final String id;
- private final String host;
-
- TestingContainer(String id, String host) {
- this.id = id;
- this.host = host;
- }
-
- @Override
- public ContainerId getId() {
- ContainerId containerId = mock(ContainerId.class);
- when(containerId.toString()).thenReturn(id);
-
- return containerId;
- }
-
- @Override
- public void setId(ContainerId containerId) {
-
- }
-
- @Override
- public NodeId getNodeId() {
- NodeId nodeId = mock(NodeId.class);
- when(nodeId.getHost()).thenReturn(host);
-
- return nodeId;
- }
-
- @Override
- public void setNodeId(NodeId nodeId) {
-
- }
-
- @Override
- public String getNodeHttpAddress() {
- return null;
- }
-
- @Override
- public void setNodeHttpAddress(String s) {
-
- }
-
- @Override
- public Resource getResource() {
- return null;
- }
-
- @Override
- public void setResource(Resource resource) {
-
- }
-
- @Override
- public Priority getPriority() {
- return null;
- }
-
- @Override
- public void setPriority(Priority priority) {
-
- }
-
- @Override
- public Token getContainerToken() {
- return null;
- }
-
- @Override
- public void setContainerToken(Token token) {
-
- }
-
- @Override
- public int compareTo(Container o) {
- return 0;
- }
- }
-}
[2/2] flink git commit: [FLINK-5729] [examples] Add hostname option
to SocketWindowWordCount examples
Posted by se...@apache.org.
[FLINK-5729] [examples] Add hostname option to SocketWindowWordCount examples
This closes #3283
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30c5b771
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30c5b771
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30c5b771
Branch: refs/heads/master
Commit: 30c5b771a7943e981dd5f67131c932fdb204fbc2
Parents: 6bc6b22
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Tue Feb 7 15:52:26 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 13 20:51:50 2017 +0100
----------------------------------------------------------------------
.../examples/socket/SocketWindowWordCount.java | 17 ++++++++-------
.../examples/socket/SocketWindowWordCount.scala | 22 +++++++++++++-------
2 files changed, 24 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/30c5b771/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
index dd2e061..250c5b9 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
@@ -35,23 +35,26 @@ import org.apache.flink.util.Collector;
* <pre>
* nc -l 12345
* </pre>
- * and run this example with the port as an argument.
+ * and run this example with the hostname and the port as arguments.
*/
@SuppressWarnings("serial")
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
- // the port to connect to
+ // the host and the port to connect to
+ final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
+ hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
} catch (Exception e) {
- System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>', " +
- "where port is the address of the text server");
- System.err.println("To start a simple text server, run 'netcat -l <port>' and type the input text " +
- "into the command line");
+ System.err.println("No port specified. Please run 'SocketWindowWordCount " +
+ "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
+ "and port is the address of the text server");
+ System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
+ "type the input text into the command line");
return;
}
@@ -59,7 +62,7 @@ public class SocketWindowWordCount {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
- DataStream<String> text = env.socketTextStream("localhost", port, "\n");
+ DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
http://git-wip-us.apache.org/repos/asf/flink/blob/30c5b771/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
index 1761b84..d2afa4d 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
@@ -31,20 +31,26 @@ import org.apache.flink.streaming.api.windowing.time.Time
* <pre>
* nc -l 12345
* </pre>
- * and run this example with the port as an argument.
+ * and run this example with the hostname and the port as arguments.
*/
object SocketWindowWordCount {
/** Main program method */
def main(args: Array[String]) : Unit = {
-
- // the port to connect to
- val port: Int = try {
- ParameterTool.fromArgs(args).getInt("port")
+
+ // the host and the port to connect to
+ var hostname: String = "localhost"
+ var port: Int = 0
+
+ try {
+ val params = ParameterTool.fromArgs(args)
+ hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
+ port = params.getInt("port")
} catch {
case e: Exception => {
- System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>', " +
- "where port is the address of the text server")
+ System.err.println("No port specified. Please run 'SocketWindowWordCount " +
+ "--hostname <hostname> --port <port>', where hostname (localhost by default) and port " +
+ "is the address of the text server")
System.err.println("To start a simple text server, run 'netcat -l <port>' " +
"and type the input text into the command line")
return
@@ -55,7 +61,7 @@ object SocketWindowWordCount {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket
- val text = env.socketTextStream("localhost", port, '\n')
+ val text = env.socketTextStream(hostname, port, '\n')
// parse the data, group it, window it, and aggregate the counts
val windowCounts = text