You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/13 19:53:49 UTC

[1/2] flink git commit: [FLINK-5631] [yarn] Support downloading additional jars from non-HDFS paths.

Repository: flink
Updated Branches:
  refs/heads/master 6bc6b225e -> 186b12309


[FLINK-5631] [yarn] Support downloading additional jars from non-HDFS paths.

This closes #3202


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/186b1230
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/186b1230
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/186b1230

Branch: refs/heads/master
Commit: 186b12309b540f82a055be28f3f005dce4b8cf46
Parents: 30c5b77
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Jan 31 12:11:01 2017 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 13 20:51:50 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/yarn/Utils.java  | 223 +++++++++++++-
 .../flink/yarn/YarnApplicationMasterRunner.java | 236 +--------------
 .../apache/flink/yarn/YarnResourceManager.java  | 211 +------------
 .../java/org/apache/flink/yarn/UtilsTest.java   | 298 +++++++++++++++++++
 .../yarn/YarnApplicationMasterRunnerTest.java   |  93 ++++++
 .../yarn/YarnFlinkResourceManagerTest.java      | 298 -------------------
 6 files changed, 617 insertions(+), 742 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 94d4582..60f7204 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -23,11 +23,16 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +58,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+
 /**
  * Utility class that provides helper methods to work with Apache Hadoop YARN.
  */
@@ -107,7 +114,7 @@ public final class Utils {
 		addToEnvironment(
 			appMasterEnv,
 			Environment.CLASSPATH.name(),
-			appMasterEnv.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
+			appMasterEnv.get(ENV_FLINK_CLASSPATH));
 		String[] applicationClassPathEntries = conf.getStrings(
 			YarnConfiguration.YARN_APPLICATION_CLASSPATH,
 			YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH);
@@ -264,4 +271,218 @@ public final class Utils {
 		}
 		return result;
 	}
+
+	/**
+	 * Creates the launch context, which describes how to bring up a TaskExecutor / TaskManager process in
+	 * an allocated YARN container.
+	 *
+	 * <p>This code is extremely YARN specific and registers all the resources that the TaskExecutor
+	 * needs (such as JAR file, config file, ...) and all environment variables in a YARN
+	 * container launch context. The launch context then ensures that those resources will be
+	 * copied into the container's transient working directory.
+	 *
+	 * @param flinkConfig
+	 *		 The Flink configuration object.
+	 * @param yarnConfig
+	 *		 The YARN configuration object.
+	 * @param env
+	 *		 The environment variables.
+	 * @param tmParams
+	 *		 The TaskExecutor container memory parameters.
+	 * @param taskManagerConfig
+	 *		 The configuration for the TaskExecutors.
+	 * @param workingDirectory
+	 *		 The current application master container's working directory.
+	 * @param taskManagerMainClass
+	 *		 The class with the main method.
+	 * @param log
+	 *		 The logger.
+	 *
+	 * @return The launch context for the TaskManager processes.
+	 *
+	 * @throws Exception Thrown if the launch context could not be created, for example if
+	 *				   the resources could not be copied.
+	 */
+	static ContainerLaunchContext createTaskExecutorContext(
+		org.apache.flink.configuration.Configuration flinkConfig,
+		YarnConfiguration yarnConfig,
+		Map<String, String> env,
+		ContaineredTaskManagerParameters tmParams,
+		org.apache.flink.configuration.Configuration taskManagerConfig,
+		String workingDirectory,
+		Class<?> taskManagerMainClass,
+		Logger log) throws Exception {
+
+		// get and validate all relevant variables
+
+		String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH);
+		require(remoteFlinkJarPath != null, "Environment variable %s not set", YarnConfigKeys.FLINK_JAR_PATH);
+
+		String appId = env.get(YarnConfigKeys.ENV_APP_ID);
+		require(appId != null, "Environment variable %s not set", YarnConfigKeys.ENV_APP_ID);
+
+		String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+		require(clientHomeDir != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+
+		String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+		require(shipListString != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+
+		String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+		require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+		final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
+		log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
+
+		final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+		log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal);
+
+		final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
+		log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
+
+		final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
+		log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+		String classPathString = env.get(ENV_FLINK_CLASSPATH);
+		require(classPathString != null, "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
+
+		//register keytab
+		LocalResource keytabResource = null;
+		if(remoteKeytabPath != null) {
+			log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
+			keytabResource = Records.newRecord(LocalResource.class);
+			Path keytabPath = new Path(remoteKeytabPath);
+			FileSystem fs = keytabPath.getFileSystem(yarnConfig);
+			registerLocalResource(fs, keytabPath, keytabResource);
+		}
+
+		//To support Yarn Secure Integration Test Scenario
+		LocalResource yarnConfResource = null;
+		LocalResource krb5ConfResource = null;
+		boolean hasKrb5 = false;
+		if(remoteYarnConfPath != null && remoteKrb5Path != null) {
+			log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
+			yarnConfResource = Records.newRecord(LocalResource.class);
+			Path yarnConfPath = new Path(remoteYarnConfPath);
+			FileSystem fs = yarnConfPath.getFileSystem(yarnConfig);
+			registerLocalResource(fs, yarnConfPath, yarnConfResource);
+
+			log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
+			krb5ConfResource = Records.newRecord(LocalResource.class);
+			Path krb5ConfPath = new Path(remoteKrb5Path);
+			fs = krb5ConfPath.getFileSystem(yarnConfig);
+			registerLocalResource(fs, krb5ConfPath, krb5ConfResource);
+
+			hasKrb5 = true;
+		}
+
+		// register Flink Jar with remote HDFS
+		LocalResource flinkJar = Records.newRecord(LocalResource.class);
+		{
+			Path remoteJarPath = new Path(remoteFlinkJarPath);
+			FileSystem fs = remoteJarPath.getFileSystem(yarnConfig);
+			registerLocalResource(fs, remoteJarPath, flinkJar);
+		}
+
+		// register conf with local fs
+		LocalResource flinkConf = Records.newRecord(LocalResource.class);
+		{
+			// write the TaskManager configuration to a local file
+			final File taskManagerConfigFile =
+					new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
+			log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
+			BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
+
+			Path homeDirPath = new Path(clientHomeDir);
+			FileSystem fs = homeDirPath.getFileSystem(yarnConfig);
+			setupLocalResource(fs, appId,
+					new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
+
+			log.info("Prepared local resource for modified yaml: {}", flinkConf);
+		}
+
+		Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();
+		taskManagerLocalResources.put("flink.jar", flinkJar);
+		taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
+
+		//To support Yarn Secure Integration Test Scenario
+		if(yarnConfResource != null && krb5ConfResource != null) {
+			taskManagerLocalResources.put(YARN_SITE_FILE_NAME, yarnConfResource);
+			taskManagerLocalResources.put(KRB5_FILE_NAME, krb5ConfResource);
+		}
+
+		if(keytabResource != null) {
+			taskManagerLocalResources.put(KEYTAB_FILE_NAME, keytabResource);
+		}
+
+		// prepare additional files to be shipped
+		for (String pathStr : shipListString.split(",")) {
+			if (!pathStr.isEmpty()) {
+				LocalResource resource = Records.newRecord(LocalResource.class);
+				Path path = new Path(pathStr);
+				registerLocalResource(path.getFileSystem(yarnConfig), path, resource);
+				taskManagerLocalResources.put(path.getName(), resource);
+			}
+		}
+
+		// now that all resources are prepared, we can create the launch context
+
+		log.info("Creating container launch context for TaskManagers");
+
+		boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
+		boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
+
+		String launchCommand = BootstrapTools.getTaskManagerShellCommand(
+				flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+				hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
+
+		log.info("Starting TaskManagers with command: " + launchCommand);
+
+		ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+		ctx.setCommands(Collections.singletonList(launchCommand));
+		ctx.setLocalResources(taskManagerLocalResources);
+
+		Map<String, String> containerEnv = new HashMap<>();
+		containerEnv.putAll(tmParams.taskManagerEnv());
+
+		// add YARN classpath, etc to the container environment
+		containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
+		setupYarnClassPath(yarnConfig, containerEnv);
+
+		containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
+
+		if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
+			containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
+			containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
+		}
+
+		ctx.setEnvironment(containerEnv);
+
+		try (DataOutputBuffer dob = new DataOutputBuffer()) {
+			log.debug("Adding security tokens to Task Executor Container launch Context....");
+			UserGroupInformation user = UserGroupInformation.getCurrentUser();
+			Credentials credentials = user.getCredentials();
+			credentials.writeTokenStorageToStream(dob);
+			ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+			ctx.setTokens(securityTokens);
+		}
+		catch (Throwable t) {
+			log.error("Getting current user info failed when trying to launch the container", t);
+		}
+
+		return ctx;
+	}
+
+	/**
+	 * Validates a condition, throwing a RuntimeException if the condition is violated.
+	 *
+	 * @param condition The condition.
+	 * @param message The message for the runtime exception, with format variables as defined by
+	 *                {@link String#format(String, Object...)}.
+	 * @param values The format arguments.
+	 */
+	static void require(boolean condition, String message, Object... values) {
+		if (!condition) {
+			throw new RuntimeException(String.format(message, values));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 5cc51e4..9d5673c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -47,16 +47,10 @@ import org.apache.flink.runtime.webmonitor.WebMonitor;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.Records;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,19 +60,14 @@ import scala.Some;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Map;
-import java.util.HashMap;
-import java.util.UUID;
-import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.Utils.require;
 
 /**
  * This class is the executable entry point for the YARN application master.
@@ -329,7 +318,7 @@ public class YarnApplicationMasterRunner {
 					config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
 			LOG.debug("TaskManager configuration: {}", taskManagerConfig);
 
-			final ContainerLaunchContext taskManagerContext = createTaskManagerContext(
+			final ContainerLaunchContext taskManagerContext = Utils.createTaskExecutorContext(
 				config, yarnConfig, ENV,
 				taskManagerParameters, taskManagerConfig,
 				currDir, getTaskManagerClass(), LOG);
@@ -483,20 +472,6 @@ public class YarnApplicationMasterRunner {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Validates a condition, throwing a RuntimeException if the condition is violated.
-	 * 
-	 * @param condition The condition.
-	 * @param message The message for the runtime exception, with format variables as defined by
-	 *                {@link String#format(String, Object...)}.
-	 * @param values The format arguments.
-	 */
-	private static void require(boolean condition, String message, Object... values) {
-		if (!condition) {
-			throw new RuntimeException(String.format(message, values));
-		}
-	}
-
-	/**
 	 * 
 	 * @param baseDirectory
 	 * @param additional
@@ -549,211 +524,4 @@ public class YarnApplicationMasterRunner {
 
 		return configuration;
 	}
-
-	/**
-	 * Creates the launch context, which describes how to bring up a TaskManager process in
-	 * an allocated YARN container.
-	 * 
-	 * <p>This code is extremely YARN specific and registers all the resources that the TaskManager
-	 * needs (such as JAR file, config file, ...) and all environment variables in a YARN
-	 * container launch context. The launch context then ensures that those resources will be
-	 * copied into the containers transient working directory. 
-	 * 
-	 * <p>We do this work before we start the ResourceManager actor in order to fail early if
-	 * any of the operations here fail.
-	 * 
-	 * @param flinkConfig
-	 *         The Flink configuration object.
-	 * @param yarnConfig
-	 *         The YARN configuration object.
-	 * @param env
-	 *         The environment variables.
-	 * @param tmParams
-	 *         The TaskManager container memory parameters. 
-	 * @param taskManagerConfig
-	 *         The configuration for the TaskManagers.
-	 * @param workingDirectory
-	 *         The current application master container's working directory. 
-	 * @param taskManagerMainClass
-	 *         The class with the main method.
-	 * @param log
-	 *         The logger.
-	 * 
-	 * @return The launch context for the TaskManager processes.
-	 * 
-	 * @throws Exception Thrown if teh launch context could not be created, for example if
-	 *                   the resources could not be copied.
-	 */
-	public static ContainerLaunchContext createTaskManagerContext(
-			Configuration flinkConfig,
-			YarnConfiguration yarnConfig,
-			Map<String, String> env,
-			ContaineredTaskManagerParameters tmParams,
-			Configuration taskManagerConfig,
-			String workingDirectory,
-			Class<?> taskManagerMainClass,
-			Logger log) throws Exception {
-
-		log.info("Setting up resources for TaskManagers");
-
-		// get and validate all relevant variables
-
-		String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH);
-		require(remoteFlinkJarPath != null, "Environment variable %s not set", YarnConfigKeys.FLINK_JAR_PATH);
-
-		String appId = env.get(YarnConfigKeys.ENV_APP_ID);
-		require(appId != null, "Environment variable %s not set", YarnConfigKeys.ENV_APP_ID);
-
-		String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
-		require(clientHomeDir != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_HOME_DIR);
-
-		String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
-		require(shipListString != null, "Environment variable %s not set", YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
-
-		String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-		require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
-
-		final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
-		LOG.info("TM:remoteKeytabPath obtained {}", remoteKeytabPath);
-
-		final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-		LOG.info("TM:remoteKeytabPrincipal obtained {}", remoteKeytabPrincipal);
-
-		final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
-		LOG.info("TM:remoteYarnConfPath obtained {}", remoteYarnConfPath);
-
-		final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
-		LOG.info("TM:remotekrb5Path obtained {}", remoteKrb5Path);
-
-		String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
-		require(classPathString != null, "Environment variable %s not set", YarnConfigKeys.ENV_FLINK_CLASSPATH);
-
-		// obtain a handle to the file system used by YARN
-		final org.apache.hadoop.fs.FileSystem yarnFileSystem;
-		try {
-			yarnFileSystem = org.apache.hadoop.fs.FileSystem.get(yarnConfig);
-		} catch (IOException e) {
-			throw new Exception("Could not access YARN's default file system", e);
-		}
-
-		//register keytab
-		LocalResource keytabResource = null;
-		if(remoteKeytabPath != null) {
-			LOG.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
-			keytabResource = Records.newRecord(LocalResource.class);
-			Path keytabPath = new Path(remoteKeytabPath);
-			Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource);
-		}
-
-		//To support Yarn Secure Integration Test Scenario
-		LocalResource yarnConfResource = null;
-		LocalResource krb5ConfResource = null;
-		boolean hasKrb5 = false;
-		if(remoteYarnConfPath != null && remoteKrb5Path != null) {
-			LOG.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
-			yarnConfResource = Records.newRecord(LocalResource.class);
-			Path yarnConfPath = new Path(remoteYarnConfPath);
-			Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource);
-
-			LOG.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
-			krb5ConfResource = Records.newRecord(LocalResource.class);
-			Path krb5ConfPath = new Path(remoteKrb5Path);
-			Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource);
-
-			hasKrb5 = true;
-		}
-
-		// register Flink Jar with remote HDFS
-		LocalResource flinkJar = Records.newRecord(LocalResource.class);
-		{
-			Path remoteJarPath = new Path(remoteFlinkJarPath);
-			Utils.registerLocalResource(yarnFileSystem, remoteJarPath, flinkJar);
-		}
-
-		// register conf with local fs
-		LocalResource flinkConf = Records.newRecord(LocalResource.class);
-		{
-			// write the TaskManager configuration to a local file
-			final File taskManagerConfigFile = 
-				new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
-			LOG.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
-			BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
-
-			Utils.setupLocalResource(yarnFileSystem, appId, 
-				new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
-
-			log.info("Prepared local resource for modified yaml: {}", flinkConf);
-		}
-
-		Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();
-		taskManagerLocalResources.put("flink.jar", flinkJar);
-		taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
-
-		//To support Yarn Secure Integration Test Scenario
-		if(yarnConfResource != null && krb5ConfResource != null) {
-			taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
-			taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
-		}
-
-		if(keytabResource != null) {
-			taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
-		}
-
-		// prepare additional files to be shipped
-		for (String pathStr : shipListString.split(",")) {
-			if (!pathStr.isEmpty()) {
-				LocalResource resource = Records.newRecord(LocalResource.class);
-				Path path = new Path(pathStr);
-				Utils.registerLocalResource(yarnFileSystem, path, resource);
-				taskManagerLocalResources.put(path.getName(), resource);
-			}
-		}
-
-		// now that all resources are prepared, we can create the launch context
-
-		log.info("Creating container launch context for TaskManagers");
-
-		boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
-		boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
-
-		String launchCommand = BootstrapTools.getTaskManagerShellCommand(
-			flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
-			hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
-
-		log.info("Starting TaskManagers with command: " + launchCommand);
-
-		ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
-		ctx.setCommands(Collections.singletonList(launchCommand));
-		ctx.setLocalResources(taskManagerLocalResources);
-
-		Map<String, String> containerEnv = new HashMap<>();
-		containerEnv.putAll(tmParams.taskManagerEnv());
-
-		// add YARN classpath, etc to the container environment
-		containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
-		Utils.setupYarnClassPath(yarnConfig, containerEnv);
-
-		containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
-
-		if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
-			containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
-			containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
-		}
-
-		ctx.setEnvironment(containerEnv);
-
-		try (DataOutputBuffer dob = new DataOutputBuffer()) {
-			LOG.debug("Adding security tokens to Task Manager Container launch Context....");
-			UserGroupInformation user = UserGroupInformation.getCurrentUser();
-			Credentials credentials = user.getCredentials();
-			credentials.writeTokenStorageToStream(dob);
-			ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-			ctx.setTokens(securityTokens);
-		}
-		catch (Throwable t) {
-			log.error("Getting current user info failed when trying to launch the container", t);
-		}
-
-		return ctx;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 9b9ea39..ab96441 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -35,10 +35,6 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerExcept
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -47,29 +43,20 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 import org.apache.flink.util.ExceptionUtils;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Collections;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
-
 /**
  * The yarn implementation of the resource manager. Used when the system is started
  * via the resource framework YARN.
@@ -357,7 +344,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 				flinkConfig, "", 0, 1, teRegistrationTimeout);
 		LOG.debug("TaskManager configuration: {}", taskManagerConfig);
 
-		ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorContext(
+		ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
 				flinkConfig, yarnConfig, ENV,
 				taskManagerParameters, taskManagerConfig,
 				currDir, YarnTaskExecutorRunner.class, LOG);
@@ -371,204 +358,10 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 	}
 
 
-	/**
-	 * Creates the launch context, which describes how to bring up a TaskExecutor process in
-	 * an allocated YARN container.
-	 *
-	 * <p>This code is extremely YARN specific and registers all the resources that the TaskExecutor
-	 * needs (such as JAR file, config file, ...) and all environment variables in a YARN
-	 * container launch context. The launch context then ensures that those resources will be
-	 * copied into the containers transient working directory.
-	 *
-	 * @param flinkConfig
-	 *		 The Flink configuration object.
-	 * @param yarnConfig
-	 *		 The YARN configuration object.
-	 * @param env
-	 *		 The environment variables.
-	 * @param tmParams
-	 *		 The TaskExecutor container memory parameters.
-	 * @param taskManagerConfig
-	 *		 The configuration for the TaskExecutors.
-	 * @param workingDirectory
-	 *		 The current application master container's working directory.
-	 * @param taskManagerMainClass
-	 *		 The class with the main method.
-	 * @param log
-	 *		 The logger.
-	 *
-	 * @return The launch context for the TaskManager processes.
-	 *
-	 * @throws Exception Thrown if teh launch context could not be created, for example if
-	 *				   the resources could not be copied.
-	 */
-	private static ContainerLaunchContext createTaskExecutorContext(
-			Configuration flinkConfig,
-			YarnConfiguration yarnConfig,
-			Map<String, String> env,
-			ContaineredTaskManagerParameters tmParams,
-			Configuration taskManagerConfig,
-			String workingDirectory,
-			Class<?> taskManagerMainClass,
-			Logger log) throws Exception {
-
-		// get and validate all relevant variables
-
-		String remoteFlinkJarPath = env.get(YarnConfigKeys.FLINK_JAR_PATH);
-		
-		String appId = env.get(YarnConfigKeys.ENV_APP_ID);
-
-		String clientHomeDir = env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
-
-		String shipListString = env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
-
-		String yarnClientUsername = env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-
-		final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
-		log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
-
-		final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-		log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal);
-
-		final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
-		log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
-
-		final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
-		log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
-
-		String classPathString = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
-
-		// obtain a handle to the file system used by YARN
-		final org.apache.hadoop.fs.FileSystem yarnFileSystem;
-		try {
-			yarnFileSystem = org.apache.hadoop.fs.FileSystem.get(yarnConfig);
-		} catch (IOException e) {
-			throw new Exception("Could not access YARN's default file system", e);
-		}
-
-		//register keytab
-		LocalResource keytabResource = null;
-		if(remoteKeytabPath != null) {
-			log.info("Adding keytab {} to the AM container local resource bucket", remoteKeytabPath);
-			keytabResource = Records.newRecord(LocalResource.class);
-			Path keytabPath = new Path(remoteKeytabPath);
-			Utils.registerLocalResource(yarnFileSystem, keytabPath, keytabResource);
-		}
-
-		//To support Yarn Secure Integration Test Scenario
-		LocalResource yarnConfResource = null;
-		LocalResource krb5ConfResource = null;
-		boolean hasKrb5 = false;
-		if(remoteYarnConfPath != null && remoteKrb5Path != null) {
-			log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
-			yarnConfResource = Records.newRecord(LocalResource.class);
-			Path yarnConfPath = new Path(remoteYarnConfPath);
-			Utils.registerLocalResource(yarnFileSystem, yarnConfPath, yarnConfResource);
-
-			log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
-			krb5ConfResource = Records.newRecord(LocalResource.class);
-			Path krb5ConfPath = new Path(remoteKrb5Path);
-			Utils.registerLocalResource(yarnFileSystem, krb5ConfPath, krb5ConfResource);
-
-			hasKrb5 = true;
-		}
-
-		// register Flink Jar with remote HDFS
-		LocalResource flinkJar = Records.newRecord(LocalResource.class);
-		{
-			Path remoteJarPath = new Path(remoteFlinkJarPath);
-			Utils.registerLocalResource(yarnFileSystem, remoteJarPath, flinkJar);
-		}
-
-		// register conf with local fs
-		LocalResource flinkConf = Records.newRecord(LocalResource.class);
-		{
-			// write the TaskManager configuration to a local file
-			final File taskManagerConfigFile =
-					new File(workingDirectory, UUID.randomUUID() + "-taskmanager-conf.yaml");
-			log.debug("Writing TaskManager configuration to {}", taskManagerConfigFile.getAbsolutePath());
-			BootstrapTools.writeConfiguration(taskManagerConfig, taskManagerConfigFile);
-
-			Utils.setupLocalResource(yarnFileSystem, appId,
-					new Path(taskManagerConfigFile.toURI()), flinkConf, new Path(clientHomeDir));
 
-			log.info("Prepared local resource for modified yaml: {}", flinkConf);
-		}
-
-		Map<String, LocalResource> taskManagerLocalResources = new HashMap<>();
-		taskManagerLocalResources.put("flink.jar", flinkJar);
-		taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
-
-		//To support Yarn Secure Integration Test Scenario
-		if(yarnConfResource != null && krb5ConfResource != null) {
-			taskManagerLocalResources.put(Utils.YARN_SITE_FILE_NAME, yarnConfResource);
-			taskManagerLocalResources.put(Utils.KRB5_FILE_NAME, krb5ConfResource);
-		}
-
-		if(keytabResource != null) {
-			taskManagerLocalResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
-		}
-
-		// prepare additional files to be shipped
-		for (String pathStr : shipListString.split(",")) {
-			if (!pathStr.isEmpty()) {
-				LocalResource resource = Records.newRecord(LocalResource.class);
-				Path path = new Path(pathStr);
-				Utils.registerLocalResource(yarnFileSystem, path, resource);
-				taskManagerLocalResources.put(path.getName(), resource);
-			}
-		}
-
-		// now that all resources are prepared, we can create the launch context
-
-		log.info("Creating container launch context for TaskManagers");
-
-		boolean hasLogback = new File(workingDirectory, "logback.xml").exists();
-		boolean hasLog4j = new File(workingDirectory, "log4j.properties").exists();
-
-		String launchCommand = BootstrapTools.getTaskManagerShellCommand(
-				flinkConfig, tmParams, ".", ApplicationConstants.LOG_DIR_EXPANSION_VAR,
-				hasLogback, hasLog4j, hasKrb5, taskManagerMainClass);
-
-		log.info("Starting TaskManagers with command: " + launchCommand);
-
-		ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
-		ctx.setCommands(Collections.singletonList(launchCommand));
-		ctx.setLocalResources(taskManagerLocalResources);
-
-		Map<String, String> containerEnv = new HashMap<>();
-		containerEnv.putAll(tmParams.taskManagerEnv());
-
-		// add YARN classpath, etc to the container environment
-		containerEnv.put(ENV_FLINK_CLASSPATH, classPathString);
-		Utils.setupYarnClassPath(yarnConfig, containerEnv);
-
-		containerEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
-
-		if(remoteKeytabPath != null && remoteKeytabPrincipal != null) {
-			containerEnv.put(YarnConfigKeys.KEYTAB_PATH, remoteKeytabPath);
-			containerEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, remoteKeytabPrincipal);
-		}
-
-		ctx.setEnvironment(containerEnv);
-
-		try (DataOutputBuffer dob = new DataOutputBuffer()) {
-			log.debug("Adding security tokens to Task Executor Container launch Context....");
-			UserGroupInformation user = UserGroupInformation.getCurrentUser();
-			Credentials credentials = user.getCredentials();
-			credentials.writeTokenStorageToStream(dob);
-			ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-			ctx.setTokens(securityTokens);
-		}
-		catch (Throwable t) {
-			log.error("Getting current user info failed when trying to launch the container", t);
-		}
-
-		return ctx;
-	}
 	
 	/**
-	 * Generate priority by given resouce profile. 
+	 * Generate priority by given resource profile.
 	 * Priority is only used for distinguishing request of different resource.
 	 * @param resourceProfile The resource profile of a request
 	 * @return The priority of this resource profile.

http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
new file mode 100644
index 0000000..8534ba8
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
+import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class UtilsTest extends TestLogger {
+
+	private static ActorSystem system;
+
+	@BeforeClass
+	public static void setup() {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void teardown() {
+		JavaTestKit.shutdownActorSystem(system);
+	}
+
+	@Test
+	public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception {
+		new JavaTestKit(system) {{
+
+			final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow();
+
+			Configuration flinkConfig = new Configuration();
+			YarnConfiguration yarnConfig = new YarnConfiguration();
+			TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
+			String applicationMasterHostName = "localhost";
+			String webInterfaceURL = "foobar";
+			ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters(
+				1l, 1l, 1l, 1, new HashMap<String, String>());
+			ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class);
+			int yarnHeartbeatIntervalMillis = 1000;
+			int maxFailedContainers = 10;
+			int numInitialTaskManagers = 5;
+			final YarnResourceManagerCallbackHandler callbackHandler = new YarnResourceManagerCallbackHandler();
+			AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient = mock(AMRMClientAsync.class);
+			NMClient nodeManagerClient = mock(NMClient.class);
+			UUID leaderSessionID = UUID.randomUUID();
+
+			final List<Container> containerList = new ArrayList<>();
+
+			for (int i = 0; i < numInitialTaskManagers; i++) {
+				containerList.add(new TestingContainer("container_" + i, "localhost"));
+			}
+
+			doAnswer(new Answer() {
+				int counter = 0;
+				@Override
+				public Object answer(InvocationOnMock invocation) throws Throwable {
+					if (counter < containerList.size()) {
+						callbackHandler.onContainersAllocated(
+							Collections.singletonList(
+								containerList.get(counter++)
+							));
+					}
+					return null;
+				}
+			}).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
+
+			ActorRef resourceManager = null;
+			ActorRef leader1;
+
+			try {
+				leader1 = system.actorOf(
+					Props.create(
+						TestingUtils.ForwardingActor.class,
+						getRef(),
+						Option.apply(leaderSessionID)
+					));
+
+				resourceManager = system.actorOf(
+					Props.create(
+						TestingYarnFlinkResourceManager.class,
+						flinkConfig,
+						yarnConfig,
+						leaderRetrievalService,
+						applicationMasterHostName,
+						webInterfaceURL,
+						taskManagerParameters,
+						taskManagerLaunchContext,
+						yarnHeartbeatIntervalMillis,
+						maxFailedContainers,
+						numInitialTaskManagers,
+						callbackHandler,
+						resourceManagerClient,
+						nodeManagerClient
+					));
+
+				leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
+
+				final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID);
+				final AkkaActorGateway resourceManagerGateway = new AkkaActorGateway(resourceManager, leaderSessionID);
+
+				doAnswer(new Answer() {
+					@Override
+					public Object answer(InvocationOnMock invocation) throws Throwable {
+						Container container = (Container) invocation.getArguments()[0];
+						resourceManagerGateway.tell(new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
+							leader1Gateway);
+						return null;
+					}
+				}).when(nodeManagerClient).startContainer(Matchers.any(Container.class), Matchers.any(ContainerLaunchContext.class));
+
+				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
+
+				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
+
+				for (int i = 0; i < containerList.size(); i++) {
+					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
+				}
+
+				Future<Object> taskManagerRegisteredFuture = resourceManagerGateway.ask(new NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft());
+
+				Await.ready(taskManagerRegisteredFuture, deadline.timeLeft());
+
+				leaderRetrievalService.notifyListener(null, null);
+
+				leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
+
+				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
+
+				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
+
+				for (Container container: containerList) {
+					resourceManagerGateway.tell(
+						new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
+						leader1Gateway);
+				}
+
+				for (int i = 0; i < containerList.size(); i++) {
+					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
+				}
+
+				Future<Object> numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.Instance, deadline.timeLeft());
+
+				int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
+
+				assertEquals(numInitialTaskManagers, numberOfRegisteredResources);
+			} finally {
+				if (resourceManager != null) {
+					resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+				}
+			}
+		}};
+	}
+
+	static class TestingContainer extends Container {
+
+		private final String id;
+		private final String host;
+
+		TestingContainer(String id, String host) {
+			this.id = id;
+			this.host = host;
+		}
+
+		@Override
+		public ContainerId getId() {
+			ContainerId containerId = mock(ContainerId.class);
+			when(containerId.toString()).thenReturn(id);
+
+			return containerId;
+		}
+
+		@Override
+		public void setId(ContainerId containerId) {
+
+		}
+
+		@Override
+		public NodeId getNodeId() {
+			NodeId nodeId = mock(NodeId.class);
+			when(nodeId.getHost()).thenReturn(host);
+
+			return nodeId;
+		}
+
+		@Override
+		public void setNodeId(NodeId nodeId) {
+
+		}
+
+		@Override
+		public String getNodeHttpAddress() {
+			return null;
+		}
+
+		@Override
+		public void setNodeHttpAddress(String s) {
+
+		}
+
+		@Override
+		public Resource getResource() {
+			return null;
+		}
+
+		@Override
+		public void setResource(Resource resource) {
+
+		}
+
+		@Override
+		public Priority getPriority() {
+			return null;
+		}
+
+		@Override
+		public void setPriority(Priority priority) {
+
+		}
+
+		@Override
+		public Token getContainerToken() {
+			return null;
+		}
+
+		@Override
+		public void setContainerToken(Token token) {
+
+		}
+
+		@Override
+		public int compareTo(Container o) {
+			return 0;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
new file mode 100644
index 0000000..f874896
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnApplicationMasterRunnerTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+import static org.apache.flink.yarn.YarnConfigKeys.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+public class YarnApplicationMasterRunnerTest {
+	private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunnerTest.class);
+
+	@Rule
+	public TemporaryFolder folder = new TemporaryFolder();
+
+	@Test
+	public void testCreateTaskExecutorContext() throws Exception {
+		File root = folder.getRoot();
+		File home = new File(root, "home");
+		boolean created = home.mkdir();
+		assertTrue(created);
+
+		Answer<?> getDefault = new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				return invocationOnMock.getArguments()[1];
+			}
+		};
+		Configuration flinkConf = new Configuration();
+		YarnConfiguration yarnConf = mock(YarnConfiguration.class);
+		doAnswer(getDefault).when(yarnConf).get(anyString(), anyString());
+		doAnswer(getDefault).when(yarnConf).getInt(anyString(), anyInt());
+		doAnswer(new Answer() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				return new String[] {(String) invocationOnMock.getArguments()[1]};
+			}
+		}).when(yarnConf).getStrings(anyString(), Mockito.<String> anyVararg());
+
+		Map<String, String> env = ImmutableMap. <String, String> builder()
+			.put(ENV_APP_ID, "foo")
+			.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath())
+			.put(ENV_CLIENT_SHIP_FILES, "")
+			.put(ENV_FLINK_CLASSPATH, "")
+			.put(ENV_HADOOP_USER_NAME, "foo")
+			.put(FLINK_JAR_PATH, root.toURI().toString())
+			.build();
+		ContaineredTaskManagerParameters tmParams = mock(ContaineredTaskManagerParameters.class);
+		Configuration taskManagerConf = new Configuration();
+
+		String workingDirectory = root.getAbsolutePath();
+		Class<?> taskManagerMainClass = YarnApplicationMasterRunnerTest.class;
+		ContainerLaunchContext ctx = Utils.createTaskExecutorContext(flinkConf, yarnConf, env, tmParams,
+			taskManagerConf, workingDirectory, taskManagerMainClass, LOG);
+		assertEquals("file", ctx.getLocalResources().get("flink.jar").getResource().getScheme());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/186b1230/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
deleted file mode 100644
index a3ff6c4..0000000
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
-import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
-import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class YarnFlinkResourceManagerTest extends TestLogger {
-
-	private static ActorSystem system;
-
-	@BeforeClass
-	public static void setup() {
-		system = AkkaUtils.createLocalActorSystem(new Configuration());
-	}
-
-	@AfterClass
-	public static void teardown() {
-		JavaTestKit.shutdownActorSystem(system);
-	}
-
-	@Test
-	public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception {
-		new JavaTestKit(system) {{
-
-			final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow();
-
-			Configuration flinkConfig = new Configuration();
-			YarnConfiguration yarnConfig = new YarnConfiguration();
-			TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService();
-			String applicationMasterHostName = "localhost";
-			String webInterfaceURL = "foobar";
-			ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters(
-				1l, 1l, 1l, 1, new HashMap<String, String>());
-			ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class);
-			int yarnHeartbeatIntervalMillis = 1000;
-			int maxFailedContainers = 10;
-			int numInitialTaskManagers = 5;
-			final YarnResourceManagerCallbackHandler callbackHandler = new YarnResourceManagerCallbackHandler();
-			AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient = mock(AMRMClientAsync.class);
-			NMClient nodeManagerClient = mock(NMClient.class);
-			UUID leaderSessionID = UUID.randomUUID();
-
-			final List<Container> containerList = new ArrayList<>();
-
-			for (int i = 0; i < numInitialTaskManagers; i++) {
-				containerList.add(new TestingContainer("container_" + i, "localhost"));
-			}
-
-			doAnswer(new Answer() {
-				int counter = 0;
-				@Override
-				public Object answer(InvocationOnMock invocation) throws Throwable {
-					if (counter < containerList.size()) {
-						callbackHandler.onContainersAllocated(
-							Collections.singletonList(
-								containerList.get(counter++)
-							));
-					}
-					return null;
-				}
-			}).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class));
-
-			ActorRef resourceManager = null;
-			ActorRef leader1;
-
-			try {
-				leader1 = system.actorOf(
-					Props.create(
-						TestingUtils.ForwardingActor.class,
-						getRef(),
-						Option.apply(leaderSessionID)
-					));
-
-				resourceManager = system.actorOf(
-					Props.create(
-						TestingYarnFlinkResourceManager.class,
-						flinkConfig,
-						yarnConfig,
-						leaderRetrievalService,
-						applicationMasterHostName,
-						webInterfaceURL,
-						taskManagerParameters,
-						taskManagerLaunchContext,
-						yarnHeartbeatIntervalMillis,
-						maxFailedContainers,
-						numInitialTaskManagers,
-						callbackHandler,
-						resourceManagerClient,
-						nodeManagerClient
-					));
-
-				leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
-
-				final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID);
-				final AkkaActorGateway resourceManagerGateway = new AkkaActorGateway(resourceManager, leaderSessionID);
-
-				doAnswer(new Answer() {
-					@Override
-					public Object answer(InvocationOnMock invocation) throws Throwable {
-						Container container = (Container) invocation.getArguments()[0];
-						resourceManagerGateway.tell(new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
-							leader1Gateway);
-						return null;
-					}
-				}).when(nodeManagerClient).startContainer(Matchers.any(Container.class), Matchers.any(ContainerLaunchContext.class));
-
-				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
-
-				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
-
-				for (int i = 0; i < containerList.size(); i++) {
-					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
-				}
-
-				Future<Object> taskManagerRegisteredFuture = resourceManagerGateway.ask(new NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft());
-
-				Await.ready(taskManagerRegisteredFuture, deadline.timeLeft());
-
-				leaderRetrievalService.notifyListener(null, null);
-
-				leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID);
-
-				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
-
-				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
-
-				for (Container container: containerList) {
-					resourceManagerGateway.tell(
-						new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)),
-						leader1Gateway);
-				}
-
-				for (int i = 0; i < containerList.size(); i++) {
-					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
-				}
-
-				Future<Object> numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.Instance, deadline.timeLeft());
-
-				int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
-
-				assertEquals(numInitialTaskManagers, numberOfRegisteredResources);
-			} finally {
-				if (resourceManager != null) {
-					resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
-				}
-			}
-		}};
-	}
-
-	static class TestingContainer extends Container {
-
-		private final String id;
-		private final String host;
-
-		TestingContainer(String id, String host) {
-			this.id = id;
-			this.host = host;
-		}
-
-		@Override
-		public ContainerId getId() {
-			ContainerId containerId = mock(ContainerId.class);
-			when(containerId.toString()).thenReturn(id);
-
-			return containerId;
-		}
-
-		@Override
-		public void setId(ContainerId containerId) {
-
-		}
-
-		@Override
-		public NodeId getNodeId() {
-			NodeId nodeId = mock(NodeId.class);
-			when(nodeId.getHost()).thenReturn(host);
-
-			return nodeId;
-		}
-
-		@Override
-		public void setNodeId(NodeId nodeId) {
-
-		}
-
-		@Override
-		public String getNodeHttpAddress() {
-			return null;
-		}
-
-		@Override
-		public void setNodeHttpAddress(String s) {
-
-		}
-
-		@Override
-		public Resource getResource() {
-			return null;
-		}
-
-		@Override
-		public void setResource(Resource resource) {
-
-		}
-
-		@Override
-		public Priority getPriority() {
-			return null;
-		}
-
-		@Override
-		public void setPriority(Priority priority) {
-
-		}
-
-		@Override
-		public Token getContainerToken() {
-			return null;
-		}
-
-		@Override
-		public void setContainerToken(Token token) {
-
-		}
-
-		@Override
-		public int compareTo(Container o) {
-			return 0;
-		}
-	}
-}


[2/2] flink git commit: [FLINK-5729] [examples] Add hostname option to SocketWindowWordCount examples

Posted by se...@apache.org.
[FLINK-5729] [examples] Add hostname option to SocketWindowWordCount examples

This closes #3283


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30c5b771
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30c5b771
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30c5b771

Branch: refs/heads/master
Commit: 30c5b771a7943e981dd5f67131c932fdb204fbc2
Parents: 6bc6b22
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Tue Feb 7 15:52:26 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 13 20:51:50 2017 +0100

----------------------------------------------------------------------
 .../examples/socket/SocketWindowWordCount.java  | 17 ++++++++-------
 .../examples/socket/SocketWindowWordCount.scala | 22 +++++++++++++-------
 2 files changed, 24 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/30c5b771/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
index dd2e061..250c5b9 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
@@ -35,23 +35,26 @@ import org.apache.flink.util.Collector;
  * <pre>
  * nc -l 12345
  * </pre>
- * and run this example with the port as an argument.
+ * and run this example with the hostname and the port as arguments.
  */
 @SuppressWarnings("serial")
 public class SocketWindowWordCount {
 
 	public static void main(String[] args) throws Exception {
 
-		// the port to connect to
+		// the host and the port to connect to
+		final String hostname;
 		final int port;
 		try {
 			final ParameterTool params = ParameterTool.fromArgs(args);
+			hostname = params.has("hostname") ? params.get("hostname") : "localhost";
 			port = params.getInt("port");
 		} catch (Exception e) {
-			System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>', " +
-					"where port is the address of the text server");
-			System.err.println("To start a simple text server, run 'netcat -l <port>' and type the input text " +
-					"into the command line");
+			System.err.println("No port specified. Please run 'SocketWindowWordCount " +
+				"--hostname <hostname> --port <port>', where hostname (localhost by default) " +
+				"and port is the address of the text server");
+			System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
+				"type the input text into the command line");
 			return;
 		}
 
@@ -59,7 +62,7 @@ public class SocketWindowWordCount {
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		// get input data by connecting to the socket
-		DataStream<String> text = env.socketTextStream("localhost", port, "\n");
+		DataStream<String> text = env.socketTextStream(hostname, port, "\n");
 
 		// parse the data, group it, window it, and aggregate the counts
 		DataStream<WordWithCount> windowCounts = text

http://git-wip-us.apache.org/repos/asf/flink/blob/30c5b771/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
index 1761b84..d2afa4d 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
@@ -31,20 +31,26 @@ import org.apache.flink.streaming.api.windowing.time.Time
  * <pre>
  * nc -l 12345
  * </pre>
- * and run this example with the port as an argument.
+ * and run this example with the hostname and the port as arguments.
  */
 object SocketWindowWordCount {
 
   /** Main program method */
   def main(args: Array[String]) : Unit = {
-    
-    // the port to connect to
-    val port: Int = try {
-      ParameterTool.fromArgs(args).getInt("port")
+
+    // the host and the port to connect to
+    var hostname: String = "localhost"
+    var port: Int = 0
+
+    try {
+      val params = ParameterTool.fromArgs(args)
+      hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
+      port = params.getInt("port")
     } catch {
       case e: Exception => {
-        System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>', " +
-          "where port is the address of the text server")
+        System.err.println("No port specified. Please run 'SocketWindowWordCount " +
+          "--hostname <hostname> --port <port>', where hostname (localhost by default) and port " +
+          "is the address of the text server")
         System.err.println("To start a simple text server, run 'netcat -l <port>' " +
           "and type the input text into the command line")
         return
@@ -55,7 +61,7 @@ object SocketWindowWordCount {
     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     
     // get input data by connecting to the socket
-    val text = env.socketTextStream("localhost", port, '\n')
+    val text = env.socketTextStream(hostname, port, '\n')
 
     // parse the data, group it, window it, and aggregate the counts 
     val windowCounts = text