You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/20 11:15:57 UTC

[2/3] git commit: [YARN] properly set diagnostics messages on failures

[YARN] properly set diagnostics messages on failures


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ae32c187
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ae32c187
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ae32c187

Branch: refs/heads/master
Commit: ae32c18769347022e456f93e4a84d907a2a465bd
Parents: 8594905
Author: Robert Metzger <me...@web.de>
Authored: Mon Jun 16 17:56:59 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Aug 20 09:36:24 2014 +0200

----------------------------------------------------------------------
 docs/config.md                                  |   5 +
 .../apache/flink/yarn/ApplicationMaster.java    | 323 -----------
 .../main/java/org/apache/flink/yarn/Client.java | 268 ++++++---
 .../apache/flink/yarn/ClientMasterControl.java  | 128 +++++
 .../main/java/org/apache/flink/yarn/Utils.java  |   2 +-
 .../flink/yarn/YarnTaskManagerRunner.java       |  68 ---
 .../flink/yarn/appMaster/ApplicationMaster.java | 554 +++++++++++++++++++
 .../yarn/appMaster/YarnTaskManagerRunner.java   |  68 +++
 .../flink/yarn/rpc/ApplicationMasterStatus.java |  93 ++++
 .../yarn/rpc/YARNClientMasterProtocol.java      |  63 +++
 .../client/minicluster/NepheleMiniCluster.java  |   2 +-
 .../flink/configuration/ConfigConstants.java    |   9 +
 .../instance/DefaultInstanceManager.java        |   2 +-
 .../flink/runtime/instance/InstanceManager.java |   4 +-
 .../flink/runtime/jobmanager/JobManager.java    |   4 +-
 .../jobmanager/web/JobmanagerInfoServlet.java   |   2 +-
 .../jobmanager/web/LogfileInfoServlet.java      |  73 ++-
 .../runtime/jobmanager/web/WebInfoServer.java   |  51 +-
 .../scheduler/TestInstanceManager.java          |   2 +-
 tools/.gitignore                                |   1 +
 20 files changed, 1175 insertions(+), 547 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/docs/config.md
----------------------------------------------------------------------
diff --git a/docs/config.md b/docs/config.md
index ddc579b..4a02b0f 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -249,3 +249,8 @@ sample that the compiler takes for delimited inputs. If the length of a single
 sample exceeds this value (possible because of misconfiguration of the parser),
 the sampling aborts. This value can be overridden for a specific input with the
 input format's parameters (DEFAULT: 2097152 (= 2 MiBytes)).
+
+# YARN
+
+- `yarn.am.rpc.port`: The port that is being opened by the Application Master (AM) to 
+let the YARN client connect for an RPC service. (DEFAULT: Port 10245)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
deleted file mode 100644
index 40635dc..0000000
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-
-package org.apache.flink.yarn;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Writer;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.util.Records;
-
-import com.google.common.base.Preconditions;
-
-public class ApplicationMaster {
-
-	private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
-	
-	private void run() throws Exception  {
-		//Utils.logFilesInCurrentDirectory(LOG);
-		// Initialize clients to ResourceManager and NodeManagers
-		Configuration conf = Utils.initializeYarnConfiguration();
-		FileSystem fs = FileSystem.get(conf);
-		Map<String, String> envs = System.getenv();
-		final String currDir = envs.get(Environment.PWD.key());
-		final String logDirs =  envs.get(Environment.LOG_DIRS.key());
-		final String ownHostname = envs.get(Environment.NM_HOST.key());
-		final String appId = envs.get(Client.ENV_APP_ID);
-		final String clientHomeDir = envs.get(Client.ENV_CLIENT_HOME_DIR);
-		final String applicationMasterHost = envs.get(Environment.NM_HOST.key());
-		final String remoteFlinkJarPath = envs.get(Client.FLINK_JAR_PATH);
-		final String shipListString = envs.get(Client.ENV_CLIENT_SHIP_FILES);
-		final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
-		final int taskManagerCount = Integer.valueOf(envs.get(Client.ENV_TM_COUNT));
-		final int memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY));
-		final int coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES));
-		
-		int heapLimit = Utils.calculateHeapSize(memoryPerTaskManager);
-		
-		if(currDir == null) {
-			throw new RuntimeException("Current directory unknown");
-		}
-		if(ownHostname == null) {
-			throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
-		}
-		LOG.info("Working directory "+currDir);
-		
-		// load Flink configuration.
-		Utils.getFlinkConfiguration(currDir);
-		
-		final String localWebInterfaceDir = currDir+"/resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME;
-		
-		// Update yaml conf -> set jobManager address to this machine's address.
-		FileInputStream fis = new FileInputStream(currDir+"/flink-conf.yaml");
-		BufferedReader br = new BufferedReader(new InputStreamReader(fis));
-		Writer output = new BufferedWriter(new FileWriter(currDir+"/flink-conf-modified.yaml"));
-		String line ;
-		while ( (line = br.readLine()) != null) {
-			if(line.contains(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)) {
-				output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
-			} else if(line.contains(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY)) {
-				output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+"\n");
-			} else {
-				output.append(line+"\n");
-			}
-		}
-		// just to make sure.
-		output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
-		output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+localWebInterfaceDir+"\n");
-		output.append(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY+": "+logDirs+"\n");
-		output.close();
-		br.close();
-		File newConf = new File(currDir+"/flink-conf-modified.yaml");
-		if(!newConf.exists()) {
-			LOG.warn("modified yaml does not exist!");
-		}
-		
-		Utils.copyJarContents("resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME, 
-				ApplicationMaster.class.getProtectionDomain().getCodeSource().getLocation().getPath());
-		
-		JobManager jm;
-		{
-			String pathToNepheleConfig = currDir+"/flink-conf-modified.yaml";
-			String[] args = {"-executionMode","cluster", "-configDir", pathToNepheleConfig};
-			
-			// start the job manager
-			jm = JobManager.initialize( args );
-			
-			// Start info server for jobmanager
-			jm.startInfoServer();
-		}
-		
-		AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
-		rmClient.init(conf);
-		rmClient.start();
-
-		NMClient nmClient = NMClient.createNMClient();
-		nmClient.init(conf);
-		nmClient.start();
-
-		// Register with ResourceManager
-		LOG.info("registering ApplicationMaster");
-		rmClient.registerApplicationMaster(applicationMasterHost, 0, "http://"+applicationMasterHost+":"+GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "undefined"));
-
-		// Priority for worker containers - priorities are intra-application
-		Priority priority = Records.newRecord(Priority.class);
-		priority.setPriority(0);
-
-		// Resource requirements for worker containers
-		Resource capability = Records.newRecord(Resource.class);
-		capability.setMemory(memoryPerTaskManager);
-		capability.setVirtualCores(coresPerTaskManager);
-
-		// Make container requests to ResourceManager
-		for (int i = 0; i < taskManagerCount; ++i) {
-			ContainerRequest containerAsk = new ContainerRequest(capability,
-					null, null, priority);
-			LOG.info("Requesting TaskManager container " + i);
-			rmClient.addContainerRequest(containerAsk);
-		}
-		
-		LocalResource flinkJar = Records.newRecord(LocalResource.class);
-		LocalResource flinkConf = Records.newRecord(LocalResource.class);
-
-		// register Flink Jar with remote HDFS
-		final Path remoteJarPath = new Path(remoteFlinkJarPath);
-		Utils.registerLocalResource(fs, remoteJarPath, flinkJar);
-		
-		// register conf with local fs.
-		Path remoteConfPath = Utils.setupLocalResource(conf, fs, appId, new Path("file://"+currDir+"/flink-conf-modified.yaml"), flinkConf, new Path(clientHomeDir));
-		LOG.info("Prepared localresource for modified yaml: "+flinkConf);
-		
-		
-		boolean hasLog4j = new File(currDir+"/log4j.properties").exists();
-		// prepare the files to ship
-		LocalResource[] remoteShipRsc = null;
-		String[] remoteShipPaths = shipListString.split(",");
-		if(!shipListString.isEmpty()) {
-			remoteShipRsc = new LocalResource[remoteShipPaths.length]; 
-			{ // scope for i
-				int i = 0;
-				for(String remoteShipPathStr : remoteShipPaths) {
-					if(remoteShipPathStr == null || remoteShipPathStr.isEmpty()) {
-						continue;
-					}
-					remoteShipRsc[i] = Records.newRecord(LocalResource.class);
-					Path remoteShipPath = new Path(remoteShipPathStr);
-					Utils.registerLocalResource(fs, remoteShipPath, remoteShipRsc[i]);
-					i++;
-				}
-			}
-		}
-		
-		// respect custom JVM options in the YAML file
-		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-				
-		// Obtain allocated containers and launch
-		int allocatedContainers = 0;
-		int completedContainers = 0;
-		while (allocatedContainers < taskManagerCount) {
-			AllocateResponse response = rmClient.allocate(0);
-			for (Container container : response.getAllocatedContainers()) {
-				LOG.info("Got new Container for TM "+container.getId()+" on host "+container.getNodeId().getHost());
-				++allocatedContainers;
-
-				// Launch container by create ContainerLaunchContext
-				ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
-				
-				String tmCommand = "$JAVA_HOME/bin/java -Xmx"+heapLimit+"m " + javaOpts ;
-				if(hasLog4j) {
-					tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
-				}
-				tmCommand	+= " org.apache.flink.yarn.YarnTaskManagerRunner -configDir . "
-						+ " 1>"
-						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
-						+ "/taskmanager-stdout.log" 
-						+ " 2>"
-						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
-						+ "/taskmanager-stderr.log";
-				ctx.setCommands(Collections.singletonList(tmCommand));
-				
-				LOG.info("Starting TM with command="+tmCommand);
-				
-				// copy resources to the TaskManagers.
-				Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
-				localResources.put("flink.jar", flinkJar);
-				localResources.put("flink-conf.yaml", flinkConf);
-				
-				// add ship resources
-				if(!shipListString.isEmpty()) {
-					Preconditions.checkNotNull(remoteShipRsc);
-					for( int i = 0; i < remoteShipPaths.length; i++) {
-						localResources.put(new Path(remoteShipPaths[i]).getName(), remoteShipRsc[i]);
-					}
-				}
-				
-				
-				ctx.setLocalResources(localResources);
-				
-				// Setup CLASSPATH for Container (=TaskTracker)
-				Map<String, String> containerEnv = new HashMap<String, String>();
-				Utils.setupEnv(conf, containerEnv); //add flink.jar to class path.
-				containerEnv.put(Client.ENV_CLIENT_USERNAME, yarnClientUsername);
-				
-				ctx.setEnvironment(containerEnv);
-
-				UserGroupInformation user = UserGroupInformation.getCurrentUser();
-				try {
-					Credentials credentials = user.getCredentials();
-					DataOutputBuffer dob = new DataOutputBuffer();
-					credentials.writeTokenStorageToStream(dob);
-					ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(),
-							0, dob.getLength());
-					ctx.setTokens(securityTokens);
-				} catch (IOException e) {
-					LOG.warn("Getting current user info failed when trying to launch the container"
-							+ e.getMessage());
-				}
-				
-				LOG.info("Launching container " + allocatedContainers);
-				nmClient.startContainer(container, ctx);
-			}
-			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
-				++completedContainers;
-				LOG.info("Completed container (while allocating) "+status.getContainerId()+". Total Completed:" + completedContainers);
-				LOG.info("Diagnostics "+status.getDiagnostics());
-			}
-			Thread.sleep(100);
-		}
-
-		// Now wait for containers to complete
-		
-		while (completedContainers < taskManagerCount) {
-			AllocateResponse response = rmClient.allocate(completedContainers
-					/ taskManagerCount);
-			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
-				++completedContainers;
-				LOG.info("Completed container "+status.getContainerId()+". Total Completed:" + completedContainers);
-				LOG.info("Diagnostics "+status.getDiagnostics());
-			}
-			Thread.sleep(5000);
-		}
-		LOG.info("Shutting down JobManager");
-		jm.shutdown();
-		
-		// Un-register with ResourceManager
-		rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
-		
-		
-	}
-	public static void main(String[] args) throws Exception {
-		final String yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME);
-		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
-				+ " user to execute Flink ApplicationMaster/JobManager to '"+yarnClientUsername+"'");
-		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
-		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-			ugi.addToken(toks);
-		}
-		ugi.doAs(new PrivilegedAction<Object>() {
-			@Override
-			public Object run() {
-				try {
-					new ApplicationMaster().run();
-				} catch (Exception e) {
-					e.printStackTrace();
-				}
-				return null;
-			}
-		});
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
index 6d4c7b5..a2090f1 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
@@ -20,13 +20,16 @@
 
 package org.apache.flink.yarn;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.PrintWriter;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,6 +50,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.yarn.rpc.YARNClientMasterProtocol.Message;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -75,22 +79,23 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
+
 /**
  * All classes in this package contain code taken from
  * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
  * and
  * https://github.com/hortonworks/simple-yarn-app
- * and 
+ * and
  * https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
- * 
- * The Flink jar is uploaded to HDFS by this client. 
+ *
+ * The Flink jar is uploaded to HDFS by this client.
  * The application master and all the TaskManager containers get the jar file downloaded
  * by YARN into their local fs.
- * 
+ *
  */
 public class Client {
 	private static final Log LOG = LogFactory.getLog(Client.class);
-	
+
 	/**
 	 * Command Line argument options
 	 */
@@ -107,11 +112,11 @@ public class Client {
 	private static final Option TM_CORES = new Option("tmc","taskManagerCores",true, "Virtual CPU cores per TaskManager");
 	private static final Option CONTAINER = new Option("n","container",true, "Number of Yarn container to allocate (=Number of"
 			+ " TaskTrackers)");
-	
+
 	/**
 	 * Constants
 	 */
-	// environment variable names 
+	// environment variable names
 	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
 	public final static String ENV_TM_CORES = "_CLIENT_TM_CORES";
 	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
@@ -120,15 +125,28 @@ public class Client {
 	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
 	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
 	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
-	
+	public static final String ENV_AM_PRC_PORT = "_AM_PRC_PORT";
+
 	private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
 
-	
-	
+	/**
+	 * Seconds to wait between each status query to the AM.
+	 */
+	private static final int CLIENT_POLLING_INTERVALL = 3;
+
 	private Configuration conf;
+	private YarnClient yarnClient;
+
+	private ClientMasterControl cmc;
+
+	private ApplicationId appId;
+
+	private File addrFile;
+
+	private Path sessionFilesDir;
 
 	public void run(String[] args) throws Exception {
-		
+
 		if(UserGroupInformation.isSecurityEnabled()) {
 			throw new RuntimeException("Flink YARN client does not have security support right now."
 					+ "File a bug, we will fix it asap");
@@ -149,7 +167,7 @@ public class Client {
 		options.addOption(QUEUE);
 		options.addOption(QUERY);
 		options.addOption(SHIP_PATH);
-		
+
 		CommandLineParser parser = new PosixParser();
 		CommandLine cmd = null;
 		try {
@@ -159,7 +177,7 @@ public class Client {
 			printUsage();
 			System.exit(1);
 		}
-		
+
 		if (System.getProperty("log4j.configuration") == null) {
 			Logger root = Logger.getRootLogger();
 			root.removeAllAppenders();
@@ -173,8 +191,8 @@ public class Client {
 				root.setLevel(Level.INFO);
 			}
 		}
-		
-		
+
+
 		// Jar Path
 		Path localJarPath;
 		if(cmd.hasOption(FLINK_JAR.getOpt())) {
@@ -186,15 +204,15 @@ public class Client {
 		} else {
 			localJarPath = new Path("file://"+Client.class.getProtectionDomain().getCodeSource().getLocation().getPath());
 		}
-		
+
 		if(cmd.hasOption(GEN_CONF.getOpt())) {
 			LOG.info("Placing default configuration in current directory");
 			File outFile = generateDefaultConf(localJarPath);
 			LOG.info("File written to "+outFile.getAbsolutePath());
 			System.exit(0);
 		}
-		
-		// Conf Path 
+
+		// Conf Path
 		Path confPath = null;
 		String confDirPath = "";
 		if(cmd.hasOption(FLINK_CONF_DIR.getOpt())) {
@@ -207,7 +225,7 @@ public class Client {
 			confPath = new Path(confFile.getAbsolutePath());
 		} else {
 			System.out.println("No configuration file has been specified");
-			
+
 			// no configuration path given.
 			// -> see if there is one in the current directory
 			File currDir = new File(".");
@@ -229,7 +247,7 @@ public class Client {
 					System.exit(1);
 				} else if(candidates.length == 1) {
 					confPath = new Path(candidates[0].toURI());
-				} 
+				}
 			}
 		}
 		List<File> shipFiles = new ArrayList<File>();
@@ -257,25 +275,25 @@ public class Client {
 				hasLog4j = true;
 			}
 		}
-		
+
 		// queue
 		String queue = "default";
 		if(cmd.hasOption(QUEUE.getOpt())) {
 			queue = cmd.getOptionValue(QUEUE.getOpt());
 		}
-		
+
 		// JobManager Memory
 		int jmMemory = 512;
 		if(cmd.hasOption(JM_MEMORY.getOpt())) {
 			jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
 		}
-		
+
 		// Task Managers memory
 		int tmMemory = 1024;
 		if(cmd.hasOption(TM_MEMORY.getOpt())) {
 			tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
 		}
-		
+
 		// Task Managers vcores
 		int tmCores = 1;
 		if(cmd.hasOption(TM_CORES.getOpt())) {
@@ -288,24 +306,24 @@ public class Client {
 			jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
 		}
 		conf = Utils.initializeYarnConfiguration();
-		
+
 		// intialize HDFS
 		LOG.info("Copy App Master jar from local filesystem and add to local environment");
-		// Copy the application master jar to the filesystem 
-		// Create a local resource to point to the destination jar path 
+		// Copy the application master jar to the filesystem
+		// Create a local resource to point to the destination jar path
 		final FileSystem fs = FileSystem.get(conf);
-		
+
 		if(fs.getScheme().startsWith("file")) {
 			LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
 					+ "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
 					+ "The Flink YARN client needs to store its files in a distributed file system");
 		}
-		
+
 		// Create yarnClient
 		final YarnClient yarnClient = YarnClient.createYarnClient();
 		yarnClient.init(conf);
 		yarnClient.start();
-		
+
 		// Query cluster for metrics
 		if(cmd.hasOption(QUERY.getOpt())) {
 			showClusterMetrics(yarnClient);
@@ -316,10 +334,10 @@ public class Client {
 			yarnClient.stop();
 			System.exit(1);
 		}
-		
+
 		// TM Count
 		final int taskManagerCount = Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt()));
-		
+
 		System.out.println("Using values:");
 		System.out.println("\tContainer Count = "+taskManagerCount);
 		System.out.println("\tJar Path = "+localJarPath.toUri().getPath());
@@ -364,31 +382,31 @@ public class Client {
 			yarnClient.stop();
 			System.exit(1);
 		}
-		
+
 		// respect custom JVM options in the YAML file
 		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-		
+
 		// Set up the container launch context for the application master
 		ContainerLaunchContext amContainer = Records
 				.newRecord(ContainerLaunchContext.class);
-		
+
 		String amCommand = "$JAVA_HOME/bin/java"
 					+ " -Xmx"+Utils.calculateHeapSize(jmMemory)+"M " +javaOpts;
 		if(hasLog4j) {
 			amCommand 	+= " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
 		}
-		amCommand 	+= " org.apache.flink.yarn.ApplicationMaster" + " "
+		amCommand 	+= " org.apache.flink.yarn.appMaster.ApplicationMaster" + " "
 					+ " 1>"
 					+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
 					+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
 		amContainer.setCommands(Collections.singletonList(amCommand));
-		
+
 		System.err.println("amCommand="+amCommand);
-		
+
 		// Set-up ApplicationSubmissionContext for the application
 		ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
 		final ApplicationId appId = appContext.getApplicationId();
-		
+
 		// Setup jar for ApplicationMaster
 		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
 		LocalResource flinkConf = Records.newRecord(LocalResource.class);
@@ -397,8 +415,8 @@ public class Client {
 		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
 		localResources.put("flink.jar", appMasterJar);
 		localResources.put("flink-conf.yaml", flinkConf);
-		
-		
+
+
 		// setup security tokens (code from apache storm)
 		final Path[] paths = new Path[3 + shipFiles.size()];
 		StringBuffer envShipFileList = new StringBuffer();
@@ -410,7 +428,7 @@ public class Client {
 			paths[3 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
 					shipLocalPath, shipResources, fs.getHomeDirectory());
 			localResources.put(shipFile.getName(), shipResources);
-			
+
 			envShipFileList.append(paths[3 + i]);
 			if(i+1 < shipFiles.size()) {
 				envShipFileList.append(',');
@@ -419,15 +437,16 @@ public class Client {
 
 		paths[0] = remotePathJar;
 		paths[1] = remotePathConf;
-		paths[2] = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
+		sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
 		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-		fs.setPermission(paths[2], permission); // set permission for path.
+		fs.setPermission(sessionFilesDir, permission); // set permission for path.
 		Utils.setTokensFor(amContainer, paths, this.conf);
-		
-		 
+
+
 		amContainer.setLocalResources(localResources);
 		fs.close();
 
+		int amRPCPort = GlobalConfiguration.getInteger(ConfigConstants.YARN_AM_PRC_PORT, ConfigConstants.DEFAULT_YARN_AM_RPC_PORT);
 		// Setup CLASSPATH for ApplicationMaster
 		Map<String, String> appMasterEnv = new HashMap<String, String>();
 		Utils.setupEnv(conf, appMasterEnv);
@@ -440,52 +459,35 @@ public class Client {
 		appMasterEnv.put(Client.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
 		appMasterEnv.put(Client.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
 		appMasterEnv.put(Client.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
-		
+		appMasterEnv.put(Client.ENV_AM_PRC_PORT, String.valueOf(amRPCPort));
+
 		amContainer.setEnvironment(appMasterEnv);
-		
+
 		// Set up resource type requirements for ApplicationMaster
 		Resource capability = Records.newRecord(Resource.class);
 		capability.setMemory(jmMemory);
 		capability.setVirtualCores(1);
-		
+
 		appContext.setApplicationName("Flink"); // application name
 		appContext.setAMContainerSpec(amContainer);
 		appContext.setResource(capability);
 		appContext.setQueue(queue);
-		
+
 		// file that we write into the conf/ dir containing the jobManager address.
-		final File addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE);
-		
-		Runtime.getRuntime().addShutdownHook(new Thread() {
-		@Override
-		public void run() {
-			try {
-				LOG.info("Killing the Flink-YARN application.");
-				yarnClient.killApplication(appId);
-				LOG.info("Deleting files in "+paths[2]);
-				FileSystem shutFS = FileSystem.get(conf);
-				shutFS.delete(paths[2], true); // delete conf and jar file.
-				shutFS.close();
-			} catch (Exception e) {
-				LOG.warn("Exception while killing the YARN application", e);
-			}
-			try {
-				addrFile.delete();
-			} catch (Exception e) {
-				LOG.warn("Exception while deleting the jobmanager address file", e);
-			}
-			LOG.info("YARN Client is shutting down");
-			yarnClient.stop();
-		}
-		});
-		
+		addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE);
+
+
 		LOG.info("Submitting application master " + appId);
 		yarnClient.submitApplication(appContext);
 		ApplicationReport appReport = yarnClient.getApplicationReport(appId);
 		YarnApplicationState appState = appReport.getYarnApplicationState();
 		boolean told = false;
 		char[] el = { '/', '|', '\\', '-'};
-		int i = 0; 
+		int i = 0;
+		int numTaskmanagers = 0;
+
+		BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+
 		while (appState != YarnApplicationState.FINISHED
 				&& appState != YarnApplicationState.KILLED
 				&& appState != YarnApplicationState.FAILED) {
@@ -493,11 +495,15 @@ public class Client {
 				System.err.println("Flink JobManager is now running on "+appReport.getHost()+":"+jmPort);
 				System.err.println("JobManager Web Interface: "+appReport.getTrackingUrl());
 				// write jobmanager connect information
-				
 				PrintWriter out = new PrintWriter(addrFile);
 				out.println(appReport.getHost()+":"+jmPort);
 				out.close();
 				addrFile.setReadable(true, false); // readable for all.
+
+				// connect RPC service
+				cmc = new ClientMasterControl(new InetSocketAddress(appReport.getHost(), amRPCPort));
+				cmc.start();
+				Runtime.getRuntime().addShutdownHook(new ClientShutdownHook());
 				told = true;
 			}
 			if(!told) {
@@ -507,24 +513,113 @@ public class Client {
 				}
 				Thread.sleep(500); // wait for the application to switch to RUNNING
 			} else {
-				Thread.sleep(5000);
+				int newTmCount = cmc.getNumberOfTaskManagers();
+				if(numTaskmanagers != newTmCount) {
+					System.err.println("Number of connected TaskManagers changed to "+newTmCount+" slots available: "+cmc.getNumberOfAvailableSlots());
+					numTaskmanagers = newTmCount;
+				}
+				if(cmc.getFailedStatus()) {
+					System.err.println("The Application Master failed!\nMessages:\n");
+					for(Message m: cmc.getMessages() ) {
+						System.err.println("Message: "+m.text);
+					}
+					System.err.println("Requesting Application Master shutdown");
+					cmc.shutdownAM();
+					System.err.println("Application Master closed.");
+				}
+				for(Message m: cmc.getMessages() ) {
+					System.err.println("Message: "+m.text);
+				}
+
+				// wait until CLIENT_POLLING_INTERVALL is over or the user entered something.
+				long startTime = System.currentTimeMillis();
+				while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000
+				        && !in.ready()) {
+					Thread.sleep(200);
+				}
+				if (in.ready()) {
+					String command = in.readLine();
+					evalCommand(command);
+				}
+
 			}
-			
+
 			appReport = yarnClient.getApplicationReport(appId);
 			appState = appReport.getYarnApplicationState();
 		}
 
 		LOG.info("Application " + appId + " finished with"
-				+ " state " + appState + " at " + appReport.getFinishTime());
+				+ " state " + appState + "and final state " + appReport.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
+
 		if(appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED ) {
 			LOG.warn("Application failed. Diagnostics "+appReport.getDiagnostics());
 		}
-		
+
+	}
+
+	private void printHelp() {
+		System.err.println("Available commands:\n"
+				+ "\t stop : Stop the YARN session\n"
+			//	+ "\t add n : Add n TaskManagers to the YARN session\n"
+			//	+ "\t remove n : Remove n TaskManagers to the YARN session\n"
+				+ "\t allmsg : Show all messages\n");
 	}
+	/** Interprets one line typed on the session console; unrecognised or malformed input only prints the help. */
+	private void evalCommand(String command) {
+		if(command.equals("help")) {
+			printHelp();
+		} else if(command.equals("stop") || command.equals("quit") || command.equals("exit")) {
+			stopSession();
+			System.exit(0);
+		} else if(command.equals("allmsg")) {
+			System.err.println("All messages from the ApplicationMaster:");
+			for(Message m: cmc.getMessages() ) {
+				System.err.println("Message: "+m.text);
+			}
+		} else if(command.matches("add\\s*\\d{1,9}\\s*")) { // at most 9 digits, so parseInt below cannot throw
+			int n = Integer.parseInt(command.substring("add".length()).trim());
+			System.err.println("Adding "+n+" TaskManagers to the session");
+			cmc.addTaskManagers(n);
+		} else {
+			System.err.println("Unknown command '"+command+"'");
+			printHelp();
+		}
+	}
+
+	private void stopSession() { // NOTE(review): the "stop" command calls this and then System.exit(0), whose ClientShutdownHook calls it again
+		try {
+			LOG.info("Sending shutdown request to the Application Master");
+			cmc.shutdownAM(); // ask the AM to unregister itself first ...
+			yarnClient.killApplication(appId); // ... then kill the application through YARN
+			LOG.info("Deleting files in "+sessionFilesDir );
+			FileSystem shutFS = FileSystem.get(conf);
+			shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
+			shutFS.close();
+			cmc.close(); // ends the status polling loop of the ClientMasterControl thread
+		} catch (Exception e) {
+			LOG.warn("Exception while killing the YARN application", e);
+		}
+		try {
+			addrFile.delete(); // the file that holds the JobManager's host:port
+		} catch (Exception e) {
+			LOG.warn("Exception while deleting the JobManager address file", e);
+		}
+		LOG.info("YARN Client is shutting down");
+		yarnClient.stop();
+	}
+
+	public class ClientShutdownHook extends Thread {
+		@Override
+		public void run() {
+			stopSession();
+		}
+	}
+
 	private static class ClusterResourceDescription {
 		public int totalFreeMemory;
 		public int containerLimit;
 	}
+
 	private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
 		ClusterResourceDescription crd = new ClusterResourceDescription();
 		crd.totalFreeMemory = 0;
@@ -549,13 +644,10 @@ public class Client {
 		Options req = new Options();
 		req.addOption(CONTAINER);
 		formatter.printHelp(" ", req);
-		
+
 		formatter.setSyntaxPrefix("   Optional");
 		Options opt = new Options();
 		opt.addOption(VERBOSE);
-	//	opt.addOption(GEN_CONF);
-	//	opt.addOption(STRATOSPHERE_CONF);
-	//	opt.addOption(STRATOSPHERE_JAR);
 		opt.addOption(JM_MEMORY);
 		opt.addOption(TM_MEMORY);
 		opt.addOption(TM_CORES);
@@ -604,10 +696,10 @@ public class Client {
 			System.exit(1);
 		}
 		InputStream confStream = jar.getInputStream(jar.getEntry("flink-conf.yaml"));
-		
+
 		if(confStream == null) {
 			LOG.warn("Given jar file does not contain yaml conf.");
-			confStream = this.getClass().getResourceAsStream("flink-conf.yaml"); 
+			confStream = this.getClass().getResourceAsStream("flink-conf.yaml");
 			if(confStream == null) {
 				throw new RuntimeException("Unable to find flink-conf in jar file");
 			}
@@ -630,4 +722,6 @@ public class Client {
 		Client c = new Client();
 		c.run(args);
 	}
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
new file mode 100644
index 0000000..44633ea
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.runtime.ipc.RPC;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.yarn.rpc.ApplicationMasterStatus;
+import org.apache.flink.yarn.rpc.YARNClientMasterProtocol;
+import org.apache.flink.yarn.rpc.YARNClientMasterProtocol.Message;
+
+
+/** Client-side thread that polls the ApplicationMaster over RPC every five seconds and caches its status and messages. */
+public class ClientMasterControl extends Thread {
+	private static final Log LOG = LogFactory.getLog(ClientMasterControl.class);
+
+	private InetSocketAddress applicationMasterAddress;
+
+	private ApplicationMasterStatus appMasterStatus; // guarded by 'lock'; null until the first successful poll
+	private volatile YARNClientMasterProtocol cmp; // created by the polling thread, also used by caller threads
+	private Object lock = new Object();
+	private List<Message> messages = new ArrayList<Message>(); // guarded by 'lock'; starts empty, see run()
+	private volatile boolean running = true; // cleared by close(), which is called from another thread
+
+	public ClientMasterControl(InetSocketAddress applicationMasterAddress) {
+		super();
+		this.applicationMasterAddress = applicationMasterAddress;
+	}
+
+	@Override
+	public void run() {
+		try {
+			cmp = RPC.getProxy(YARNClientMasterProtocol.class, applicationMasterAddress, NetUtils.getSocketFactory());
+
+			while(running) {
+				synchronized (lock) {
+					appMasterStatus = cmp.getAppplicationMasterStatus();
+					if(appMasterStatus != null && // re-fetch whenever the AM reports a different message count
+							(messages == null || messages.size() != appMasterStatus.getMessageCount())) {
+						messages = cmp.getMessages();
+					}
+				}
+
+				try {
+					Thread.sleep(5000);
+				} catch (InterruptedException e) {
+					LOG.warn("Error while getting application status", e);
+				}
+			}
+			RPC.stopProxy(cmp);
+		} catch (IOException e) {
+			LOG.warn("Error while running RPC service", e);
+		}
+	}
+
+	public int getNumberOfTaskManagers() {
+		synchronized (lock) {
+			if(appMasterStatus == null) {
+				return 0;
+			}
+			return appMasterStatus.getNumberOfTaskManagers();
+		}
+	}
+
+	public int getNumberOfAvailableSlots() {
+		synchronized (lock) {
+			if(appMasterStatus == null) {
+				return 0;
+			}
+			return appMasterStatus.getNumberOfAvailableSlots();
+		}
+	}
+
+	public boolean getFailedStatus() {
+		synchronized (lock) {
+			if(appMasterStatus == null) {
+				return false;
+			}
+			return appMasterStatus.getFailed();
+		}
+	}
+
+	public boolean shutdownAM() {
+		try {
+			return cmp.shutdownAM().getValue();
+		} catch(Throwable e) {
+			LOG.warn("Error shutting down the application master", e);
+			return false;
+		}
+	}
+
+	public List<Message> getMessages() {
+		synchronized (lock) { // same lock as the polling thread, so the latest fetched list is visible
+			// null is only possible if the RPC call itself returned null; hand out an empty list then
+			return this.messages == null ? new ArrayList<Message>() : this.messages;
+		}
+	}
+
+	public void close() {
+		running = false;
+	}
+
+	public void addTaskManagers(int n) {
+		cmp.addTaskManagers(n);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index b72b9bc..bd2cb9e 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -106,7 +106,7 @@ public class Utils {
 	 * 
 	 */
 	public static int calculateHeapSize(int memory) {
-		int heapLimit = (int)((float)memory*0.85);
+		int heapLimit = (int)((float)memory*0.80);
 		if( (memory - heapLimit) > HEAP_LIMIT_CAP) {
 			heapLimit = memory-HEAP_LIMIT_CAP;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
deleted file mode 100644
index b541317..0000000
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-
-package org.apache.flink.yarn;
-
-import java.io.IOException;
-import java.security.PrivilegedAction;
-import java.util.Arrays;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-
-public class YarnTaskManagerRunner {
-	
-	private static final Log LOG = LogFactory.getLog(YarnTaskManagerRunner.class);
-	
-	public static void main(final String[] args) throws IOException {
-		Map<String, String> envs = System.getenv();
-		final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
-		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
-		
-		// configure local directory
-		final String[] newArgs = Arrays.copyOf(args, args.length + 2);
-		newArgs[newArgs.length-2] = "-"+TaskManager.ARG_CONF_DIR;
-		newArgs[newArgs.length-1] = localDirs;
-		LOG.info("Setting log path "+localDirs);
-		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
-				+ " user to execute Flink TaskManager to '"+yarnClientUsername+"'");
-		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
-		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-			ugi.addToken(toks);
-		}
-		ugi.doAs(new PrivilegedAction<Object>() {
-			@Override
-			public Object run() {
-				try {
-					TaskManager.main(newArgs);
-				} catch (Exception e) {
-					e.printStackTrace();
-				}
-				return null;
-			}
-		});
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
new file mode 100644
index 0000000..2186fca
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.appMaster;
+
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.ipc.RPC;
+import org.apache.flink.runtime.ipc.RPC.Server;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.util.SerializableArrayList;
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.yarn.Client;
+import org.apache.flink.yarn.Utils;
+import org.apache.flink.yarn.rpc.ApplicationMasterStatus;
+import org.apache.flink.yarn.rpc.YARNClientMasterProtocol;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.base.Preconditions;
+
+
+public class ApplicationMaster implements YARNClientMasterProtocol {
+
+	private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+
+	private final String currDir;
+	private final String logDirs;
+	private final String ownHostname;
+	private final String appId;
+	private final String clientHomeDir;
+	private final String applicationMasterHost;
+	private final String remoteFlinkJarPath;
+	private final String shipListString;
+	private final String yarnClientUsername;
+	private final String rpcPort;
+	private final int taskManagerCount;
+	private final int memoryPerTaskManager;
+	private final int coresPerTaskManager;
+	private final String localWebInterfaceDir;
+	private final Configuration conf;
+
+	/**
+	 * File system for interacting with Flink's files such as the jar
+	 * and the configuration.
+	 */
+	private FileSystem fs;
+	
+	/**
+	 * The JobManager that is running in the same JVM as this Application Master.
+	 */
+	private JobManager jobManager;
+
+	/**
+	 * RPC server for letting the YARN client connect to this AM.
+	 * This RPC connection is handling application specific requests.
+	 */
+	private final Server amRpcServer;
+
+	/**
+	 * RPC connection of the AppMaster to the Resource Manager (YARN master)
+	 */
+	private AMRMClient<ContainerRequest> rmClient;
+
+	/**
+	 * RPC connection to the Node Manager.
+	 */
+	private NMClient nmClient;
+
+	/**
+	 * Messages of the AM that the YARN client is showing the user in the YARN session
+	 */
+	private List<Message> messages = new SerializableArrayList<Message>();
+
+	/**
+	 * Indicates if a log4j config file is being shipped.
+	 */
+	private boolean hasLog4j;
+	
+	/**
+	 * Heap size of TaskManager containers in MB.
+	 */
+	private int heapLimit;
+
+	/**
+	 * Number of containers that stopped running
+	 */
+	private int completedContainers = 0;
+
+	/**
+	 * Local resources used by all Task Manager containers.
+	 */
+	Map<String, LocalResource> taskManagerLocalResources;
+
+	/**
+	 * Flag indicating if the YARN session has failed.
+	 * A session failed if all containers stopped or an error occurred.
+	 * The ApplicationMaster will not close the RPC connection if it has failed (so
+	 * that the client can still retrieve the messages and then shut it down)
+	 */
+	private volatile Boolean isFailed = false; // volatile: set by main() on failure, cleared by the shutdownAM() RPC call
+
+	public ApplicationMaster(Configuration conf) throws IOException {
+		fs = FileSystem.get(conf);
+		Map<String, String> envs = System.getenv();
+		currDir = envs.get(Environment.PWD.key());
+		logDirs =  envs.get(Environment.LOG_DIRS.key());
+		ownHostname = envs.get(Environment.NM_HOST.key());
+		appId = envs.get(Client.ENV_APP_ID);
+		clientHomeDir = envs.get(Client.ENV_CLIENT_HOME_DIR);
+		applicationMasterHost = envs.get(Environment.NM_HOST.key());
+		remoteFlinkJarPath = envs.get(Client.FLINK_JAR_PATH);
+		shipListString = envs.get(Client.ENV_CLIENT_SHIP_FILES);
+		yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
+		rpcPort = envs.get(Client.ENV_AM_PRC_PORT);
+		taskManagerCount = Integer.valueOf(envs.get(Client.ENV_TM_COUNT));
+		memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY));
+		coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES));
+		localWebInterfaceDir = currDir+"/resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME;
+		this.conf = conf;
+
+		if(currDir == null) {
+			throw new RuntimeException("Current directory unknown");
+		}
+		if(ownHostname == null) {
+			throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
+		}
+		LOG.info("Working directory "+currDir);
+
+		// load Flink configuration.
+		Utils.getFlinkConfiguration(currDir);
+
+		// start AM RPC service
+		amRpcServer = RPC.getServer(this, ownHostname, Integer.valueOf(rpcPort), 2);
+		amRpcServer.start();
+	}
+	
+	private void setFailed(boolean failed) {
+		this.isFailed = failed;
+	}
+
+	private void generateConfigurationFile() throws IOException {
+		// Update yaml conf -> set jobManager address to this machine's address.
+		FileInputStream fis = new FileInputStream(currDir+"/flink-conf.yaml");
+		BufferedReader br = new BufferedReader(new InputStreamReader(fis));
+		Writer output = new BufferedWriter(new FileWriter(currDir+"/flink-conf-modified.yaml"));
+		String line ;
+		while ( (line = br.readLine()) != null) {
+			if(line.contains(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)) {
+				output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
+			} else if(line.contains(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY)) {
+				output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+"\n");
+			} else {
+				output.append(line+"\n");
+			}
+		}
+		// just to make sure.
+		output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
+		output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+localWebInterfaceDir+"\n");
+		output.append(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY+": "+logDirs+"\n");
+		output.close();
+		br.close();
+		File newConf = new File(currDir+"/flink-conf-modified.yaml");
+		if(!newConf.exists()) {
+			LOG.warn("modified yaml does not exist!");
+		}
+	}
+
+	private void startJobManager() throws Exception {
+		Utils.copyJarContents("resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME,
+				ApplicationMaster.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+		String pathToNepheleConfig = currDir+"/flink-conf-modified.yaml";
+		String[] args = {"-executionMode","cluster", "-configDir", pathToNepheleConfig};
+
+		// start the job manager
+		jobManager = JobManager.initialize( args );
+
+		// Start info server for jobmanager
+		jobManager.startInfoServer();
+	}
+
+	private void setRMClient(AMRMClient<ContainerRequest> rmClient) {
+		this.rmClient = rmClient;
+	}
+
+	/** Registers with the ResourceManager, requests and launches the TaskManager containers and blocks until all of them completed. */
+	private void run() throws Exception  {
+		heapLimit = Utils.calculateHeapSize(memoryPerTaskManager);
+
+		nmClient = NMClient.createNMClient();
+		nmClient.init(conf);
+		nmClient.start();
+		nmClient.cleanupRunningContainersOnStop(true);
+
+		// Register with ResourceManager
+		LOG.info("Registering ApplicationMaster");
+		rmClient.registerApplicationMaster(applicationMasterHost, 0, "http://"+applicationMasterHost+":"+GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "undefined"));
+
+		// Priority for worker containers - priorities are intra-application
+		Priority priority = Records.newRecord(Priority.class);
+		priority.setPriority(0);
+
+		// Resource requirements for worker containers
+		Resource capability = Records.newRecord(Resource.class);
+		capability.setMemory(memoryPerTaskManager);
+		capability.setVirtualCores(coresPerTaskManager);
+
+		// Make container requests to ResourceManager
+		for (int i = 0; i < taskManagerCount; ++i) {
+			ContainerRequest containerAsk = new ContainerRequest(capability,
+					null, null, priority);
+			LOG.info("Requesting TaskManager container " + i);
+			rmClient.addContainerRequest(containerAsk);
+		}
+
+		LocalResource flinkJar = Records.newRecord(LocalResource.class);
+		LocalResource flinkConf = Records.newRecord(LocalResource.class);
+
+		// register Flink Jar with remote HDFS
+		final Path remoteJarPath = new Path(remoteFlinkJarPath);
+		Utils.registerLocalResource(fs, remoteJarPath, flinkJar);
+
+		// register conf with local fs.
+		Utils.setupLocalResource(conf, fs, appId, new Path("file://"+currDir+"/flink-conf-modified.yaml"), flinkConf, new Path(clientHomeDir));
+		LOG.info("Prepared local resource for modified yaml: "+flinkConf);
+
+		hasLog4j = new File(currDir+"/log4j.properties").exists();
+		// prepare the files to ship
+		LocalResource[] remoteShipRsc = null;
+		String[] remoteShipPaths = shipListString.split(",");
+		if(!shipListString.isEmpty()) {
+			remoteShipRsc = new LocalResource[remoteShipPaths.length];
+			// index by position so that remoteShipRsc[i] always belongs to remoteShipPaths[i]
+			for(int i = 0; i < remoteShipPaths.length; i++) {
+				if(remoteShipPaths[i] == null || remoteShipPaths[i].isEmpty()) {
+					continue; // remoteShipRsc[i] stays null and is skipped below
+				}
+				remoteShipRsc[i] = Records.newRecord(LocalResource.class);
+				Path remoteShipPath = new Path(remoteShipPaths[i]);
+				Utils.registerLocalResource(fs, remoteShipPath, remoteShipRsc[i]);
+			}
+		}
+		// copy resources to the TaskManagers.
+		taskManagerLocalResources = new HashMap<String, LocalResource>(2);
+		taskManagerLocalResources.put("flink.jar", flinkJar);
+		taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
+
+		// add ship resources
+		if(!shipListString.isEmpty()) {
+			Preconditions.checkNotNull(remoteShipRsc);
+			for( int i = 0; i < remoteShipPaths.length; i++) {
+				if(remoteShipRsc[i] == null) {
+					continue; // empty ship list entry: no resource was registered for it
+				}
+				taskManagerLocalResources.put(new Path(remoteShipPaths[i]).getName(), remoteShipRsc[i]);
+			}
+		}
+		completedContainers = 0;
+
+		// Obtain allocated containers and launch
+		StringBuffer containerDiag = new StringBuffer(); // diagnostics log for the containers.
+		allocateOutstandingContainer(containerDiag);
+		LOG.info("Allocated all initial containers");
+
+		// Now wait for containers to complete
+		while (completedContainers < taskManagerCount) {
+			// progress is a fraction; the former int division reported 0 for the whole lifetime of this loop
+			AllocateResponse response = rmClient.allocate((float) completedContainers / taskManagerCount);
+			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+				++completedContainers;
+				LOG.info("Completed container "+status.getContainerId()+". Total Completed:" + completedContainers);
+				LOG.info("Diagnostics "+status.getDiagnostics());
+				logDeadContainer(status, containerDiag);
+			}
+			Thread.sleep(5000);
+		}
+		LOG.info("Shutting down JobManager");
+		jobManager.shutdown();
+
+		// Un-register with ResourceManager
+		final String diagnosticsMessage = "Application Master shut down after all "
+				+ "containers finished\n"+containerDiag.toString();
+		LOG.info("Diagnostics message: "+diagnosticsMessage);
+		rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, diagnosticsMessage, "");
+		this.close();
+		LOG.info("Application Master shutdown completed.");
+	}
+
+	/**
+	 * Polls the ResourceManager until taskManagerCount containers have been allocated and starts a
+	 * TaskManager in each of them; containers that complete meanwhile are counted and added to containerDiag.
+	 */
+	private void allocateOutstandingContainer(StringBuffer containerDiag) throws Exception {
+
+		// respect custom JVM options in the YAML file
+		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+
+		int allocatedContainers = 0;
+		while (allocatedContainers < taskManagerCount) {
+			AllocateResponse response = rmClient.allocate(0);
+			for (Container container : response.getAllocatedContainers()) {
+				LOG.info("Got new Container for TM "+container.getId()+" on host "+container.getNodeId().getHost());
+				++allocatedContainers;
+
+				// Launch container by create ContainerLaunchContext
+				ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+
+				String tmCommand = "$JAVA_HOME/bin/java -Xmx"+heapLimit+"m " + javaOpts ;
+				if(hasLog4j) {
+					tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
+				}
+				// the runner lives in package org.apache.flink.yarn.appMaster (the "yarn" part was missing)
+				tmCommand += " org.apache.flink.yarn.appMaster.YarnTaskManagerRunner -configDir . "
+						+ " 1>"
+						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+						+ "/taskmanager-stdout.log"
+						+ " 2>"
+						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+						+ "/taskmanager-stderr.log";
+				ctx.setCommands(Collections.singletonList(tmCommand));
+
+				LOG.info("Starting TM with command="+tmCommand);
+
+				ctx.setLocalResources(taskManagerLocalResources);
+
+				// Setup CLASSPATH for Container (=TaskTracker)
+				Map<String, String> containerEnv = new HashMap<String, String>();
+				Utils.setupEnv(conf, containerEnv); //add flink.jar to class path.
+				containerEnv.put(Client.ENV_CLIENT_USERNAME, yarnClientUsername);
+
+				ctx.setEnvironment(containerEnv);
+
+				UserGroupInformation user = UserGroupInformation.getCurrentUser();
+				try {
+					Credentials credentials = user.getCredentials();
+					DataOutputBuffer dob = new DataOutputBuffer();
+					credentials.writeTokenStorageToStream(dob);
+					ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(),
+							0, dob.getLength());
+					ctx.setTokens(securityTokens);
+				} catch (IOException e) {
+					LOG.warn("Getting current user info failed when trying to launch the container", e);
+				}
+
+				LOG.info("Launching container " + allocatedContainers);
+				nmClient.startContainer(container, ctx);
+				messages.add(new Message("Launching new container"));
+			}
+			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+				++completedContainers;
+				LOG.info("Completed container (while allocating) "+status.getContainerId()+". Total Completed:" + completedContainers);
+				LOG.info("Diagnostics "+status.getDiagnostics());
+				logDeadContainer(status, containerDiag);
+			}
+			Thread.sleep(100);
+		}
+	}
+
+	private void logDeadContainer(ContainerStatus status,
+			StringBuffer containerDiag) {
+		String msg = "Diagnostics for containerId="+status.getContainerId()+
+				" in state="+status.getState()+"\n"+status.getDiagnostics();
+		messages.add(new Message(msg) );
+		containerDiag.append("\n\n");
+		containerDiag.append(msg);
+	}
+	
+	@Override
+	public ApplicationMasterStatus getAppplicationMasterStatus() { // part of YARNClientMasterProtocol: builds a fresh status object per call
+		ApplicationMasterStatus amStatus;
+		if(jobManager == null) {
+			// JM not yet started
+			amStatus = new ApplicationMasterStatus(0, 0 );
+		} else {
+			amStatus = new ApplicationMasterStatus(jobManager.getNumberOfTaskManagers(), jobManager.getAvailableSlots() );
+		}
+		amStatus.setMessageCount(messages.size()); // the count lets a caller notice that new messages exist
+		amStatus.setFailed(isFailed);
+		return amStatus;
+	}
+	
+
+	/** RPC call from the client: unregisters from the ResourceManager (FAILED if the AM had failed, else SUCCEEDED) and closes the AM. */
+	@Override
+	public BooleanValue shutdownAM() throws Exception {
+		LOG.info("Client requested shutdown of AM");
+		FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
+		String finalMessage = "";
+		if(isFailed) {
+			finalStatus = FinalApplicationStatus.FAILED;
+			finalMessage = "Application Master failed";
+			isFailed = false; // allow a proper shutdown; keepRPCAlive() polls the flag, and the old notifyAll() here ran without the monitor and threw
+		}
+		rmClient.unregisterApplicationMaster(finalStatus, finalMessage, "");
+		this.close();
+		return new BooleanValue(true);
+	}
+	
+	private void close() throws Exception {
+		nmClient.close();
+		rmClient.close();
+		if(isFailed) { // keep the RPC server up so that the client can still retrieve the messages
+			LOG.warn("Can not close AM RPC connection since this the AM is in failed state");
+		} else {
+			amRpcServer.stop();
+		}
+	}
+	@Override
+	public List<Message> getMessages() {
+		return messages;
+	}
+
+	public void addMessage(Message msg) {
+		messages.add(msg);
+	}
+
+	@Override
+	public void addTaskManagers(int n) {
+		throw new RuntimeException("Implement me");
+	}
+
+	/**
+	 * Keeps the ApplicationMaster JVM with the Client RPC service running
+	 * to allow it retrieving the error message.
+	 * Blocks the calling thread until the isFailed flag has been cleared again.
+	 */
+	protected void keepRPCAlive() {
+		// Use 'this' as the monitor instead of the isFailed field: assigning the flag makes the
+		// field point to another Boolean object, so wait() could be called on a monitor that
+		// this thread does not hold (IllegalMonitorStateException).
+		synchronized (this) {
+			while(isFailed) {
+				try {
+					// timed wait: the flag is re-checked every 100 ms, no notify is needed
+					this.wait(100);
+				} catch (InterruptedException e) {
+					LOG.warn("Error while waiting until end of failed mode of AM", e);
+				}
+			}
+		}
+	}
+	
+	/** Entry point: runs the ApplicationMaster as the YARN client's user; failures are reported to the RM or kept for the client to fetch. */
+	public static void main(String[] args) throws Exception {
+		// execute Application Master using the client's user
+		final String yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME);
+		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
+				+ " user to execute Flink ApplicationMaster/JobManager to '"+yarnClientUsername+"'");
+		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
+		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
+			ugi.addToken(toks);
+		}
+		ugi.doAs(new PrivilegedAction<Object>() {
+			@Override
+			public Object run() {
+				AMRMClient<ContainerRequest> rmClient = null;
+				ApplicationMaster am = null;
+				try {
+					Configuration conf = Utils.initializeYarnConfiguration();
+					rmClient = AMRMClient.createAMRMClient();
+					rmClient.init(conf);
+					rmClient.start();
+					// run the actual Application Master
+					am = new ApplicationMaster(conf);
+					am.setRMClient(rmClient); // set first: shutdownAM() needs it even if one of the next steps fails
+					am.generateConfigurationFile();
+					am.startJobManager();
+					am.run();
+				} catch (Throwable e) {
+					LOG.fatal("Error while running the application master", e);
+					// the AM is not available. Report error through the unregister function.
+					if(rmClient != null && am == null) {
+						try {
+							rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, "Flink YARN Application master"
+									+ " stopped unexpectedly with an exception.\n"
+									+ StringUtils.stringifyException(e), "");
+						} catch (Exception e1) {
+							LOG.fatal("Unable to fail the application master", e1);
+						}
+						LOG.info("AM unregistered from RM");
+						return null;
+					}
+					if(rmClient == null) {
+						LOG.fatal("Unable to unregister AM since the RM client is not available");
+					}
+					if(am != null) {
+						LOG.info("Writing error into internal message system");
+						am.setFailed(true);
+						am.addMessage(new Message("The application master failed with an exception:\n"
+								+ StringUtils.stringifyException(e)));
+						am.keepRPCAlive();
+					}
+				}
+				return null;
+			}
+		});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
new file mode 100644
index 0000000..0b0b1e4
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.appMaster;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.yarn.Client;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+
+
+/** Launches the Flink TaskManager as the user named by the Client.ENV_CLIENT_USERNAME environment variable. */
+public class YarnTaskManagerRunner {
+	private static final Log LOG = LogFactory.getLog(YarnTaskManagerRunner.class);
+
+	/** Extends {@code args} by the ARG_CONF_DIR switch, then runs TaskManager.main as the client user; its exceptions are only logged. */
+	public static void main(final String[] args) throws IOException {
+		final Map<String, String> environment = System.getenv();
+		final String yarnClientUsername = environment.get(Client.ENV_CLIENT_USERNAME);
+		final String localDirs = environment.get(Environment.LOCAL_DIRS.key());
+		// the two extra trailing slots hold the ARG_CONF_DIR switch and the LOCAL_DIRS value
+		final String[] newArgs = Arrays.copyOf(args, args.length + 2);
+		newArgs[args.length] = "-"+TaskManager.ARG_CONF_DIR;
+		newArgs[args.length + 1] = localDirs;
+		LOG.info("Setting log path "+localDirs);
+		final String daemonUser = UserGroupInformation.getCurrentUser().getShortUserName();
+		LOG.info("YARN daemon runs as '"+daemonUser+"' setting user to execute Flink TaskManager to '"+yarnClientUsername+"'");
+		final UserGroupInformation clientUser = UserGroupInformation.createRemoteUser(yarnClientUsername);
+		for(Token<? extends TokenIdentifier> token : UserGroupInformation.getCurrentUser().getTokens()) {
+			clientUser.addToken(token); // hand the current user's tokens to the user that will run the TaskManager
+		}
+		clientUser.doAs(new PrivilegedAction<Object>() {
+			@Override
+			public Object run() {
+				try {
+					TaskManager.main(newArgs);
+				} catch (Exception e) {
+					LOG.fatal("Error while running the TaskManager", e);
+				}
+				return null;
+			}
+		});
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
new file mode 100644
index 0000000..b2bdf6b
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn.rpc;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * Class holding status information about the ApplicationMaster.
+ * The client is requesting the AM status regularly from the AM.
+ */
+public class ApplicationMasterStatus implements IOReadableWritable {
+	private int numTaskManagers = 0;
+	private int numSlots = 0;
+	private int messageCount = 0;
+	private boolean failed = false;
+
+	/** For instantiation: every counter stays at zero and the failed flag at false until {@link #read} fills them. */
+	public ApplicationMasterStatus() {
+	}
+
+	public ApplicationMasterStatus(int numTaskManagers, int numSlots) {
+		this.numTaskManagers = numTaskManagers;
+		this.numSlots = numSlots;
+	}
+
+	public ApplicationMasterStatus(int numTaskManagers, int numSlots, int messageCount, boolean failed) {
+		this(numTaskManagers, numSlots);
+		this.messageCount = messageCount;
+		this.failed = failed;
+	}
+
+	public int getNumberOfTaskManagers() {
+		return numTaskManagers;
+	}
+
+	public int getNumberOfAvailableSlots() {
+		return numSlots;
+	}
+
+	public int getMessageCount() {
+		return messageCount;
+	}
+
+	public void setMessageCount(int messageCount) {
+		this.messageCount = messageCount;
+	}
+
+	public boolean getFailed() {
+		return failed;
+	}
+
+	/** Sets the failure flag; {@code null} counts as {@code false} rather than failing with a NullPointerException on unboxing. */
+	public void setFailed(Boolean isFailed) {
+		this.failed = isFailed != null && isFailed;
+	}
+
+	/** Writes the fields in the order numTaskManagers, numSlots, messageCount, failed; {@link #read} relies on that order. */
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(numTaskManagers);
+		out.writeInt(numSlots);
+		out.writeInt(messageCount);
+		out.writeBoolean(failed);
+	}
+
+	/** Overwrites all four fields with values read in the same order in which {@link #write} emits them. */
+	@Override
+	public void read(DataInputView in) throws IOException {
+		numTaskManagers = in.readInt();
+		numSlots = in.readInt();
+		messageCount = in.readInt();
+		failed = in.readBoolean();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
new file mode 100644
index 0000000..560147c
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.rpc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.protocols.VersionedProtocol;
+import org.apache.flink.types.BooleanValue;
+
+
+/**
+ * Interface describing the methods offered by the RPC service between
+ * the Client and Application Master
+ */
+public interface YARNClientMasterProtocol extends VersionedProtocol {
+	/** Text message returned by {@link #getMessages()}; the payload is written and read as one UTF string. */
+	public static class Message implements IOReadableWritable {
+		public String text;
+
+		public Message() {
+			this(""); // for instantiation: an empty instance that read() can fill (presumably needed by the RPC layer - TODO confirm)
+		}
+
+		public Message(String msg) {
+			this.text = msg;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeUTF(text);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			text = in.readUTF();
+		}
+	}
+
+	ApplicationMasterStatus getAppplicationMasterStatus();
+	BooleanValue shutdownAM() throws Exception;
+	List<Message> getMessages();
+	void addTaskManagers(int n);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index d614cc2..454c543 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -245,7 +245,7 @@ public class NepheleMiniCluster {
 	// ------------------------------------------------------------------------
 	
 	private void waitForJobManagerToBecomeReady(int numTaskManagers) throws InterruptedException {
-		while (jobManager.getNumberOfTaskTrackers() < numTaskManagers) {
+		while (jobManager.getNumberOfTaskManagers() < numTaskManagers) {
 			Thread.sleep(50);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 108db51..2983c63 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -267,6 +267,10 @@ public final class ConfigConstants {
 	 */
 	public static final String WEB_ACCESS_FILE_KEY = "webclient.access";
 	
+	// ----------------------------- YARN Client ----------------------------
+	
+	public static final String YARN_AM_PRC_PORT = "yarn.am.rpc.port";
+	
 	// ----------------------------- Miscellaneous ----------------------------
 	
 	/**
@@ -475,6 +479,11 @@ public final class ConfigConstants {
 	 * The default path to the file containing the list of access privileged users and passwords.
 	 */
 	public static final String DEFAULT_WEB_ACCESS_FILE_PATH = null;
+	
+	// ----------------------------- YARN ----------------------------
+	
+	public static final int DEFAULT_YARN_AM_RPC_PORT = 10245;
+	
 
 	// ----------------------------- LocalExecution ----------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
index 24d2fbc..5c65131 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
@@ -382,7 +382,7 @@ public class DefaultInstanceManager implements InstanceManager {
 	}
 
 	@Override
-	public int getNumberOfTaskTrackers() {
+	public int getNumberOfTaskManagers() {
 		return this.registeredHosts.size();
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index eed8b51..e8f8cbe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -46,9 +46,9 @@ public interface InstanceManager {
 
 	Instance getInstanceByName(String name);
 
-	int getNumberOfTaskTrackers();
+	int getNumberOfTaskManagers();
 
 	int getNumberOfSlots();
-	
+
 	Map<InstanceConnectionInfo, Instance> getInstances();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 8ef6b58..82f663c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -1180,8 +1180,8 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		return this.archive;
 	}
 
-	public int getNumberOfTaskTrackers() {
-		return this.instanceManager.getNumberOfTaskTrackers();
+	public int getNumberOfTaskManagers() {
+		return this.instanceManager.getNumberOfTaskManagers();
 	}
 	
 	public Map<InstanceConnectionInfo, Instance> getInstances() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index 386315f..3ffe5af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -92,7 +92,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 				writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), ManagementGroupVertexID.fromHexString(groupvertexId));
 			}
 			else if("taskmanagers".equals(req.getParameter("get"))) {
-				resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskTrackers() +"}");
+				resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +"}");
 			}
 			else if("cancel".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
index d28ea2c..5b09704 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobmanager.web;
 
 import java.io.File;
@@ -31,73 +30,71 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import org.apache.flink.util.StringUtils;
 
+import com.google.common.base.Preconditions;
+
 public class LogfileInfoServlet extends HttpServlet {
-	
+
 	private static final long serialVersionUID = 1L;
 
 	/**
 	 * The log for this class.
 	 */
 	private static final Log LOG = LogFactory.getLog(LogfileInfoServlet.class);
-	
-	private File logDir;
-	
-	public LogfileInfoServlet(File logDir) {
-		this.logDir = logDir;
+
+	private File[] logDirs;
+
+
+	public LogfileInfoServlet(File[] logDirs) {
+		Preconditions.checkNotNull(logDirs, "The given log files are null.");
+		this.logDirs = logDirs;
 	}
-	
+
 	@Override
 	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 
 		try {
 			if("stdout".equals(req.getParameter("get"))) {
-				// Find current stdtout file
-				for(File f : logDir.listFiles()) {
-					// contains "jobmanager" ".log" and no number in the end ->needs improvement
-					if( f.getName().equals("jobmanager-stdout.log") ||
-							(f.getName().indexOf("jobmanager") != -1 && f.getName().indexOf(".out") != -1 && ! Character.isDigit(f.getName().charAt(f.getName().length() - 1) ))
-							) {
-						
-						resp.setStatus(HttpServletResponse.SC_OK);
-						resp.setContentType("text/plain ");
-						writeFile(resp.getOutputStream(), f);
-						break;
-					}
-				}
+				// Find current stdout file
+				sendFile("jobmanager-stdout.log", resp);
 			}
 			else {
 				// Find current logfile
-				for(File f : logDir.listFiles()) {
-					// contains "jobmanager" ".log" and no number in the end ->needs improvement
-					if( f.getName().equals("jobmanager-stderr.log") ||
-							(f.getName().indexOf("jobmanager") != -1 && f.getName().indexOf(".log") != -1 && ! Character.isDigit(f.getName().charAt(f.getName().length() - 1) ))) {
-						
-						resp.setStatus(HttpServletResponse.SC_OK);
-						resp.setContentType("text/plain ");
-						writeFile(resp.getOutputStream(), f);
-						break;
-					}
-					
-				}
+				sendFile("jobmanager-log4j.log", resp);
 			}
 		} catch (Throwable t) {
 			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-			resp.getWriter().print(t.getMessage());
+			resp.getWriter().print("Error opening log files':"+t.getMessage());
 			if (LOG.isWarnEnabled()) {
 				LOG.warn(StringUtils.stringifyException(t));
 			}
 		}
 	}
-	
+
+	/** Streams every file called {@code fileName} that sits directly inside one of the log directories into {@code resp}. */
+	private void sendFile(String fileName, HttpServletResponse resp) throws IOException {
+		for(File logDir: logDirs) {
+			// listFiles() returns null when logDir is missing, unreadable or not a directory;
+			// such an entry is skipped so that it cannot fail the request with a NullPointerException
+			File[] candidates = logDir.listFiles();
+			for(File f : candidates == null ? new File[0] : candidates) {
+				if(f.getName().equals(fileName)) {
+					resp.setStatus(HttpServletResponse.SC_OK);
+					resp.setContentType("text/plain");
+					writeFile(resp.getOutputStream(), f);
+				}
+			}
+		}
+	}
 	private static void writeFile(OutputStream out, File file) throws IOException {
 		byte[] buf = new byte[4 * 1024]; // 4K buffer
-		
+
 		FileInputStream  is = null;
 		try {
 			is = new FileInputStream(file);
-			
+			out.write(("==== FILE: "+file.toString()+" ====\n").getBytes());
 			int bytesRead;
 			while ((bytesRead = is.read(buf)) != -1) {
 				out.write(buf, 0, bytesRead);
@@ -108,5 +105,5 @@ public class LogfileInfoServlet extends HttpServlet {
 			}
 		}
 	}
-	
+
 }