You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/08/20 11:15:56 UTC

[1/3] [YARN] properly set diagnostics messages on failures

Repository: incubator-flink
Updated Branches:
  refs/heads/master 859490533 -> e8f2e9d0e


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 6c3b2b9..7856652 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -40,33 +40,34 @@ import org.eclipse.jetty.server.handler.ResourceHandler;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 
+
 /**
  * This class sets up a web-server that contains a web frontend to display information about running jobs.
  * It instantiates and configures an embedded jetty server.
  */
 public class WebInfoServer {
-	
+
 	/**
 	 * The log for this class.
 	 */
 	private static final Log LOG = LogFactory.getLog(WebInfoServer.class);
-	
+
 	/**
 	 * The jetty server serving all requests.
 	 */
 	private final Server server;
-	
+
 	/**
 	 * Port for info server
 	 */
 	private int port;
-	
+
 	/**
 	 * Creates a new web info server. The server runs the servlets that implement the logic
-	 * to list all present information concerning the job manager 
-	 * 
+	 * to list all present information concerning the job manager
+	 *
 	 * @param nepheleConfig
-	 *        The configuration for the nephele job manager. 
+	 *        The configuration for the nephele job manager.
 	 * @param port
 	 *        The port to launch the server on.
 	 * @throws IOException
@@ -74,18 +75,24 @@ public class WebInfoServer {
 	 */
 	public WebInfoServer(Configuration nepheleConfig, int port, JobManager jobmanager) throws IOException {
 		this.port = port;
-		
+
 		// if no explicit configuration is given, use the global configuration
 		if (nepheleConfig == null) {
 			nepheleConfig = GlobalConfiguration.getConfiguration();
 		}
-		
+
 		// get base path of Flink installation
-		String basePath = nepheleConfig.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");
-		String webDirPath = nepheleConfig.getString(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ROOT_PATH);
-		String logDirPath = nepheleConfig.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, 
-				basePath+"/log");
-		
+		final String basePath = nepheleConfig.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");
+		final String webDirPath = nepheleConfig.getString(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ROOT_PATH);
+		final String[] logDirPaths = nepheleConfig.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY,
+				basePath+"/log").split(","); // YARN allows to specify multiple log directories
+
+		final File[] logDirFiles = new File[logDirPaths.length];
+		int i = 0;
+		for(String path : logDirPaths) {
+			logDirFiles[i++] = new File(path);
+		}
+
 		File webDir;
 		if(webDirPath.startsWith("/")) {
 			// absolute path
@@ -94,12 +101,12 @@ public class WebInfoServer {
 			// path relative to base dir
 			webDir = new File(basePath+"/"+webDirPath);
 		}
-		
-		
+
+
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Setting up web info server, using web-root directory '" + webDir.getAbsolutePath() + "'.");
 			//LOG.info("Web info server will store temporary files in '" + tmpDir.getAbsolutePath());
-	
+
 			LOG.info("Web info server will display information about nephele job-manager on "
 				+ nepheleConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + ", port "
 				+ port
@@ -108,17 +115,17 @@ public class WebInfoServer {
 
 		// ensure that the directory with the web documents exists
 		if (!webDir.exists()) {
-			throw new FileNotFoundException("Cannot start jobmanager web info server. The directory containing the web documents does not exist: " 
+			throw new FileNotFoundException("Cannot start jobmanager web info server. The directory containing the web documents does not exist: "
 				+ webDir.getAbsolutePath());
 		}
-		
+
 		server = new Server(port);
 
 		// ----- the handlers for the servlets -----
 		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
 		servletContext.setContextPath("/");
 		servletContext.addServlet(new ServletHolder(new JobmanagerInfoServlet(jobmanager)), "/jobsInfo");
-		servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(new File(logDirPath))), "/logInfo");
+		servletContext.addServlet(new ServletHolder(new LogfileInfoServlet(logDirFiles)), "/logInfo");
 		servletContext.addServlet(new ServletHolder(new SetupInfoServlet(jobmanager)), "/setupInfo");
 		servletContext.addServlet(new ServletHolder(new MenuServlet()), "/menu");
 
@@ -172,10 +179,10 @@ public class WebInfoServer {
 			server.setHandler(handlers);
 		}
 	}
-	
+
 	/**
 	 * Starts the web frontend server.
-	 * 
+	 *
 	 * @throws Exception
 	 *         Thrown, if the start fails.
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
index f87a1dd..9286def 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
@@ -177,7 +177,7 @@ public final class TestInstanceManager implements InstanceManager {
 	}
 
 	@Override
-	public int getNumberOfTaskTrackers() {
+	public int getNumberOfTaskManagers() {
 		throw new IllegalStateException("getNumberOfTaskTrackers called on TestInstanceManager");
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/tools/.gitignore
----------------------------------------------------------------------
diff --git a/tools/.gitignore b/tools/.gitignore
new file mode 100644
index 0000000..2546bad
--- /dev/null
+++ b/tools/.gitignore
@@ -0,0 +1 @@
+merge_pull_request.sh
\ No newline at end of file


[2/3] git commit: [YARN] properly set diagnostics messages on failures

Posted by rm...@apache.org.
[YARN] properly set diagnostics messages on failures


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ae32c187
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ae32c187
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ae32c187

Branch: refs/heads/master
Commit: ae32c18769347022e456f93e4a84d907a2a465bd
Parents: 8594905
Author: Robert Metzger <me...@web.de>
Authored: Mon Jun 16 17:56:59 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Aug 20 09:36:24 2014 +0200

----------------------------------------------------------------------
 docs/config.md                                  |   5 +
 .../apache/flink/yarn/ApplicationMaster.java    | 323 -----------
 .../main/java/org/apache/flink/yarn/Client.java | 268 ++++++---
 .../apache/flink/yarn/ClientMasterControl.java  | 128 +++++
 .../main/java/org/apache/flink/yarn/Utils.java  |   2 +-
 .../flink/yarn/YarnTaskManagerRunner.java       |  68 ---
 .../flink/yarn/appMaster/ApplicationMaster.java | 554 +++++++++++++++++++
 .../yarn/appMaster/YarnTaskManagerRunner.java   |  68 +++
 .../flink/yarn/rpc/ApplicationMasterStatus.java |  93 ++++
 .../yarn/rpc/YARNClientMasterProtocol.java      |  63 +++
 .../client/minicluster/NepheleMiniCluster.java  |   2 +-
 .../flink/configuration/ConfigConstants.java    |   9 +
 .../instance/DefaultInstanceManager.java        |   2 +-
 .../flink/runtime/instance/InstanceManager.java |   4 +-
 .../flink/runtime/jobmanager/JobManager.java    |   4 +-
 .../jobmanager/web/JobmanagerInfoServlet.java   |   2 +-
 .../jobmanager/web/LogfileInfoServlet.java      |  73 ++-
 .../runtime/jobmanager/web/WebInfoServer.java   |  51 +-
 .../scheduler/TestInstanceManager.java          |   2 +-
 tools/.gitignore                                |   1 +
 20 files changed, 1175 insertions(+), 547 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/docs/config.md
----------------------------------------------------------------------
diff --git a/docs/config.md b/docs/config.md
index ddc579b..4a02b0f 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -249,3 +249,8 @@ sample that the compiler takes for delimited inputs. If the length of a single
 sample exceeds this value (possible because of misconfiguration of the parser),
 the sampling aborts. This value can be overridden for a specific input with the
 input format's parameters (DEFAULT: 2097152 (= 2 MiBytes)).
+
+# YARN
+
+- `yarn.am.rpc.port`: The port opened by the Application Master (AM) so that
+the YARN client can connect to its RPC service. (DEFAULT: Port 10245)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
deleted file mode 100644
index 40635dc..0000000
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ApplicationMaster.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-
-package org.apache.flink.yarn;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Writer;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.NMClient;
-import org.apache.hadoop.yarn.util.Records;
-
-import com.google.common.base.Preconditions;
-
-public class ApplicationMaster {
-
-	private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
-	
-	private void run() throws Exception  {
-		//Utils.logFilesInCurrentDirectory(LOG);
-		// Initialize clients to ResourceManager and NodeManagers
-		Configuration conf = Utils.initializeYarnConfiguration();
-		FileSystem fs = FileSystem.get(conf);
-		Map<String, String> envs = System.getenv();
-		final String currDir = envs.get(Environment.PWD.key());
-		final String logDirs =  envs.get(Environment.LOG_DIRS.key());
-		final String ownHostname = envs.get(Environment.NM_HOST.key());
-		final String appId = envs.get(Client.ENV_APP_ID);
-		final String clientHomeDir = envs.get(Client.ENV_CLIENT_HOME_DIR);
-		final String applicationMasterHost = envs.get(Environment.NM_HOST.key());
-		final String remoteFlinkJarPath = envs.get(Client.FLINK_JAR_PATH);
-		final String shipListString = envs.get(Client.ENV_CLIENT_SHIP_FILES);
-		final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
-		final int taskManagerCount = Integer.valueOf(envs.get(Client.ENV_TM_COUNT));
-		final int memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY));
-		final int coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES));
-		
-		int heapLimit = Utils.calculateHeapSize(memoryPerTaskManager);
-		
-		if(currDir == null) {
-			throw new RuntimeException("Current directory unknown");
-		}
-		if(ownHostname == null) {
-			throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
-		}
-		LOG.info("Working directory "+currDir);
-		
-		// load Flink configuration.
-		Utils.getFlinkConfiguration(currDir);
-		
-		final String localWebInterfaceDir = currDir+"/resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME;
-		
-		// Update yaml conf -> set jobManager address to this machine's address.
-		FileInputStream fis = new FileInputStream(currDir+"/flink-conf.yaml");
-		BufferedReader br = new BufferedReader(new InputStreamReader(fis));
-		Writer output = new BufferedWriter(new FileWriter(currDir+"/flink-conf-modified.yaml"));
-		String line ;
-		while ( (line = br.readLine()) != null) {
-			if(line.contains(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)) {
-				output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
-			} else if(line.contains(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY)) {
-				output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+"\n");
-			} else {
-				output.append(line+"\n");
-			}
-		}
-		// just to make sure.
-		output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
-		output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+localWebInterfaceDir+"\n");
-		output.append(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY+": "+logDirs+"\n");
-		output.close();
-		br.close();
-		File newConf = new File(currDir+"/flink-conf-modified.yaml");
-		if(!newConf.exists()) {
-			LOG.warn("modified yaml does not exist!");
-		}
-		
-		Utils.copyJarContents("resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME, 
-				ApplicationMaster.class.getProtectionDomain().getCodeSource().getLocation().getPath());
-		
-		JobManager jm;
-		{
-			String pathToNepheleConfig = currDir+"/flink-conf-modified.yaml";
-			String[] args = {"-executionMode","cluster", "-configDir", pathToNepheleConfig};
-			
-			// start the job manager
-			jm = JobManager.initialize( args );
-			
-			// Start info server for jobmanager
-			jm.startInfoServer();
-		}
-		
-		AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
-		rmClient.init(conf);
-		rmClient.start();
-
-		NMClient nmClient = NMClient.createNMClient();
-		nmClient.init(conf);
-		nmClient.start();
-
-		// Register with ResourceManager
-		LOG.info("registering ApplicationMaster");
-		rmClient.registerApplicationMaster(applicationMasterHost, 0, "http://"+applicationMasterHost+":"+GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "undefined"));
-
-		// Priority for worker containers - priorities are intra-application
-		Priority priority = Records.newRecord(Priority.class);
-		priority.setPriority(0);
-
-		// Resource requirements for worker containers
-		Resource capability = Records.newRecord(Resource.class);
-		capability.setMemory(memoryPerTaskManager);
-		capability.setVirtualCores(coresPerTaskManager);
-
-		// Make container requests to ResourceManager
-		for (int i = 0; i < taskManagerCount; ++i) {
-			ContainerRequest containerAsk = new ContainerRequest(capability,
-					null, null, priority);
-			LOG.info("Requesting TaskManager container " + i);
-			rmClient.addContainerRequest(containerAsk);
-		}
-		
-		LocalResource flinkJar = Records.newRecord(LocalResource.class);
-		LocalResource flinkConf = Records.newRecord(LocalResource.class);
-
-		// register Flink Jar with remote HDFS
-		final Path remoteJarPath = new Path(remoteFlinkJarPath);
-		Utils.registerLocalResource(fs, remoteJarPath, flinkJar);
-		
-		// register conf with local fs.
-		Path remoteConfPath = Utils.setupLocalResource(conf, fs, appId, new Path("file://"+currDir+"/flink-conf-modified.yaml"), flinkConf, new Path(clientHomeDir));
-		LOG.info("Prepared localresource for modified yaml: "+flinkConf);
-		
-		
-		boolean hasLog4j = new File(currDir+"/log4j.properties").exists();
-		// prepare the files to ship
-		LocalResource[] remoteShipRsc = null;
-		String[] remoteShipPaths = shipListString.split(",");
-		if(!shipListString.isEmpty()) {
-			remoteShipRsc = new LocalResource[remoteShipPaths.length]; 
-			{ // scope for i
-				int i = 0;
-				for(String remoteShipPathStr : remoteShipPaths) {
-					if(remoteShipPathStr == null || remoteShipPathStr.isEmpty()) {
-						continue;
-					}
-					remoteShipRsc[i] = Records.newRecord(LocalResource.class);
-					Path remoteShipPath = new Path(remoteShipPathStr);
-					Utils.registerLocalResource(fs, remoteShipPath, remoteShipRsc[i]);
-					i++;
-				}
-			}
-		}
-		
-		// respect custom JVM options in the YAML file
-		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-				
-		// Obtain allocated containers and launch
-		int allocatedContainers = 0;
-		int completedContainers = 0;
-		while (allocatedContainers < taskManagerCount) {
-			AllocateResponse response = rmClient.allocate(0);
-			for (Container container : response.getAllocatedContainers()) {
-				LOG.info("Got new Container for TM "+container.getId()+" on host "+container.getNodeId().getHost());
-				++allocatedContainers;
-
-				// Launch container by create ContainerLaunchContext
-				ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
-				
-				String tmCommand = "$JAVA_HOME/bin/java -Xmx"+heapLimit+"m " + javaOpts ;
-				if(hasLog4j) {
-					tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
-				}
-				tmCommand	+= " org.apache.flink.yarn.YarnTaskManagerRunner -configDir . "
-						+ " 1>"
-						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
-						+ "/taskmanager-stdout.log" 
-						+ " 2>"
-						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
-						+ "/taskmanager-stderr.log";
-				ctx.setCommands(Collections.singletonList(tmCommand));
-				
-				LOG.info("Starting TM with command="+tmCommand);
-				
-				// copy resources to the TaskManagers.
-				Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
-				localResources.put("flink.jar", flinkJar);
-				localResources.put("flink-conf.yaml", flinkConf);
-				
-				// add ship resources
-				if(!shipListString.isEmpty()) {
-					Preconditions.checkNotNull(remoteShipRsc);
-					for( int i = 0; i < remoteShipPaths.length; i++) {
-						localResources.put(new Path(remoteShipPaths[i]).getName(), remoteShipRsc[i]);
-					}
-				}
-				
-				
-				ctx.setLocalResources(localResources);
-				
-				// Setup CLASSPATH for Container (=TaskTracker)
-				Map<String, String> containerEnv = new HashMap<String, String>();
-				Utils.setupEnv(conf, containerEnv); //add flink.jar to class path.
-				containerEnv.put(Client.ENV_CLIENT_USERNAME, yarnClientUsername);
-				
-				ctx.setEnvironment(containerEnv);
-
-				UserGroupInformation user = UserGroupInformation.getCurrentUser();
-				try {
-					Credentials credentials = user.getCredentials();
-					DataOutputBuffer dob = new DataOutputBuffer();
-					credentials.writeTokenStorageToStream(dob);
-					ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(),
-							0, dob.getLength());
-					ctx.setTokens(securityTokens);
-				} catch (IOException e) {
-					LOG.warn("Getting current user info failed when trying to launch the container"
-							+ e.getMessage());
-				}
-				
-				LOG.info("Launching container " + allocatedContainers);
-				nmClient.startContainer(container, ctx);
-			}
-			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
-				++completedContainers;
-				LOG.info("Completed container (while allocating) "+status.getContainerId()+". Total Completed:" + completedContainers);
-				LOG.info("Diagnostics "+status.getDiagnostics());
-			}
-			Thread.sleep(100);
-		}
-
-		// Now wait for containers to complete
-		
-		while (completedContainers < taskManagerCount) {
-			AllocateResponse response = rmClient.allocate(completedContainers
-					/ taskManagerCount);
-			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
-				++completedContainers;
-				LOG.info("Completed container "+status.getContainerId()+". Total Completed:" + completedContainers);
-				LOG.info("Diagnostics "+status.getDiagnostics());
-			}
-			Thread.sleep(5000);
-		}
-		LOG.info("Shutting down JobManager");
-		jm.shutdown();
-		
-		// Un-register with ResourceManager
-		rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
-		
-		
-	}
-	public static void main(String[] args) throws Exception {
-		final String yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME);
-		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
-				+ " user to execute Flink ApplicationMaster/JobManager to '"+yarnClientUsername+"'");
-		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
-		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-			ugi.addToken(toks);
-		}
-		ugi.doAs(new PrivilegedAction<Object>() {
-			@Override
-			public Object run() {
-				try {
-					new ApplicationMaster().run();
-				} catch (Exception e) {
-					e.printStackTrace();
-				}
-				return null;
-			}
-		});
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
index 6d4c7b5..a2090f1 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
@@ -20,13 +20,16 @@
 
 package org.apache.flink.yarn;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.PrintWriter;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,6 +50,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.yarn.rpc.YARNClientMasterProtocol.Message;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -75,22 +79,23 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
+
 /**
  * All classes in this package contain code taken from
  * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
  * and
  * https://github.com/hortonworks/simple-yarn-app
- * and 
+ * and
  * https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
- * 
- * The Flink jar is uploaded to HDFS by this client. 
+ *
+ * The Flink jar is uploaded to HDFS by this client.
  * The application master and all the TaskManager containers get the jar file downloaded
  * by YARN into their local fs.
- * 
+ *
  */
 public class Client {
 	private static final Log LOG = LogFactory.getLog(Client.class);
-	
+
 	/**
 	 * Command Line argument options
 	 */
@@ -107,11 +112,11 @@ public class Client {
 	private static final Option TM_CORES = new Option("tmc","taskManagerCores",true, "Virtual CPU cores per TaskManager");
 	private static final Option CONTAINER = new Option("n","container",true, "Number of Yarn container to allocate (=Number of"
 			+ " TaskTrackers)");
-	
+
 	/**
 	 * Constants
 	 */
-	// environment variable names 
+	// environment variable names
 	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
 	public final static String ENV_TM_CORES = "_CLIENT_TM_CORES";
 	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
@@ -120,15 +125,28 @@ public class Client {
 	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
 	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
 	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
-	
+	public static final String ENV_AM_PRC_PORT = "_AM_PRC_PORT";
+
 	private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
 
-	
-	
+	/**
+	 * Seconds to wait between each status query to the AM.
+	 */
+	private static final int CLIENT_POLLING_INTERVALL = 3;
+
 	private Configuration conf;
+	private YarnClient yarnClient;
+
+	private ClientMasterControl cmc;
+
+	private ApplicationId appId;
+
+	private File addrFile;
+
+	private Path sessionFilesDir;
 
 	public void run(String[] args) throws Exception {
-		
+
 		if(UserGroupInformation.isSecurityEnabled()) {
 			throw new RuntimeException("Flink YARN client does not have security support right now."
 					+ "File a bug, we will fix it asap");
@@ -149,7 +167,7 @@ public class Client {
 		options.addOption(QUEUE);
 		options.addOption(QUERY);
 		options.addOption(SHIP_PATH);
-		
+
 		CommandLineParser parser = new PosixParser();
 		CommandLine cmd = null;
 		try {
@@ -159,7 +177,7 @@ public class Client {
 			printUsage();
 			System.exit(1);
 		}
-		
+
 		if (System.getProperty("log4j.configuration") == null) {
 			Logger root = Logger.getRootLogger();
 			root.removeAllAppenders();
@@ -173,8 +191,8 @@ public class Client {
 				root.setLevel(Level.INFO);
 			}
 		}
-		
-		
+
+
 		// Jar Path
 		Path localJarPath;
 		if(cmd.hasOption(FLINK_JAR.getOpt())) {
@@ -186,15 +204,15 @@ public class Client {
 		} else {
 			localJarPath = new Path("file://"+Client.class.getProtectionDomain().getCodeSource().getLocation().getPath());
 		}
-		
+
 		if(cmd.hasOption(GEN_CONF.getOpt())) {
 			LOG.info("Placing default configuration in current directory");
 			File outFile = generateDefaultConf(localJarPath);
 			LOG.info("File written to "+outFile.getAbsolutePath());
 			System.exit(0);
 		}
-		
-		// Conf Path 
+
+		// Conf Path
 		Path confPath = null;
 		String confDirPath = "";
 		if(cmd.hasOption(FLINK_CONF_DIR.getOpt())) {
@@ -207,7 +225,7 @@ public class Client {
 			confPath = new Path(confFile.getAbsolutePath());
 		} else {
 			System.out.println("No configuration file has been specified");
-			
+
 			// no configuration path given.
 			// -> see if there is one in the current directory
 			File currDir = new File(".");
@@ -229,7 +247,7 @@ public class Client {
 					System.exit(1);
 				} else if(candidates.length == 1) {
 					confPath = new Path(candidates[0].toURI());
-				} 
+				}
 			}
 		}
 		List<File> shipFiles = new ArrayList<File>();
@@ -257,25 +275,25 @@ public class Client {
 				hasLog4j = true;
 			}
 		}
-		
+
 		// queue
 		String queue = "default";
 		if(cmd.hasOption(QUEUE.getOpt())) {
 			queue = cmd.getOptionValue(QUEUE.getOpt());
 		}
-		
+
 		// JobManager Memory
 		int jmMemory = 512;
 		if(cmd.hasOption(JM_MEMORY.getOpt())) {
 			jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
 		}
-		
+
 		// Task Managers memory
 		int tmMemory = 1024;
 		if(cmd.hasOption(TM_MEMORY.getOpt())) {
 			tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
 		}
-		
+
 		// Task Managers vcores
 		int tmCores = 1;
 		if(cmd.hasOption(TM_CORES.getOpt())) {
@@ -288,24 +306,24 @@ public class Client {
 			jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
 		}
 		conf = Utils.initializeYarnConfiguration();
-		
+
 		// intialize HDFS
 		LOG.info("Copy App Master jar from local filesystem and add to local environment");
-		// Copy the application master jar to the filesystem 
-		// Create a local resource to point to the destination jar path 
+		// Copy the application master jar to the filesystem
+		// Create a local resource to point to the destination jar path
 		final FileSystem fs = FileSystem.get(conf);
-		
+
 		if(fs.getScheme().startsWith("file")) {
 			LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
 					+ "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
 					+ "The Flink YARN client needs to store its files in a distributed file system");
 		}
-		
+
 		// Create yarnClient
 		final YarnClient yarnClient = YarnClient.createYarnClient();
 		yarnClient.init(conf);
 		yarnClient.start();
-		
+
 		// Query cluster for metrics
 		if(cmd.hasOption(QUERY.getOpt())) {
 			showClusterMetrics(yarnClient);
@@ -316,10 +334,10 @@ public class Client {
 			yarnClient.stop();
 			System.exit(1);
 		}
-		
+
 		// TM Count
 		final int taskManagerCount = Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt()));
-		
+
 		System.out.println("Using values:");
 		System.out.println("\tContainer Count = "+taskManagerCount);
 		System.out.println("\tJar Path = "+localJarPath.toUri().getPath());
@@ -364,31 +382,31 @@ public class Client {
 			yarnClient.stop();
 			System.exit(1);
 		}
-		
+
 		// respect custom JVM options in the YAML file
 		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-		
+
 		// Set up the container launch context for the application master
 		ContainerLaunchContext amContainer = Records
 				.newRecord(ContainerLaunchContext.class);
-		
+
 		String amCommand = "$JAVA_HOME/bin/java"
 					+ " -Xmx"+Utils.calculateHeapSize(jmMemory)+"M " +javaOpts;
 		if(hasLog4j) {
 			amCommand 	+= " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
 		}
-		amCommand 	+= " org.apache.flink.yarn.ApplicationMaster" + " "
+		amCommand 	+= " org.apache.flink.yarn.appMaster.ApplicationMaster" + " "
 					+ " 1>"
 					+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
 					+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
 		amContainer.setCommands(Collections.singletonList(amCommand));
-		
+
 		System.err.println("amCommand="+amCommand);
-		
+
 		// Set-up ApplicationSubmissionContext for the application
 		ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
 		final ApplicationId appId = appContext.getApplicationId();
-		
+
 		// Setup jar for ApplicationMaster
 		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
 		LocalResource flinkConf = Records.newRecord(LocalResource.class);
@@ -397,8 +415,8 @@ public class Client {
 		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
 		localResources.put("flink.jar", appMasterJar);
 		localResources.put("flink-conf.yaml", flinkConf);
-		
-		
+
+
 		// setup security tokens (code from apache storm)
 		final Path[] paths = new Path[3 + shipFiles.size()];
 		StringBuffer envShipFileList = new StringBuffer();
@@ -410,7 +428,7 @@ public class Client {
 			paths[3 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
 					shipLocalPath, shipResources, fs.getHomeDirectory());
 			localResources.put(shipFile.getName(), shipResources);
-			
+
 			envShipFileList.append(paths[3 + i]);
 			if(i+1 < shipFiles.size()) {
 				envShipFileList.append(',');
@@ -419,15 +437,16 @@ public class Client {
 
 		paths[0] = remotePathJar;
 		paths[1] = remotePathConf;
-		paths[2] = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
+		sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
 		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-		fs.setPermission(paths[2], permission); // set permission for path.
+		fs.setPermission(sessionFilesDir, permission); // set permission for path.
 		Utils.setTokensFor(amContainer, paths, this.conf);
-		
-		 
+
+
 		amContainer.setLocalResources(localResources);
 		fs.close();
 
+		int amRPCPort = GlobalConfiguration.getInteger(ConfigConstants.YARN_AM_PRC_PORT, ConfigConstants.DEFAULT_YARN_AM_RPC_PORT);
 		// Setup CLASSPATH for ApplicationMaster
 		Map<String, String> appMasterEnv = new HashMap<String, String>();
 		Utils.setupEnv(conf, appMasterEnv);
@@ -440,52 +459,35 @@ public class Client {
 		appMasterEnv.put(Client.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
 		appMasterEnv.put(Client.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
 		appMasterEnv.put(Client.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
-		
+		appMasterEnv.put(Client.ENV_AM_PRC_PORT, String.valueOf(amRPCPort));
+
 		amContainer.setEnvironment(appMasterEnv);
-		
+
 		// Set up resource type requirements for ApplicationMaster
 		Resource capability = Records.newRecord(Resource.class);
 		capability.setMemory(jmMemory);
 		capability.setVirtualCores(1);
-		
+
 		appContext.setApplicationName("Flink"); // application name
 		appContext.setAMContainerSpec(amContainer);
 		appContext.setResource(capability);
 		appContext.setQueue(queue);
-		
+
 		// file that we write into the conf/ dir containing the jobManager address.
-		final File addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE);
-		
-		Runtime.getRuntime().addShutdownHook(new Thread() {
-		@Override
-		public void run() {
-			try {
-				LOG.info("Killing the Flink-YARN application.");
-				yarnClient.killApplication(appId);
-				LOG.info("Deleting files in "+paths[2]);
-				FileSystem shutFS = FileSystem.get(conf);
-				shutFS.delete(paths[2], true); // delete conf and jar file.
-				shutFS.close();
-			} catch (Exception e) {
-				LOG.warn("Exception while killing the YARN application", e);
-			}
-			try {
-				addrFile.delete();
-			} catch (Exception e) {
-				LOG.warn("Exception while deleting the jobmanager address file", e);
-			}
-			LOG.info("YARN Client is shutting down");
-			yarnClient.stop();
-		}
-		});
-		
+		addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE);
+
+
 		LOG.info("Submitting application master " + appId);
 		yarnClient.submitApplication(appContext);
 		ApplicationReport appReport = yarnClient.getApplicationReport(appId);
 		YarnApplicationState appState = appReport.getYarnApplicationState();
 		boolean told = false;
 		char[] el = { '/', '|', '\\', '-'};
-		int i = 0; 
+		int i = 0;
+		int numTaskmanagers = 0;
+
+		BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+
 		while (appState != YarnApplicationState.FINISHED
 				&& appState != YarnApplicationState.KILLED
 				&& appState != YarnApplicationState.FAILED) {
@@ -493,11 +495,15 @@ public class Client {
 				System.err.println("Flink JobManager is now running on "+appReport.getHost()+":"+jmPort);
 				System.err.println("JobManager Web Interface: "+appReport.getTrackingUrl());
 				// write jobmanager connect information
-				
 				PrintWriter out = new PrintWriter(addrFile);
 				out.println(appReport.getHost()+":"+jmPort);
 				out.close();
 				addrFile.setReadable(true, false); // readable for all.
+
+				// connect RPC service
+				cmc = new ClientMasterControl(new InetSocketAddress(appReport.getHost(), amRPCPort));
+				cmc.start();
+				Runtime.getRuntime().addShutdownHook(new ClientShutdownHook());
 				told = true;
 			}
 			if(!told) {
@@ -507,24 +513,113 @@ public class Client {
 				}
 				Thread.sleep(500); // wait for the application to switch to RUNNING
 			} else {
-				Thread.sleep(5000);
+				int newTmCount = cmc.getNumberOfTaskManagers();
+				if(numTaskmanagers != newTmCount) {
+					System.err.println("Number of connected TaskManagers changed to "+newTmCount+" slots available: "+cmc.getNumberOfAvailableSlots());
+					numTaskmanagers = newTmCount;
+				}
+				if(cmc.getFailedStatus()) {
+					System.err.println("The Application Master failed!\nMessages:\n");
+					for(Message m: cmc.getMessages() ) {
+						System.err.println("Message: "+m.text);
+					}
+					System.err.println("Requesting Application Master shutdown");
+					cmc.shutdownAM();
+					System.err.println("Application Master closed.");
+				}
+				for(Message m: cmc.getMessages() ) {
+					System.err.println("Message: "+m.text);
+				}
+
+				// wait until CLIENT_POLLING_INTERVALL is over or the user entered something.
+				long startTime = System.currentTimeMillis();
+				while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000
+				        && !in.ready()) {
+					Thread.sleep(200);
+				}
+				if (in.ready()) {
+					String command = in.readLine();
+					evalCommand(command);
+				}
+
 			}
-			
+
 			appReport = yarnClient.getApplicationReport(appId);
 			appState = appReport.getYarnApplicationState();
 		}
 
 		LOG.info("Application " + appId + " finished with"
-				+ " state " + appState + " at " + appReport.getFinishTime());
+				+ " state " + appState + "and final state " + appReport.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
+
 		if(appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED ) {
 			LOG.warn("Application failed. Diagnostics "+appReport.getDiagnostics());
 		}
-		
+
+	}
+
+	private void printHelp() {
+		System.err.println("Available commands:\n"
+				+ "\t stop : Stop the YARN session\n"
+			//	+ "\t add n : Add n TaskManagers to the YARN session\n"
+			//	+ "\t remove n : Remove n TaskManagers to the YARN session\n"
+				+ "\t allmsg : Show all messages\n");
 	}
+	private void evalCommand(String command) {
+		if(command.equals("help")) {
+			printHelp();
+		} else if(command.equals("stop") || command.equals("quit") || command.equals("exit")) {
+			stopSession();
+			System.exit(0);
+		} else if(command.equals("allmsg")) {
+			System.err.println("All messages from the ApplicationMaster:");
+			for(Message m: cmc.getMessages() ) {
+				System.err.println("Message: "+m.text);
+			}
+		} else if(command.startsWith("add")) {
+			String nStr = command.replace("add", "").trim();
+			int n = Integer.valueOf(nStr);
+			System.err.println("Adding "+n+" TaskManagers to the session");
+			cmc.addTaskManagers(n);
+		} else {
+			System.err.println("Unknown command '"+command+"'");
+			printHelp();
+		}
+	}
+
+	/** Set once the session has been torn down, so later calls of stopSession() are no-ops. */
+	private boolean sessionStopped = false;
+
+	/**
+	 * Tears down the YARN session. It asks the Application Master to shut down, kills the
+	 * YARN application and deletes the session files. It then stops the client/master RPC
+	 * thread, removes the JobManager address file and stops the YARN client.
+	 *
+	 * <p>Each step is attempted independently, so one failure does not leave the remaining
+	 * resources behind. The method is invoked both by the interactive "stop" command and by the
+	 * JVM shutdown hook, which fires again on System.exit. It is therefore synchronized and
+	 * only does its work once.
+	 */
+	private synchronized void stopSession() {
+		if(sessionStopped) {
+			return;
+		}
+		sessionStopped = true;
+
+		// NOTE(review): the session setup code declares locals named yarnClient/appId, which may
+		// leave the fields used here unset -- hence the null checks. Confirm against the fields.
+		try {
+			if(cmc != null) {
+				LOG.info("Sending shutdown request to the Application Master");
+				cmc.shutdownAM();
+			}
+			if(yarnClient != null && appId != null) {
+				yarnClient.killApplication(appId);
+			}
+		} catch (Exception e) {
+			LOG.warn("Exception while killing the YARN application", e);
+		}
+
+		if(sessionFilesDir != null) {
+			try {
+				LOG.info("Deleting files in "+sessionFilesDir );
+				FileSystem shutFS = FileSystem.get(conf);
+				try {
+					shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
+				} finally {
+					shutFS.close();
+				}
+			} catch (Exception e) {
+				LOG.warn("Exception while deleting the session files in "+sessionFilesDir, e);
+			}
+		}
+
+		if(cmc != null) {
+			cmc.close();
+		}
+
+		try {
+			if(addrFile != null) {
+				addrFile.delete();
+			}
+		} catch (Exception e) {
+			LOG.warn("Exception while deleting the JobManager address file", e);
+		}
+
+		LOG.info("YARN Client is shutting down");
+		if(yarnClient != null) {
+			yarnClient.stop();
+		}
+	}
+
+	/**
+	 * JVM shutdown hook that tears down the YARN session of the enclosing client.
+	 */
+	public class ClientShutdownHook extends Thread {
+		@Override
+		public void run() {
+			Client.this.stopSession();
+		}
+	}
+
 	/**
 	 * Plain holder for the free-resource figures computed by getCurrentFreeClusterResources().
 	 * NOTE(review): the units of the fields (presumably MB) are not visible here -- confirm.
 	 */
 	private static class ClusterResourceDescription {
 		/** Free memory summed up over the cluster (the computation starts from 0). */
 		public int totalFreeMemory;
 		/** Container size limit; exact derivation not visible in this excerpt. */
 		public int containerLimit;
 	}
+
 	private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
 		ClusterResourceDescription crd = new ClusterResourceDescription();
 		crd.totalFreeMemory = 0;
@@ -549,13 +644,10 @@ public class Client {
 		Options req = new Options();
 		req.addOption(CONTAINER);
 		formatter.printHelp(" ", req);
-		
+
 		formatter.setSyntaxPrefix("   Optional");
 		Options opt = new Options();
 		opt.addOption(VERBOSE);
-	//	opt.addOption(GEN_CONF);
-	//	opt.addOption(STRATOSPHERE_CONF);
-	//	opt.addOption(STRATOSPHERE_JAR);
 		opt.addOption(JM_MEMORY);
 		opt.addOption(TM_MEMORY);
 		opt.addOption(TM_CORES);
@@ -604,10 +696,10 @@ public class Client {
 			System.exit(1);
 		}
 		InputStream confStream = jar.getInputStream(jar.getEntry("flink-conf.yaml"));
-		
+
 		if(confStream == null) {
 			LOG.warn("Given jar file does not contain yaml conf.");
-			confStream = this.getClass().getResourceAsStream("flink-conf.yaml"); 
+			confStream = this.getClass().getResourceAsStream("flink-conf.yaml");
 			if(confStream == null) {
 				throw new RuntimeException("Unable to find flink-conf in jar file");
 			}
@@ -630,4 +722,6 @@ public class Client {
 		Client c = new Client();
 		c.run(args);
 	}
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
new file mode 100644
index 0000000..44633ea
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn;
+
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.runtime.ipc.RPC;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.yarn.rpc.ApplicationMasterStatus;
+import org.apache.flink.yarn.rpc.YARNClientMasterProtocol;
+import org.apache.flink.yarn.rpc.YARNClientMasterProtocol.Message;
+
+
+public class ClientMasterControl extends Thread {
+	private static final Log LOG = LogFactory.getLog(ClientMasterControl.class);
+
+	private InetSocketAddress applicationMasterAddress;
+
+	private ApplicationMasterStatus appMasterStatus;
+	private YARNClientMasterProtocol cmp;
+	private Object lock = new Object();
+	private List<Message> messages;
+	private boolean running = true;
+
+	public ClientMasterControl(InetSocketAddress applicationMasterAddress) {
+		super();
+		this.applicationMasterAddress = applicationMasterAddress;
+	}
+
+	@Override
+	public void run() {
+		try {
+			cmp = RPC.getProxy(YARNClientMasterProtocol.class, applicationMasterAddress, NetUtils.getSocketFactory());
+
+			while(running) {
+				synchronized (lock) {
+					appMasterStatus = cmp.getAppplicationMasterStatus();
+					if(messages != null && appMasterStatus != null &&
+							messages.size() != appMasterStatus.getMessageCount()) {
+						messages = cmp.getMessages();
+					}
+				}
+
+				try {
+					Thread.sleep(5000);
+				} catch (InterruptedException e) {
+					LOG.warn("Error while getting application status", e);
+				}
+			}
+			RPC.stopProxy(cmp);
+		} catch (IOException e) {
+			LOG.warn("Error while running RPC service", e);
+		}
+
+	}
+
+	public int getNumberOfTaskManagers() {
+		synchronized (lock) {
+			if(appMasterStatus == null) {
+				return 0;
+			}
+			return appMasterStatus.getNumberOfTaskManagers();
+		}
+	}
+
+	public int getNumberOfAvailableSlots() {
+		synchronized (lock) {
+			if(appMasterStatus == null) {
+				return 0;
+			}
+			return appMasterStatus.getNumberOfAvailableSlots();
+		}
+	}
+	
+	public boolean getFailedStatus() {
+		synchronized (lock) {
+			if(appMasterStatus == null) {
+				return false;
+			}
+			return appMasterStatus.getFailed();
+		}
+	}
+
+	public boolean shutdownAM() {
+		try {
+			return cmp.shutdownAM().getValue();
+		} catch(Throwable e) {
+			LOG.warn("Error shutting down the application master", e);
+			return false;
+		}
+	}
+
+	public List<Message> getMessages() {
+		if(this.messages == null) {
+			return new ArrayList<Message>();
+		}
+		return this.messages;
+	}
+
+	public void close() {
+		running = false;
+	}
+
+	public void addTaskManagers(int n) {
+		cmp.addTaskManagers(n);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index b72b9bc..bd2cb9e 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -106,7 +106,7 @@ public class Utils {
 	 * 
 	 */
 	public static int calculateHeapSize(int memory) {
-		int heapLimit = (int)((float)memory*0.85);
+		int heapLimit = (int)((float)memory*0.80);
 		if( (memory - heapLimit) > HEAP_LIMIT_CAP) {
 			heapLimit = memory-HEAP_LIMIT_CAP;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
deleted file mode 100644
index b541317..0000000
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-
-package org.apache.flink.yarn;
-
-import java.io.IOException;
-import java.security.PrivilegedAction;
-import java.util.Arrays;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-
-public class YarnTaskManagerRunner {
-	
-	private static final Log LOG = LogFactory.getLog(YarnTaskManagerRunner.class);
-	
-	public static void main(final String[] args) throws IOException {
-		Map<String, String> envs = System.getenv();
-		final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
-		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
-		
-		// configure local directory
-		final String[] newArgs = Arrays.copyOf(args, args.length + 2);
-		newArgs[newArgs.length-2] = "-"+TaskManager.ARG_CONF_DIR;
-		newArgs[newArgs.length-1] = localDirs;
-		LOG.info("Setting log path "+localDirs);
-		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
-				+ " user to execute Flink TaskManager to '"+yarnClientUsername+"'");
-		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
-		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
-			ugi.addToken(toks);
-		}
-		ugi.doAs(new PrivilegedAction<Object>() {
-			@Override
-			public Object run() {
-				try {
-					TaskManager.main(newArgs);
-				} catch (Exception e) {
-					e.printStackTrace();
-				}
-				return null;
-			}
-		});
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
new file mode 100644
index 0000000..2186fca
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.appMaster;
+
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.ipc.RPC;
+import org.apache.flink.runtime.ipc.RPC.Server;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.util.SerializableArrayList;
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.yarn.Client;
+import org.apache.flink.yarn.Utils;
+import org.apache.flink.yarn.rpc.ApplicationMasterStatus;
+import org.apache.flink.yarn.rpc.YARNClientMasterProtocol;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.base.Preconditions;
+
+
+public class ApplicationMaster implements YARNClientMasterProtocol {
+
+	private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+
+	/** Working directory of this container (environment variable PWD). */
+	private final String currDir;
+	/** Container log directories (LOG_DIRS); written into the modified config as the web log path. */
+	private final String logDirs;
+	/** Host this AM runs on (NM_HOST); used as JobManager address and as RPC bind host. */
+	private final String ownHostname;
+	/** YARN application id as passed by the client. */
+	private final String appId;
+	/** Home directory of the submitting user on the client's file system. */
+	private final String clientHomeDir;
+	/** Read from the same variable as ownHostname (NM_HOST); used to register with the ResourceManager. */
+	private final String applicationMasterHost;
+	/** Remote path of the Flink jar that is shipped to the TaskManagers. */
+	private final String remoteFlinkJarPath;
+	/** Comma-separated remote paths of additional files to ship to the TaskManagers. */
+	private final String shipListString;
+	/** Short user name of the user that started the YARN client. */
+	private final String yarnClientUsername;
+	/** Port of the client-facing RPC server, as a string from the environment. */
+	private final String rpcPort;
+	/** Number of TaskManager containers to request. */
+	private final int taskManagerCount;
+	/** Container memory per TaskManager (passed to Resource.setMemory; presumably MB). */
+	private final int memoryPerTaskManager;
+	/** Virtual cores per TaskManager container. */
+	private final int coresPerTaskManager;
+	/** Local directory holding the unpacked web frontend resources. */
+	private final String localWebInterfaceDir;
+	/** Hadoop/YARN configuration. */
+	private final Configuration conf;
+
+	/**
+	 * File system for interacting with Flink's files such as the jar
+	 * and the configuration.
+	 */
+	private FileSystem fs;
+	
+	/**
+	 * The JobManager that is running in the same JVM as this Application Master.
+	 */
+	private JobManager jobManager;
+
+	/**
+	 * RPC server for letting the YARN client connect to this AM.
+	 * This RPC connection is handling application specific requests.
+	 */
+	private final Server amRpcServer;
+
+	/**
+	 * RPC connection of the AppMaster to the Resource Manager (YARN master)
+	 */
+	private AMRMClient<ContainerRequest> rmClient;
+
+	/**
+	 * RPC connection to the Node Manager.
+	 */
+	private NMClient nmClient;
+
+	/**
+	 * Messages of the AM that the YARN client is showing the user in the YARN session
+	 */
+	private List<Message> messages = new SerializableArrayList<Message>();
+
+	/**
+	 * Indicates if a log4j config file is being shipped.
+	 */
+	private boolean hasLog4j;
+	
+	/**
+	 * Heap size of TaskManager containers in MB.
+	 */
+	private int heapLimit;
+
+	/**
+	 * Number of containers that stopped running
+	 */
+	private int completedContainers = 0;
+
+	/**
+	 * Local resources used by all Task Manager containers.
+	 */
+	Map<String, LocalResource> taskManagerLocalResources;
+
+	/**
+	 * Flag indicating if the YARN session has failed.
+	 * A session failed if all containers stopped or an error occurred.
+	 * The ApplicationMaster will not close the RPC connection if it has failed (so
+	 * that the client can still retrieve the messages and then shut it down)
+	 */
+	private Boolean isFailed = false;
+
+	/**
+	 * Reads the session parameters that the YARN client put into the container environment.
+	 * It then loads the Flink configuration from the working directory and starts the RPC
+	 * server through which the YARN client talks to this Application Master.
+	 *
+	 * @param conf Hadoop/YARN configuration
+	 * @throws IOException if the file system cannot be obtained
+	 * @throws RuntimeException if a required environment variable is missing or not a number
+	 */
+	public ApplicationMaster(Configuration conf) throws IOException {
+		fs = FileSystem.get(conf);
+		Map<String, String> envs = System.getenv();
+
+		// validate the mandatory variables before anything is derived from them
+		currDir = envs.get(Environment.PWD.key());
+		if(currDir == null) {
+			throw new RuntimeException("Current directory unknown");
+		}
+		ownHostname = envs.get(Environment.NM_HOST.key());
+		if(ownHostname == null) {
+			throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
+		}
+
+		logDirs =  envs.get(Environment.LOG_DIRS.key());
+		appId = envs.get(Client.ENV_APP_ID);
+		clientHomeDir = envs.get(Client.ENV_CLIENT_HOME_DIR);
+		applicationMasterHost = envs.get(Environment.NM_HOST.key());
+		remoteFlinkJarPath = envs.get(Client.FLINK_JAR_PATH);
+		shipListString = envs.get(Client.ENV_CLIENT_SHIP_FILES);
+		yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
+		rpcPort = envs.get(Client.ENV_AM_PRC_PORT);
+		taskManagerCount = parseIntFromEnv(envs, Client.ENV_TM_COUNT);
+		memoryPerTaskManager = parseIntFromEnv(envs, Client.ENV_TM_MEMORY);
+		coresPerTaskManager = parseIntFromEnv(envs, Client.ENV_TM_CORES);
+		localWebInterfaceDir = currDir+"/resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME;
+		this.conf = conf;
+		LOG.info("Working directory "+currDir);
+
+		// load Flink configuration.
+		Utils.getFlinkConfiguration(currDir);
+
+		// start AM RPC service
+		amRpcServer = RPC.getServer(this, ownHostname, parseIntFromEnv(envs, Client.ENV_AM_PRC_PORT), 2);
+		amRpcServer.start();
+	}
+
+	/**
+	 * Reads an integer-valued environment variable.
+	 *
+	 * @throws RuntimeException naming the variable if it is missing or not a valid integer
+	 */
+	private static int parseIntFromEnv(Map<String, String> envs, String key) {
+		String value = envs.get(key);
+		if(value == null) {
+			throw new RuntimeException("Environment variable "+key+" not set.");
+		}
+		try {
+			return Integer.parseInt(value.trim());
+		} catch (NumberFormatException e) {
+			throw new RuntimeException("Environment variable "+key+" is not a valid integer: '"+value+"'", e);
+		}
+	}
+	
+	/**
+	 * Records whether the YARN session is considered failed.
+	 */
+	private void setFailed(boolean failed) {
+		isFailed = Boolean.valueOf(failed);
+	}
+
+	/**
+	 * Writes flink-conf-modified.yaml into the working directory. The file is a copy of
+	 * flink-conf.yaml in which every line mentioning the JobManager IPC address key is replaced
+	 * by this host's name, and every line mentioning the web root path key is blanked. The
+	 * JobManager address, web root path and web log path of this container are then appended.
+	 * Both files are closed even if reading or writing fails.
+	 *
+	 * @throws IOException if the original configuration cannot be read or the new one written
+	 */
+	private void generateConfigurationFile() throws IOException {
+		BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(currDir+"/flink-conf.yaml")));
+		try {
+			Writer output = new BufferedWriter(new FileWriter(currDir+"/flink-conf-modified.yaml"));
+			try {
+				String line;
+				while ((line = br.readLine()) != null) {
+					if(line.contains(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY)) {
+						output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
+					} else if(line.contains(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY)) {
+						output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+"\n");
+					} else {
+						output.append(line+"\n");
+					}
+				}
+				// just to make sure.
+				output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
+				output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+localWebInterfaceDir+"\n");
+				output.append(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY+": "+logDirs+"\n");
+			} finally {
+				output.close();
+			}
+		} finally {
+			br.close();
+		}
+		File newConf = new File(currDir+"/flink-conf-modified.yaml");
+		if(!newConf.exists()) {
+			LOG.warn("modified yaml does not exist!");
+		}
+	}
+
+	private void startJobManager() throws Exception {
+		Utils.copyJarContents("resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME,
+				ApplicationMaster.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+		String pathToNepheleConfig = currDir+"/flink-conf-modified.yaml";
+		String[] args = {"-executionMode","cluster", "-configDir", pathToNepheleConfig};
+
+		// start the job manager
+		jobManager = JobManager.initialize( args );
+
+		// Start info server for jobmanager
+		jobManager.startInfoServer();
+	}
+
+	/**
+	 * Injects the ResourceManager client; run() uses it without initializing it, so this must be called first.
+	 */
+	private void setRMClient(AMRMClient<ContainerRequest> client) {
+		rmClient = client;
+	}
+
+	/**
+	 * Main routine of the Application Master. It registers with the ResourceManager, requests
+	 * taskManagerCount TaskManager containers and prepares the resources shipped to them. It then
+	 * launches the containers and waits until all of them have completed. Finally it shuts down
+	 * the JobManager and unregisters the application as FAILED, attaching the collected container
+	 * diagnostics.
+	 */
+	private void run() throws Exception  {
+		heapLimit = Utils.calculateHeapSize(memoryPerTaskManager);
+
+		nmClient = NMClient.createNMClient();
+		nmClient.init(conf);
+		nmClient.start();
+		nmClient.cleanupRunningContainersOnStop(true);
+
+		// Register with ResourceManager
+		LOG.info("Registering ApplicationMaster");
+		rmClient.registerApplicationMaster(applicationMasterHost, 0, "http://"+applicationMasterHost+":"+GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "undefined"));
+
+		// Priority for worker containers - priorities are intra-application
+		Priority priority = Records.newRecord(Priority.class);
+		priority.setPriority(0);
+
+		// Resource requirements for worker containers
+		Resource capability = Records.newRecord(Resource.class);
+		capability.setMemory(memoryPerTaskManager);
+		capability.setVirtualCores(coresPerTaskManager);
+
+		// Make container requests to ResourceManager
+		for (int i = 0; i < taskManagerCount; ++i) {
+			ContainerRequest containerAsk = new ContainerRequest(capability,
+					null, null, priority);
+			LOG.info("Requesting TaskManager container " + i);
+			rmClient.addContainerRequest(containerAsk);
+		}
+
+		LocalResource flinkJar = Records.newRecord(LocalResource.class);
+		LocalResource flinkConf = Records.newRecord(LocalResource.class);
+
+		// register Flink Jar with remote HDFS
+		final Path remoteJarPath = new Path(remoteFlinkJarPath);
+		Utils.registerLocalResource(fs, remoteJarPath, flinkJar);
+
+		// register conf with local fs.
+		Utils.setupLocalResource(conf, fs, appId, new Path("file://"+currDir+"/flink-conf-modified.yaml"), flinkConf, new Path(clientHomeDir));
+		LOG.info("Prepared local resource for modified yaml: "+flinkConf);
+
+		hasLog4j = new File(currDir+"/log4j.properties").exists();
+
+		// copy resources to the TaskManagers.
+		taskManagerLocalResources = new HashMap<String, LocalResource>(2);
+		taskManagerLocalResources.put("flink.jar", flinkJar);
+		taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
+
+		// add ship resources. Registering and adding each file in the same iteration keeps every
+		// resource paired with its own name even when empty entries are skipped.
+		if(shipListString != null && !shipListString.isEmpty()) {
+			for(String remoteShipPathStr : shipListString.split(",")) {
+				if(remoteShipPathStr == null || remoteShipPathStr.isEmpty()) {
+					continue;
+				}
+				LocalResource shipResource = Records.newRecord(LocalResource.class);
+				Path remoteShipPath = new Path(remoteShipPathStr);
+				Utils.registerLocalResource(fs, remoteShipPath, shipResource);
+				taskManagerLocalResources.put(remoteShipPath.getName(), shipResource);
+			}
+		}
+		completedContainers = 0;
+
+		// Obtain allocated containers and launch
+		StringBuffer containerDiag = new StringBuffer(); // diagnostics log for the containers.
+		allocateOutstandingContainer(containerDiag);
+		LOG.info("Allocated all initial containers");
+
+		// Now wait for containers to complete. The allocate() argument is a progress indicator
+		// in [0, 1]; it is computed in floating point because integer division would always yield 0.
+		while (completedContainers < taskManagerCount) {
+			AllocateResponse response = rmClient.allocate((float) completedContainers
+					/ taskManagerCount);
+			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+				++completedContainers;
+				LOG.info("Completed container "+status.getContainerId()+". Total Completed:" + completedContainers);
+				LOG.info("Diagnostics "+status.getDiagnostics());
+				logDeadContainer(status, containerDiag);
+			}
+			Thread.sleep(5000);
+		}
+		LOG.info("Shutting down JobManager");
+		jobManager.shutdown();
+
+		// Un-register with ResourceManager
+		final String diagnosticsMessage = "Application Master shut down after all "
+				+ "containers finished\n"+containerDiag.toString();
+		LOG.info("Diagnostics message: "+diagnosticsMessage);
+		rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, diagnosticsMessage, "");
+		this.close();
+		LOG.info("Application Master shutdown completed.");
+	}
+
+	/**
+	 * Polls the ResourceManager until {@code taskManagerCount} containers have been allocated
+	 * and launches a TaskManager in each of them. This loops in the calling thread; no new
+	 * thread is started.
+	 *
+	 * <p>If an allocation response contains more containers than are still needed, the surplus
+	 * containers are released back to the ResourceManager instead of being launched, so no more
+	 * than {@code taskManagerCount} TaskManagers are started here. Containers that complete while
+	 * allocation is still in progress are counted in {@code completedContainers} and their
+	 * diagnostics are recorded.
+	 *
+	 * @param containerDiag buffer collecting the diagnostics of containers that finished
+	 * @throws Exception if communicating with the ResourceManager/NodeManager fails or the
+	 *         polling sleep is interrupted
+	 */
+	private void allocateOutstandingContainer(StringBuffer containerDiag) throws Exception {
+
+		// respect custom JVM options in the YAML file
+		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
+
+		int allocatedContainers = 0;
+		while (allocatedContainers < taskManagerCount) {
+			AllocateResponse response = rmClient.allocate(0);
+			for (Container container : response.getAllocatedContainers()) {
+				if (allocatedContainers >= taskManagerCount) {
+					// More containers were handed out than we still need (the container requests
+					// stay outstanding); give the surplus back instead of starting extra TaskManagers.
+					LOG.info("Releasing surplus container "+container.getId());
+					rmClient.releaseAssignedContainer(container.getId());
+					continue;
+				}
+				LOG.info("Got new Container for TM "+container.getId()+" on host "+container.getNodeId().getHost());
+				++allocatedContainers;
+
+				// Launch container by create ContainerLaunchContext
+				ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+
+				String tmCommand = "$JAVA_HOME/bin/java -Xmx"+heapLimit+"m " + javaOpts ;
+				if(hasLog4j) {
+					tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
+				}
+				tmCommand	+= " org.apache.flink.appMaster.YarnTaskManagerRunner -configDir . "
+						+ " 1>"
+						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+						+ "/taskmanager-stdout.log" 
+						+ " 2>"
+						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
+						+ "/taskmanager-stderr.log";
+				ctx.setCommands(Collections.singletonList(tmCommand));
+
+				LOG.info("Starting TM with command="+tmCommand);
+
+				ctx.setLocalResources(taskManagerLocalResources);
+
+				// Setup CLASSPATH for Container (=TaskTracker)
+				Map<String, String> containerEnv = new HashMap<String, String>();
+				Utils.setupEnv(conf, containerEnv); //add flink.jar to class path.
+				containerEnv.put(Client.ENV_CLIENT_USERNAME, yarnClientUsername);
+
+				ctx.setEnvironment(containerEnv);
+
+				// hand the current user's credentials (tokens) to the container
+				UserGroupInformation user = UserGroupInformation.getCurrentUser();
+				try {
+					Credentials credentials = user.getCredentials();
+					DataOutputBuffer dob = new DataOutputBuffer();
+					credentials.writeTokenStorageToStream(dob);
+					ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(),
+							0, dob.getLength());
+					ctx.setTokens(securityTokens);
+				} catch (IOException e) {
+					LOG.warn("Getting current user info failed when trying to launch the container", e);
+				}
+
+				LOG.info("Launching container " + allocatedContainers);
+				nmClient.startContainer(container, ctx);
+				messages.add(new Message("Launching new container"));
+			}
+			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+				++completedContainers;
+				LOG.info("Completed container (while allocating) "+status.getContainerId()+". Total Completed:" + completedContainers);
+				LOG.info("Diagnostics "+status.getDiagnostics());
+				logDeadContainer(status, containerDiag);
+			}
+			Thread.sleep(100);
+		}
+	}
+
+	private void logDeadContainer(ContainerStatus status,
+			StringBuffer containerDiag) {
+		String msg = "Diagnostics for containerId="+status.getContainerId()+
+				" in state="+status.getState()+"\n"+status.getDiagnostics();
+		messages.add(new Message(msg) );
+		containerDiag.append("\n\n");
+		containerDiag.append(msg);
+	}
+	
+	/**
+	 * Builds a status snapshot for the client. TaskManager and slot counts come from the
+	 * JobManager, or are zero if it has not been started yet; the message count and failed
+	 * flag come from this Application Master.
+	 */
+	@Override
+	public ApplicationMasterStatus getAppplicationMasterStatus() {
+		final boolean jobManagerStarted = jobManager != null;
+		final ApplicationMasterStatus amStatus = jobManagerStarted
+				? new ApplicationMasterStatus(jobManager.getNumberOfTaskManagers(), jobManager.getAvailableSlots())
+				: new ApplicationMasterStatus(0, 0); // JM not yet started
+		amStatus.setMessageCount(messages.size());
+		amStatus.setFailed(isFailed);
+		return amStatus;
+	}
+	
+
+	/**
+	 * Shuts the Application Master down on request of the client.
+	 *
+	 * <p>Unregisters from the ResourceManager with SUCCEEDED, or with FAILED if the AM is in
+	 * failed state, and closes the clients. In failed state, the flag is cleared first and the
+	 * thread blocked in {@link #keepRPCAlive()} is woken up.
+	 *
+	 * @return always {@code true}
+	 * @throws Exception if unregistering from the ResourceManager or closing fails
+	 */
+	@Override
+	public BooleanValue shutdownAM() throws Exception {
+		LOG.info("Client requested shutdown of AM");
+		FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
+		String finalMessage = "";
+		if(isFailed) {
+			finalStatus = FinalApplicationStatus.FAILED;
+			finalMessage = "Application Master failed";
+			// keepRPCAlive() waits on the monitor of the object isFailed referenced when it
+			// started waiting. notifyAll() must be called on that same object while holding
+			// its monitor; otherwise it throws IllegalMonitorStateException.
+			final Boolean failedMonitor = isFailed;
+			synchronized (failedMonitor) {
+				isFailed = false; // allow a proper shutdown
+				failedMonitor.notifyAll();
+			}
+		}
+		rmClient.unregisterApplicationMaster(finalStatus, finalMessage, "");
+		this.close();
+		return new BooleanValue(true);
+	}
+	
+	/**
+	 * Closes the NodeManager and ResourceManager clients.
+	 *
+	 * <p>The client RPC server is stopped only when the AM is not in failed state. In failed
+	 * state it is kept running so that the client can still retrieve the error messages.
+	 *
+	 * @throws Exception if closing one of the clients fails
+	 */
+	private void close() throws Exception {
+		nmClient.close();
+		rmClient.close();
+		if(isFailed) {
+			LOG.warn("Can not close AM RPC connection since this the AM is in failed state");
+		} else {
+			amRpcServer.stop();
+		}
+	}
+
+	/**
+	 * Returns the messages collected for the client, such as container launch notices,
+	 * diagnostics of completed containers and failure reports. The internal list itself
+	 * is returned, not a copy.
+	 */
+	@Override
+	public List<Message> getMessages() {
+		final List<Message> collected = this.messages;
+		return collected;
+	}
+
+	/**
+	 * Appends a message to the list reported to the client.
+	 *
+	 * @param msg the message to append
+	 */
+	public void addMessage(Message msg) {
+		this.messages.add(msg);
+	}
+
+	/**
+	 * Adding TaskManagers to a running session is not supported yet.
+	 *
+	 * @param n number of TaskManagers that would be added
+	 * @throws UnsupportedOperationException always
+	 */
+	@Override
+	public void addTaskManagers(int n) {
+		throw new UnsupportedOperationException("Implement me");
+	}
+
+	/**
+	 * Keeps the ApplicationMaster JVM with the Client RPC service running
+	 * to allow it retrieving the error message.
+	 */
+	protected void keepRPCAlive() {
+		synchronized (isFailed) {
+			while(true) {
+				if(isFailed) {
+					try {
+						isFailed.wait(100);
+					} catch (InterruptedException e) {
+						LOG.warn("Error while waiting until end of failed mode of AM", e);
+					}
+				} else {
+					// end of isFailed mode.
+					break;
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Entry point of the Flink YARN Application Master.
+	 *
+	 * <p>Runs the AM as the user named in the {@code Client.ENV_CLIENT_USERNAME} environment
+	 * variable and copies the tokens of the current (YARN daemon) user to that user.
+	 *
+	 * <p>Failures are reported in one of two ways:
+	 * <ul>
+	 *   <li>If the RM client was started but the AM object was never created, the error is
+	 *       reported to YARN by unregistering with FAILED.</li>
+	 *   <li>If the AM object exists, it is put into failed state, the error is added to its
+	 *       messages, and the JVM is kept alive so that the client can fetch the message
+	 *       over RPC.</li>
+	 * </ul>
+	 */
+	public static void main(String[] args) throws Exception {
+		// execute Application Master using the client's user
+		final String yarnClientUsername = System.getenv(Client.ENV_CLIENT_USERNAME);
+		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
+				+ " user to execute Flink ApplicationMaster/JobManager to '"+yarnClientUsername+"'");
+		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
+		// give the client's user the same tokens the daemon user holds
+		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
+			ugi.addToken(toks);
+		}
+		ugi.doAs(new PrivilegedAction<Object>() {
+			@Override
+			public Object run() {
+				// declared outside the try block so the error handler can tell how far startup got
+				AMRMClient<ContainerRequest> rmClient = null;
+				ApplicationMaster am = null;
+				try {
+					Configuration conf = Utils.initializeYarnConfiguration();
+					rmClient = AMRMClient.createAMRMClient();
+					rmClient.init(conf);
+					rmClient.start();
+
+					// run the actual Application Master
+					am = new ApplicationMaster(conf);
+					am.generateConfigurationFile();
+					am.startJobManager();
+					am.setRMClient(rmClient);
+					am.run();
+				} catch (Throwable e) {
+					LOG.fatal("Error while running the application master", e);
+					// the AM is not available. Report error through the unregister function.
+					if(rmClient != null && am == null) {
+						try {
+							rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, "Flink YARN Application master"
+									+ " stopped unexpectedly with an exception.\n"
+									+ StringUtils.stringifyException(e), "");
+						} catch (Exception e1) {
+							LOG.fatal("Unable to fail the application master", e1);
+						}
+						LOG.info("AM unregistered from RM");
+						return null;
+					}
+					if(rmClient == null) {
+						LOG.fatal("Unable to unregister AM since the RM client is not available");
+					}
+					// The AM object exists: switch it into failed state and block here so the
+					// client RPC service stays up and the client can read the error message.
+					if(am != null) {
+						LOG.info("Writing error into internal message system");
+						am.setFailed(true);
+						am.addMessage(new Message("The application master failed with an exception:\n"
+								+ StringUtils.stringifyException(e)));
+						am.keepRPCAlive();
+					}
+				}
+				return null;
+			}
+		});
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
new file mode 100644
index 0000000..0b0b1e4
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.appMaster;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.yarn.Client;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+
+
+/**
+ * Launcher for a Flink TaskManager inside a YARN container.
+ */
+public class YarnTaskManagerRunner {
+
+	private static final Log LOG = LogFactory.getLog(YarnTaskManagerRunner.class);
+
+	/**
+	 * Starts a TaskManager inside a YARN container.
+	 *
+	 * <p>Appends {@code -<TaskManager.ARG_CONF_DIR> <LOCAL_DIRS>} to the given arguments, where
+	 * LOCAL_DIRS is read from the container environment. The TaskManager is then run as the
+	 * user named in {@code Client.ENV_CLIENT_USERNAME}, with the tokens of the current (YARN
+	 * daemon) user copied over. Exceptions thrown by the TaskManager are logged, not rethrown.
+	 *
+	 * <p>NOTE(review): the directory is passed under {@code TaskManager.ARG_CONF_DIR}, while the
+	 * log line calls it a "log path". Confirm which TaskManager option is intended here.
+	 *
+	 * @param args arguments forwarded to {@link TaskManager#main(String[])}
+	 * @throws IOException if the current Hadoop user cannot be determined
+	 */
+	public static void main(final String[] args) throws IOException {
+		final Map<String, String> environment = System.getenv();
+		final String yarnClientUsername = environment.get(Client.ENV_CLIENT_USERNAME);
+		final String localDirs = environment.get(Environment.LOCAL_DIRS.key());
+
+		// append the local directory as two extra arguments
+		final int originalCount = args.length;
+		final String[] newArgs = Arrays.copyOf(args, originalCount + 2);
+		newArgs[originalCount] = "-" + TaskManager.ARG_CONF_DIR;
+		newArgs[originalCount + 1] = localDirs;
+		LOG.info("Setting log path "+localDirs);
+
+		final String daemonUserName = UserGroupInformation.getCurrentUser().getShortUserName();
+		LOG.info("YARN daemon runs as '" + daemonUserName + "' setting"
+				+ " user to execute Flink TaskManager to '" + yarnClientUsername + "'");
+
+		final UserGroupInformation clientUser = UserGroupInformation.createRemoteUser(yarnClientUsername);
+		for (Token<? extends TokenIdentifier> token : UserGroupInformation.getCurrentUser().getTokens()) {
+			clientUser.addToken(token);
+		}
+
+		final PrivilegedAction<Object> startTaskManager = new PrivilegedAction<Object>() {
+			@Override
+			public Object run() {
+				try {
+					TaskManager.main(newArgs);
+				} catch (Exception e) {
+					LOG.fatal("Error while running the TaskManager", e);
+				}
+				return null;
+			}
+		};
+		clientUser.doAs(startTaskManager);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
new file mode 100644
index 0000000..b2bdf6b
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.yarn.rpc;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * Class holding status information about the ApplicationMaster.
+ * The client is requesting the AM status regularly from the AM.
+ */
+public class ApplicationMasterStatus implements IOReadableWritable {
+	/** Number of TaskManagers registered at the JobManager. */
+	private int numTaskManagers = 0;
+	/** Number of processing slots reported as available by the JobManager. */
+	private int numSlots = 0;
+	/** Number of messages the Application Master has collected for the client. */
+	private int messageCount = 0;
+	/** Whether the Application Master is in failed state. */
+	private boolean failed = false;
+
+
+	/**
+	 * Creates an empty status; the fields are expected to be filled by {@link #read(DataInputView)}.
+	 */
+	public ApplicationMasterStatus() {
+		// for instantiation
+	}
+
+	/**
+	 * Creates a status with the given counts, no messages and the failed flag cleared.
+	 *
+	 * @param numTaskManagers number of registered TaskManagers
+	 * @param numSlots number of available slots
+	 */
+	public ApplicationMasterStatus(int numTaskManagers, int numSlots) {
+		this.numTaskManagers = numTaskManagers;
+		this.numSlots = numSlots;
+	}
+
+	/**
+	 * Creates a fully specified status.
+	 *
+	 * @param numTaskManagers number of registered TaskManagers
+	 * @param numSlots number of available slots
+	 * @param messageCount number of messages collected for the client
+	 * @param failed whether the Application Master is in failed state
+	 */
+	public ApplicationMasterStatus(int numTaskManagers, int numSlots,
+			int messageCount, boolean failed) {
+		this(numTaskManagers, numSlots);
+		this.messageCount = messageCount;
+		this.failed = failed;
+	}
+
+
+	public int getNumberOfTaskManagers() {
+		return numTaskManagers;
+	}
+
+	public int getNumberOfAvailableSlots() {
+		return numSlots;
+	}
+
+	public int getMessageCount() {
+		return messageCount;
+	}
+
+	public void setMessageCount(int messageCount) {
+		this.messageCount = messageCount;
+	}
+
+	/**
+	 * Sets the failed flag.
+	 *
+	 * @param isFailed the new value; must not be {@code null} because it is unboxed
+	 */
+	public void setFailed(Boolean isFailed) {
+		this.failed = isFailed;
+	}
+
+	/**
+	 * Serializes the status. The field order here must match {@link #read(DataInputView)}.
+	 */
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(numTaskManagers);
+		out.writeInt(numSlots);
+		out.writeInt(messageCount);
+		out.writeBoolean(failed);
+	}
+
+	/**
+	 * Deserializes the status in the order written by {@link #write(DataOutputView)}.
+	 */
+	@Override
+	public void read(DataInputView in) throws IOException {
+		numTaskManagers = in.readInt();
+		numSlots = in.readInt();
+		messageCount = in.readInt();
+		failed = in.readBoolean();
+	}
+
+	/** Returns whether the Application Master is in failed state. */
+	public boolean getFailed() {
+		return failed;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
new file mode 100644
index 0000000..560147c
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.rpc;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.protocols.VersionedProtocol;
+import org.apache.flink.types.BooleanValue;
+
+
+/**
+ * Interface describing the methods offered by the RPC service between
+ * the Client and Application Master.
+ */
+public interface YARNClientMasterProtocol extends VersionedProtocol {
+
+	/**
+	 * A text message passed from the Application Master to the client.
+	 */
+	public static class Message implements IOReadableWritable {
+		public String text;
+
+		/**
+		 * Nullary constructor so that a message can be instantiated before its text is
+		 * filled in by {@link #read(DataInputView)} during deserialization.
+		 */
+		public Message() {
+			// for deserialization
+		}
+
+		/**
+		 * @param msg the message text
+		 */
+		public Message(String msg) {
+			this.text = msg;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			// NOTE(review): if DataOutputView follows java.io.DataOutput#writeUTF, texts longer than
+			// 65535 encoded bytes (e.g. large diagnostics) fail here — confirm.
+			out.writeUTF(text);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			text = in.readUTF();
+		}
+	}
+
+	/** Returns the current status of the Application Master. */
+	ApplicationMasterStatus getAppplicationMasterStatus();
+
+	/** Asks the Application Master to unregister from YARN and shut down. */
+	BooleanValue shutdownAM() throws Exception;
+
+	/** Returns the messages the Application Master has collected for the client. */
+	List<Message> getMessages();
+
+	/** Intended to add {@code n} TaskManagers to the session; not supported by the ApplicationMaster yet. */
+	void addTaskManagers(int n);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index d614cc2..454c543 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -245,7 +245,7 @@ public class NepheleMiniCluster {
 	// ------------------------------------------------------------------------
 	
 	private void waitForJobManagerToBecomeReady(int numTaskManagers) throws InterruptedException {
-		while (jobManager.getNumberOfTaskTrackers() < numTaskManagers) {
+		while (jobManager.getNumberOfTaskManagers() < numTaskManagers) {
 			Thread.sleep(50);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 108db51..2983c63 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -267,6 +267,10 @@ public final class ConfigConstants {
 	 */
 	public static final String WEB_ACCESS_FILE_KEY = "webclient.access";
 	
+	// ----------------------------- YARN Client ----------------------------
+	
+	public static final String YARN_AM_PRC_PORT = "yarn.am.rpc.port";
+	
 	// ----------------------------- Miscellaneous ----------------------------
 	
 	/**
@@ -475,6 +479,11 @@ public final class ConfigConstants {
 	 * The default path to the file containing the list of access privileged users and passwords.
 	 */
 	public static final String DEFAULT_WEB_ACCESS_FILE_PATH = null;
+	
+	// ----------------------------- YARN ----------------------------
+	
+	public static final int DEFAULT_YARN_AM_RPC_PORT = 10245;
+	
 
 	// ----------------------------- LocalExecution ----------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
index 24d2fbc..5c65131 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
@@ -382,7 +382,7 @@ public class DefaultInstanceManager implements InstanceManager {
 	}
 
 	@Override
-	public int getNumberOfTaskTrackers() {
+	public int getNumberOfTaskManagers() {
 		return this.registeredHosts.size();
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index eed8b51..e8f8cbe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -46,9 +46,9 @@ public interface InstanceManager {
 
 	Instance getInstanceByName(String name);
 
-	int getNumberOfTaskTrackers();
+	int getNumberOfTaskManagers();
 
 	int getNumberOfSlots();
-	
+
 	Map<InstanceConnectionInfo, Instance> getInstances();
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 8ef6b58..82f663c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -1180,8 +1180,8 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		return this.archive;
 	}
 
-	public int getNumberOfTaskTrackers() {
-		return this.instanceManager.getNumberOfTaskTrackers();
+	public int getNumberOfTaskManagers() {
+		return this.instanceManager.getNumberOfTaskManagers();
 	}
 	
 	public Map<InstanceConnectionInfo, Instance> getInstances() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index 386315f..3ffe5af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -92,7 +92,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 				writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), ManagementGroupVertexID.fromHexString(groupvertexId));
 			}
 			else if("taskmanagers".equals(req.getParameter("get"))) {
-				resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskTrackers() +"}");
+				resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +"}");
 			}
 			else if("cancel".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae32c187/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
index d28ea2c..5b09704 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.jobmanager.web;
 
 import java.io.File;
@@ -31,73 +30,71 @@ import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import org.apache.flink.util.StringUtils;
 
+import com.google.common.base.Preconditions;
+
 public class LogfileInfoServlet extends HttpServlet {
-	
+
 	private static final long serialVersionUID = 1L;
 
 	/**
 	 * The log for this class.
 	 */
 	private static final Log LOG = LogFactory.getLog(LogfileInfoServlet.class);
-	
-	private File logDir;
-	
-	public LogfileInfoServlet(File logDir) {
-		this.logDir = logDir;
+
+	private File[] logDirs;
+
+
+	public LogfileInfoServlet(File[] logDirs) {
+		Preconditions.checkNotNull(logDirs, "The given log files are null.");
+		this.logDirs = logDirs;
 	}
-	
+
 	@Override
 	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
 
 		try {
 			if("stdout".equals(req.getParameter("get"))) {
-				// Find current stdtout file
-				for(File f : logDir.listFiles()) {
-					// contains "jobmanager" ".log" and no number in the end ->needs improvement
-					if( f.getName().equals("jobmanager-stdout.log") ||
-							(f.getName().indexOf("jobmanager") != -1 && f.getName().indexOf(".out") != -1 && ! Character.isDigit(f.getName().charAt(f.getName().length() - 1) ))
-							) {
-						
-						resp.setStatus(HttpServletResponse.SC_OK);
-						resp.setContentType("text/plain ");
-						writeFile(resp.getOutputStream(), f);
-						break;
-					}
-				}
+				// Find current stdout file
+				sendFile("jobmanager-stdout.log", resp);
 			}
 			else {
 				// Find current logfile
-				for(File f : logDir.listFiles()) {
-					// contains "jobmanager" ".log" and no number in the end ->needs improvement
-					if( f.getName().equals("jobmanager-stderr.log") ||
-							(f.getName().indexOf("jobmanager") != -1 && f.getName().indexOf(".log") != -1 && ! Character.isDigit(f.getName().charAt(f.getName().length() - 1) ))) {
-						
-						resp.setStatus(HttpServletResponse.SC_OK);
-						resp.setContentType("text/plain ");
-						writeFile(resp.getOutputStream(), f);
-						break;
-					}
-					
-				}
+				sendFile("jobmanager-log4j.log", resp);
 			}
 		} catch (Throwable t) {
 			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-			resp.getWriter().print(t.getMessage());
+			resp.getWriter().print("Error opening log files':"+t.getMessage());
 			if (LOG.isWarnEnabled()) {
 				LOG.warn(StringUtils.stringifyException(t));
 			}
 		}
 	}
-	
+
+	/**
+	 * Writes every file named {@code fileName} that lies directly in one of the configured log
+	 * directories to the response as plain text. Each file is prefixed with a header line.
+	 *
+	 * @param fileName exact name of the log file to send
+	 * @param resp response the file contents are written to
+	 * @throws IOException if no log directory contains such a file, or if reading a file or
+	 *         writing the response fails
+	 */
+	private void sendFile(String fileName, HttpServletResponse resp) throws IOException {
+		boolean found = false;
+		for(File logDir: logDirs) {
+			// listFiles() returns null if the path is not a readable directory
+			File[] files = logDir.listFiles();
+			if(files == null) {
+				LOG.warn("Unable to list log directory "+logDir);
+				continue;
+			}
+			for(File f : files) {
+				if(f.getName().equals(fileName)) {
+					if(!found) {
+						// set the headers once, before the first file is written
+						resp.setStatus(HttpServletResponse.SC_OK);
+						resp.setContentType("text/plain");
+						found = true;
+					}
+					writeFile(resp.getOutputStream(), f);
+				}
+			}
+		}
+		if(!found) {
+			throw new IOException("No file named '"+fileName+"' found in the log directories");
+		}
+	}
 	private static void writeFile(OutputStream out, File file) throws IOException {
 		byte[] buf = new byte[4 * 1024]; // 4K buffer
-		
+
 		FileInputStream  is = null;
 		try {
 			is = new FileInputStream(file);
-			
+			out.write(("==== FILE: "+file.toString()+" ====\n").getBytes());
 			int bytesRead;
 			while ((bytesRead = is.read(buf)) != -1) {
 				out.write(buf, 0, bytesRead);
@@ -108,5 +105,5 @@ public class LogfileInfoServlet extends HttpServlet {
 			}
 		}
 	}
-	
+
 }


[3/3] git commit: [FLINK-968] Add slot parameter to YARN client

Posted by rm...@apache.org.
[FLINK-968] Add slot parameter to YARN client


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e8f2e9d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e8f2e9d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e8f2e9d0

Branch: refs/heads/master
Commit: e8f2e9d0efa2fb27106658de38032d8adbb76dba
Parents: ae32c18
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Jul 17 10:55:30 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Aug 20 10:38:25 2014 +0200

----------------------------------------------------------------------
 docs/config.md                                  |   5 +
 docs/yarn_setup.md                              |  78 +++++++--
 .../main/java/org/apache/flink/yarn/Client.java | 167 ++++++++++++++-----
 .../apache/flink/yarn/ClientMasterControl.java  |  18 +-
 .../flink/yarn/appMaster/ApplicationMaster.java | 103 +++++++++---
 .../flink/yarn/rpc/ApplicationMasterStatus.java |   9 +
 .../yarn/rpc/YARNClientMasterProtocol.java      |  22 ++-
 .../org/apache/flink/client/CliFrontend.java    | 101 +++++++++--
 .../testconfigwithinvalidyarn/.yarn-jobmanager  |   1 -
 .../testconfigwithinvalidyarn/.yarn-properties  |   1 +
 .../testconfigwithyarn/.yarn-jobmanager         |   1 -
 .../testconfigwithyarn/.yarn-properties         |   3 +
 .../resources/web-docs-infoserver/analyze.html  |   4 +-
 .../web-docs-infoserver/blank-page.html         |   4 +-
 .../web-docs-infoserver/configuration.html      |   4 +-
 .../resources/web-docs-infoserver/history.html  |   6 +-
 .../resources/web-docs-infoserver/index.html    |  32 +++-
 .../js/jobmanagerFrontend.js                    |   1 +
 .../web-docs-infoserver/taskmanagers.html       |   6 +-
 .../org/apache/flink/runtime/ipc/Client.java    |  10 +-
 .../java/org/apache/flink/runtime/ipc/RPC.java  |   2 -
 .../org/apache/flink/runtime/ipc/Server.java    |   1 -
 .../flink/runtime/jobmanager/JobManager.java    |  10 +-
 .../jobmanager/web/JobmanagerInfoServlet.java   |   2 +-
 .../runtime/jobmanager/web/WebInfoServer.java   |   7 +
 25 files changed, 464 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/docs/config.md
----------------------------------------------------------------------
diff --git a/docs/config.md b/docs/config.md
index 4a02b0f..47a096f 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -252,5 +252,10 @@ input format's parameters (DEFAULT: 2097152 (= 2 MiBytes)).
 
 # YARN
 
+Please note that all ports used by Flink in a YARN session are offset by the YARN application ID
+to avoid duplicate port allocations when running multiple YARN sessions in parallel.
+
+For example, if `yarn.am.rpc.port` is configured to `10245` and the session's application ID is `application_1406629969999_0002`, then the port actually used is 10245 + 2 = 10247.
+
 - `yarn.am.rpc.port`: The port that is being opened by the Application Master (AM) to 
 let the YARN client connect for an RPC serice. (DEFAULT: Port 10245)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/docs/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/yarn_setup.md b/docs/yarn_setup.md
index dc76826..86cb6b0 100644
--- a/docs/yarn_setup.md
+++ b/docs/yarn_setup.md
@@ -4,7 +4,7 @@ title:  "YARN Setup"
 
 # In a Nutshell
 
-Start YARN session with 4 Taskmanagers (each with 4 GB of Heapspace):
+Start YARN session with 4 Task Managers (each with 4 GB of Heapspace):
 
 ```bash
 wget {{ site.FLINK_DOWNLOAD_URL_YARN_STABLE }}
@@ -13,6 +13,8 @@ cd flink-yarn-{{ site.FLINK_VERSION_STABLE }}/
 ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
 ```
 
+Specify the `-s` flag to set the number of processing slots per Task Manager. We recommend setting the number of slots to the number of processors per machine.
+
 # Introducing YARN
 
 Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource management framework. It allows to run various distributed applications on top of a cluster. Flink runs on YARN next to other applications. Users do not have to setup or install anything if there is already a YARN setup.
@@ -20,7 +22,7 @@ Apache [Hadoop YARN](http://hadoop.apache.org/) is a cluster resource management
 **Requirements**
 
 - Apache Hadoop 2.2
-- HDFS
+- HDFS (Hadoop Distributed File System)
 
 If you have troubles using the Flink YARN client, have a look in the [FAQ section]({{site.baseurl}}/docs/0.5/general/faq.html).
 
@@ -32,10 +34,10 @@ A session will start all required Flink services (JobManager and TaskManagers) s
 
 ### Download Flink for YARN
 
-Download the YARN tgz package on the [download page]({{site.baseurl}}/downloads/#nightly). It contains the required files.
+Download the YARN tgz package on the [download page]({{site.baseurl}}/downloads/). It contains the required files.
 
 
-If you want to build the YARN .tgz file from sources, follow the build instructions. Make sure to use the `-Dhadoop.profile=2` profile. You can find the file in `flink-dist/target/flink-dist-{{site.docs_05_stable}}-yarn.tar.gz` (*Note: The version might be different for you* ).
+If you want to build the YARN .tgz file from sources, follow the [build instructions](building.html). Make sure to use the `-Dhadoop.profile=2` profile. You can find the file in `flink-dist/target/flink-dist-{{site.docs_05_stable}}-yarn.tar.gz` (*Note: The version might be different for you* ).
 
 Extract the package using:
 
@@ -57,11 +59,13 @@ This command will show you the following overview:
 ```bash
 Usage:
    Required
-     -n,--container <arg>   Number of Yarn container to allocate (=Number of TaskTrackers)
+     -n,--container <arg>   Number of Yarn container to allocate (=Number of Task Managers)
    Optional
+     -D <arg>                       Dynamic Properties
      -jm,--jobManagerMemory <arg>    Memory for JobManager Container [in MB]
      -q,--query                      Display available YARN resources (memory, cores)
      -qu,--queue <arg>               Specify YARN queue.
+     -s,--slots <arg>                Number of slots per TaskManager
      -tm,--taskManagerMemory <arg>   Memory per TaskManager Container [in MB]
      -tmc,--taskManagerCores <arg>   Virtual CPU cores per TaskManager
      -v,--verbose                    Verbose debug mode
@@ -69,17 +73,21 @@ Usage:
 
 Please note that the Client requires the `HADOOP_HOME` (or `YARN_CONF_DIR` or `HADOOP_CONF_DIR`) environment variable to be set to read the YARN and HDFS configuration.
 
-**Example:** Issue the following command to allocate 10 TaskTrackers, with 8 GB of memory each:
+**Example:** Issue the following command to allocate 10 Task Managers, with 8 GB of memory and 32 processing slots each:
 
 ```bash
-./bin/yarn-session.sh -n 10 -tm 8192
+./bin/yarn-session.sh -n 10 -tm 8192 -s 32
 ```
 
-The system will use the configuration in `conf/flink-config.yaml`. Please follow our [configuration guide](config.html) if you want to change something. Flink on YARN will overwrite the following configuration parameters `jobmanager.rpc.address` (because the JobManager is always allocated at different machines) and `taskmanager.tmp.dirs` (we are using the tmp directories given by YARN).
+The system will use the configuration in `conf/flink-conf.yaml`. Please follow our [configuration guide](config.html) if you want to change something.
+
+Flink on YARN will overwrite the following configuration parameters: `jobmanager.rpc.address` (because the JobManager is always allocated on different machines), `taskmanager.tmp.dirs` (we are using the tmp directories given by YARN) and `parallelization.degree.default` (if the number of slots has been specified).
+
+If you don't want to change the configuration file, you can pass configuration parameters as dynamic properties via the `-D` flag, for example: `-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368`.
 
-The example invocation starts 11 containers, since there is one additional container for the ApplicationMaster and JobTracker.
+The example invocation starts 11 containers, since there is one additional container for the ApplicationMaster and Job Manager.
 
-Once Flink is deployed in your YARN cluster, it will show you the connection details of the JobTracker.
+Once Flink is deployed in your YARN cluster, it will show you the connection details of the Job Manager.
 
 The client has to remain open to keep the deployment running. We suggest to use `screen`, which will start a detachable shell:
 
@@ -88,6 +96,7 @@ The client has to remain open to keep the deployment running. We suggest to use
 3. Use `CTRL+a`, then press `d` to detach the screen session,
 4. Use `screen -r` to resume again.
 
+
 # Submit Job to Flink
 
 Use the following command to submit a Flink program to the YARN cluster:
@@ -102,14 +111,22 @@ The command will show you a help menu like this:
 
 ```bash
 [...]
-Action "run" compiles and submits a Flink program.
+Action "run" compiles and runs a program.
+
+  Syntax: run [OPTIONS] <jar-file> <arguments>
   "run" action arguments:
-     -a,--arguments <programArgs>   Program arguments
-     -c,--class <classname>         Program class
-     -j,--jarfile <jarfile>         Flink program JAR file
-     -m,--jobmanager <host:port>    Jobmanager to which the program is submitted
-     -w,--wait                      Wait for program to finish
-[...]
+     -c,--class <classname>           Class with the program entry point ("main"
+                                      method or "getPlan()" method. Only needed
+                                      if the JAR file does not specify the class
+                                      in its manifest.
+     -m,--jobmanager <host:port>      Address of the JobManager (master) to
+                                      which to connect. Use this flag to connect
+                                      to a different JobManager than the one
+                                      specified in the configuration.
+     -p,--parallelism <parallelism>   The parallelism with which to run the
+                                      program. Optional flag to override the
+                                      default value specified in the
+                                      configuration
 ```
 
 Use the *run* action to submit a job to YARN. The client is able to determine the address of the JobManager. In the rare event of a problem, you can also pass the JobManager address using the `-m` argument. The JobManager address is visible in the YARN console.
@@ -135,6 +152,31 @@ You can check the number of TaskManagers in the JobManager web interface. The ad
 If the TaskManagers do not show up after a minute, you should investigate the issue using the log files.
 
 
+# Debugging a failed YARN session
+
+There are many reasons why a Flink YARN session deployment can fail: a misconfigured Hadoop setup (HDFS permissions, YARN configuration), version incompatibilities (running Flink with vanilla Hadoop dependencies on Cloudera Hadoop), or other errors.
+
+## Log Files
+
+In cases where the Flink YARN session fails during the deployment itself, users have to rely on the logging capabilities of Hadoop YARN. The most useful feature for that is the [YARN log aggregation](http://hortonworks.com/blog/simplifying-user-logs-management-and-access-in-yarn/). 
+To enable it, users have to set the `yarn.log-aggregation-enable` property to `true` in the `yarn-site.xml` file.
+Once that is enabled, users can use the following command to retrieve all log files of a (failed) YARN session.
+
+```
+yarn logs -applicationId <application ID>
+```
+
+Note that it takes a few seconds after the session has finished until the logs show up.
+
+## YARN Client Console & Web Interfaces
+
+The Flink YARN client also prints error messages in the terminal if errors occur during runtime (for example if a TaskManager stops working after some time).
+
+In addition to that, there is the YARN Resource Manager web interface (by default on port 8088). The port of the Resource Manager web interface is determined by the `yarn.resourcemanager.webapp.address` configuration value.
+
+It provides access to the log files of running YARN applications and shows diagnostics for failed apps.
+
+
 # Build YARN client for a specific Hadoop version
 
 Users using Hadoop distributions from companies like Hortonworks, Cloudera or MapR might have to build Flink against their specific versions of Hadoop (HDFS) and YARN. Please read the [build instructions](building.html) for more details.
@@ -155,6 +197,6 @@ When starting a new Flink YARN session, the client first checks if the requested
 
 The next step of the client is to request (step 2) a YARN container to start the *ApplicationMaster* (step 3). Since the client registered the configuration and jar-file as a resource for the container, the NodeManager of YARN running on that particular machine will take care of preparing the container (e.g. downloading the files). Once that has finished, the *ApplicationMaster* (AM) is started.
 
-The *JobManager* and AM are running in the same container. Once they successfully started, the AM knows the address of the JobManager (its own host). It is generating a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager). The file is also uploaded to HDFS. Additionally, the *AM* container is also serving Flink's web interface.
+The *JobManager* and AM are running in the same container. Once they have successfully started, the AM knows the address of the JobManager (its own host). It generates a new Flink configuration file for the TaskManagers (so that they can connect to the JobManager). The file is also uploaded to HDFS. Additionally, the *AM* container also serves Flink's web interface. The ports Flink uses for its services are the standard ports configured by the user plus the application ID as an offset. This allows users to run multiple Flink YARN sessions in parallel.
 
 After that, the AM starts allocating the containers for Flink's TaskManagers, which will download the jar file and the modified configuration from the HDFS. Once these steps are completed, Flink is set up and ready to accept Jobs.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
index a2090f1..292d59a 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
@@ -15,9 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
-
 package org.apache.flink.yarn;
 
 import java.io.BufferedReader;
@@ -28,7 +25,7 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.PrintWriter;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,6 +33,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.jar.JarFile;
 
 import org.apache.commons.cli.CommandLine;
@@ -45,11 +43,13 @@ import org.apache.commons.cli.MissingOptionException;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.yarn.appMaster.ApplicationMaster;
 import org.apache.flink.yarn.rpc.YARNClientMasterProtocol.Message;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -104,47 +104,69 @@ public class Client {
 	private static final Option VERBOSE = new Option("v","verbose",false, "Verbose debug mode");
 	private static final Option GEN_CONF = new Option("g","generateConf",false, "Place default configuration file in current directory");
 	private static final Option QUEUE = new Option("qu","queue",true, "Specify YARN queue.");
-	private static final Option SHIP_PATH = new Option("s","ship",true, "Ship files in the specified directory");
+	private static final Option SHIP_PATH = new Option("t","ship",true, "Ship files in the specified directory (t for transfer)");
 	private static final Option FLINK_CONF_DIR = new Option("c","confDir",true, "Path to Flink configuration directory");
 	private static final Option FLINK_JAR = new Option("j","jar",true, "Path to Flink jar file");
 	private static final Option JM_MEMORY = new Option("jm","jobManagerMemory",true, "Memory for JobManager Container [in MB]");
 	private static final Option TM_MEMORY = new Option("tm","taskManagerMemory",true, "Memory per TaskManager Container [in MB]");
 	private static final Option TM_CORES = new Option("tmc","taskManagerCores",true, "Virtual CPU cores per TaskManager");
 	private static final Option CONTAINER = new Option("n","container",true, "Number of Yarn container to allocate (=Number of"
-			+ " TaskTrackers)");
+			+ " Task Managers)");
+	private static final Option SLOTS = new Option("s","slots",true, "Number of slots per TaskManager");
+	/**
+	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
+	 *  -Dfs.overwrite-files=true  -Dtaskmanager.network.numberOfBuffers=16368
+	 */
+	private static final Option DYNAMIC_PROPERTIES = new Option("D", true, "Dynamic properties");
 
 	/**
-	 * Constants
+	 * Constants,
+	 * all starting with ENV_ are used as environment variables to pass values from the Client
+	 * to the Application Master.
 	 */
-	// environment variable names
 	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
 	public final static String ENV_TM_CORES = "_CLIENT_TM_CORES";
 	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
 	public final static String ENV_APP_ID = "_APP_ID";
+	public final static String ENV_APP_NUMBER = "_APP_NUMBER";
 	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
 	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
 	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
 	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
 	public static final String ENV_AM_PRC_PORT = "_AM_PRC_PORT";
+	public static final String ENV_SLOTS = "_SLOTS";
+	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
 
 	private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
-
+	
 	/**
 	 * Seconds to wait between each status query to the AM.
 	 */
 	private static final int CLIENT_POLLING_INTERVALL = 3;
+	/**
+	 * Minimum memory requirements, checked by the Client.
+	 */
+	private static final int MIN_JM_MEMORY = 128;
+	private static final int MIN_TM_MEMORY = 128;
 
 	private Configuration conf;
 	private YarnClient yarnClient;
 
 	private ClientMasterControl cmc;
 
-	private ApplicationId appId;
-
-	private File addrFile;
+	private File yarnPropertiesFile;
 
+	/**
+	 * Files (usually in a distributed file system) used for the YARN session of Flink.
+	 * Contains configuration files and jar files.
+	 */
 	private Path sessionFilesDir;
 
+	/**
+	 * If the user has specified a different number of slots, we store them here
+	 */
+	private int slots = -1;
+	
 	public void run(String[] args) throws Exception {
 
 		if(UserGroupInformation.isSecurityEnabled()) {
@@ -167,6 +189,8 @@ public class Client {
 		options.addOption(QUEUE);
 		options.addOption(QUERY);
 		options.addOption(SHIP_PATH);
+		options.addOption(SLOTS);
+		options.addOption(DYNAMIC_PROPERTIES);
 
 		CommandLineParser parser = new PosixParser();
 		CommandLine cmd = null;
@@ -287,12 +311,31 @@ public class Client {
 		if(cmd.hasOption(JM_MEMORY.getOpt())) {
 			jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
 		}
-
+		if(jmMemory < MIN_JM_MEMORY) {
+			System.out.println("The JobManager memory is below the minimum required memory amount "
+					+ "of "+MIN_JM_MEMORY+" MB");
+			System.exit(1);
+		}
 		// Task Managers memory
 		int tmMemory = 1024;
 		if(cmd.hasOption(TM_MEMORY.getOpt())) {
 			tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
 		}
+		if(tmMemory < MIN_TM_MEMORY) {
+			System.out.println("The TaskManager memory is below the minimum required memory amount "
+					+ "of "+MIN_TM_MEMORY+" MB");
+			System.exit(1);
+		}
+		
+		if(cmd.hasOption(SLOTS.getOpt())) {
+			slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
+		}
+		
+		String[] dynamicProperties = null;
+		if(cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
+			dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
+		}
+		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
 
 		// Task Managers vcores
 		int tmCores = 1;
@@ -305,6 +348,7 @@ public class Client {
 			LOG.warn("Unable to find job manager port in configuration!");
 			jmPort = ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT;
 		}
+		
 		conf = Utils.initializeYarnConfiguration();
 
 		// intialize HDFS
@@ -320,7 +364,7 @@ public class Client {
 		}
 
 		// Create yarnClient
-		final YarnClient yarnClient = YarnClient.createYarnClient();
+		yarnClient = YarnClient.createYarnClient();
 		yarnClient.init(conf);
 		yarnClient.start();
 
@@ -395,7 +439,7 @@ public class Client {
 		if(hasLog4j) {
 			amCommand 	+= " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
 		}
-		amCommand 	+= " org.apache.flink.yarn.appMaster.ApplicationMaster" + " "
+		amCommand 	+= " "+ApplicationMaster.class.getName()+" "
 					+ " 1>"
 					+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
 					+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
@@ -406,7 +450,15 @@ public class Client {
 		// Set-up ApplicationSubmissionContext for the application
 		ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
 		final ApplicationId appId = appContext.getApplicationId();
-
+		/**
+		 * All network ports are offsetted by the application number 
+		 * to avoid version port clashes when running multiple Flink sessions
+		 * in parallel
+		 */
+		int appNumber = appId.getId();
+
+		jmPort += appNumber;
+				
 		// Setup jar for ApplicationMaster
 		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
 		LocalResource flinkConf = Records.newRecord(LocalResource.class);
@@ -447,6 +499,7 @@ public class Client {
 		fs.close();
 
 		int amRPCPort = GlobalConfiguration.getInteger(ConfigConstants.YARN_AM_PRC_PORT, ConfigConstants.DEFAULT_YARN_AM_RPC_PORT);
+		amRPCPort += appNumber;
 		// Setup CLASSPATH for ApplicationMaster
 		Map<String, String> appMasterEnv = new HashMap<String, String>();
 		Utils.setupEnv(conf, appMasterEnv);
@@ -460,6 +513,11 @@ public class Client {
 		appMasterEnv.put(Client.ENV_CLIENT_SHIP_FILES, envShipFileList.toString() );
 		appMasterEnv.put(Client.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
 		appMasterEnv.put(Client.ENV_AM_PRC_PORT, String.valueOf(amRPCPort));
+		appMasterEnv.put(Client.ENV_SLOTS, String.valueOf(slots));
+		appMasterEnv.put(Client.ENV_APP_NUMBER, String.valueOf(appNumber));
+		if(dynamicPropertiesEncoded != null) {
+			appMasterEnv.put(Client.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
+		}
 
 		amContainer.setEnvironment(appMasterEnv);
 
@@ -473,8 +531,8 @@ public class Client {
 		appContext.setResource(capability);
 		appContext.setQueue(queue);
 
-		// file that we write into the conf/ dir containing the jobManager address.
-		addrFile = new File(confDirPath + CliFrontend.JOBMANAGER_ADDRESS_FILE);
+		// file that we write into the conf/ dir containing the jobManager address and the dop.
+		yarnPropertiesFile = new File(confDirPath + CliFrontend.YARN_PROPERTIES_FILE);
 
 
 		LOG.info("Submitting application master " + appId);
@@ -485,6 +543,7 @@ public class Client {
 		char[] el = { '/', '|', '\\', '-'};
 		int i = 0;
 		int numTaskmanagers = 0;
+		int numMessages = 0;
 
 		BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
 
@@ -495,10 +554,19 @@ public class Client {
 				System.err.println("Flink JobManager is now running on "+appReport.getHost()+":"+jmPort);
 				System.err.println("JobManager Web Interface: "+appReport.getTrackingUrl());
 				// write jobmanager connect information
-				PrintWriter out = new PrintWriter(addrFile);
-				out.println(appReport.getHost()+":"+jmPort);
+				Properties yarnProps = new Properties();
+				yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, appReport.getHost()+":"+jmPort);
+				if(slots != -1) {
+					yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DOP, Integer.toString(slots * taskManagerCount) );
+				}
+				// add dynamic properties
+				if(dynamicProperties != null) {
+					yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, dynamicPropertiesEncoded);
+				}
+				OutputStream out = new FileOutputStream(yarnPropertiesFile);
+				yarnProps.store(out, "Generated YARN properties file");
 				out.close();
-				addrFile.setReadable(true, false); // readable for all.
+				yarnPropertiesFile.setReadable(true, false); // readable for all.
 
 				// connect RPC service
 				cmc = new ClientMasterControl(new InetSocketAddress(appReport.getHost(), amRPCPort));
@@ -515,26 +583,34 @@ public class Client {
 			} else {
 				int newTmCount = cmc.getNumberOfTaskManagers();
 				if(numTaskmanagers != newTmCount) {
-					System.err.println("Number of connected TaskManagers changed to "+newTmCount+" slots available: "+cmc.getNumberOfAvailableSlots());
+					System.err.println("Number of connected TaskManagers changed to "+newTmCount+". "
+							+ "Slots available: "+cmc.getNumberOfAvailableSlots());
 					numTaskmanagers = newTmCount;
 				}
+				// we also need to show new messages.
 				if(cmc.getFailedStatus()) {
 					System.err.println("The Application Master failed!\nMessages:\n");
 					for(Message m: cmc.getMessages() ) {
-						System.err.println("Message: "+m.text);
+						System.err.println("Message: "+m.getMessage());
 					}
 					System.err.println("Requesting Application Master shutdown");
 					cmc.shutdownAM();
+					cmc.close();
 					System.err.println("Application Master closed.");
 				}
-				for(Message m: cmc.getMessages() ) {
-					System.err.println("Message: "+m.text);
+				if(cmc.getMessages().size() != numMessages) {
+					System.err.println("Received new message(s) from the Application Master");
+					List<Message> msg = cmc.getMessages();
+					while(msg.size() > numMessages) {
+						System.err.println("Message: "+msg.get(numMessages).getMessage());
+						numMessages++;
+					}
 				}
 
 				// wait until CLIENT_POLLING_INTERVALL is over or the user entered something.
 				long startTime = System.currentTimeMillis();
 				while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000
-				        && !in.ready()) {
+						&& !in.ready()) {
 					Thread.sleep(200);
 				}
 				if (in.ready()) {
@@ -549,10 +625,15 @@ public class Client {
 		}
 
 		LOG.info("Application " + appId + " finished with"
-				+ " state " + appState + "and final state " + appReport.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
+				+ " state " + appState + " and "
+				+ "final state " + appReport.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
 
 		if(appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED ) {
 			LOG.warn("Application failed. Diagnostics "+appReport.getDiagnostics());
+			LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retreive "
+					+ "the full application log using this command:\n"
+					+ "\tyarn logs -applicationId "+appReport.getApplicationId()+"\n"
+					+ "(It sometimes takes a few seconds until the logs are aggregated)");
 		}
 
 	}
@@ -560,8 +641,6 @@ public class Client {
 	private void printHelp() {
 		System.err.println("Available commands:\n"
 				+ "\t stop : Stop the YARN session\n"
-			//	+ "\t add n : Add n TaskManagers to the YARN session\n"
-			//	+ "\t remove n : Remove n TaskManagers to the YARN session\n"
 				+ "\t allmsg : Show all messages\n");
 	}
 	private void evalCommand(String command) {
@@ -573,34 +652,38 @@ public class Client {
 		} else if(command.equals("allmsg")) {
 			System.err.println("All messages from the ApplicationMaster:");
 			for(Message m: cmc.getMessages() ) {
-				System.err.println("Message: "+m.text);
+				System.err.println("Message: "+m.getMessage());
 			}
 		} else if(command.startsWith("add")) {
-			String nStr = command.replace("add", "").trim();
-			int n = Integer.valueOf(nStr);
-			System.err.println("Adding "+n+" TaskManagers to the session");
-			cmc.addTaskManagers(n);
+			System.err.println("This feature is not implemented yet!");
+//			String nStr = command.replace("add", "").trim();
+//			int n = Integer.valueOf(nStr);
+//			System.err.println("Adding "+n+" TaskManagers to the session");
+//			cmc.addTaskManagers(n);
 		} else {
 			System.err.println("Unknown command '"+command+"'");
 			printHelp();
 		}
 	}
 
+	private void cleanUp() throws IOException {
+		LOG.info("Deleting files in "+sessionFilesDir );
+		FileSystem shutFS = FileSystem.get(conf);
+		shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
+		shutFS.close();
+	}
+	
 	private void stopSession() {
 		try {
 			LOG.info("Sending shutdown request to the Application Master");
 			cmc.shutdownAM();
-			yarnClient.killApplication(appId);
-			LOG.info("Deleting files in "+sessionFilesDir );
-			FileSystem shutFS = FileSystem.get(conf);
-			shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
-			shutFS.close();
+			cleanUp();
 			cmc.close();
 		} catch (Exception e) {
 			LOG.warn("Exception while killing the YARN application", e);
 		}
 		try {
-			addrFile.delete();
+			yarnPropertiesFile.delete();
 		} catch (Exception e) {
 			LOG.warn("Exception while deleting the JobManager address file", e);
 		}
@@ -653,6 +736,8 @@ public class Client {
 		opt.addOption(TM_CORES);
 		opt.addOption(QUERY);
 		opt.addOption(QUEUE);
+		opt.addOption(SLOTS);
+		opt.addOption(DYNAMIC_PROPERTIES);
 		formatter.printHelp(" ", opt);
 	}
 
@@ -722,6 +807,4 @@ public class Client {
 		Client c = new Client();
 		c.run(args);
 	}
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
index 44633ea..1c87289 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/ClientMasterControl.java
@@ -40,7 +40,7 @@ public class ClientMasterControl extends Thread {
 	private ApplicationMasterStatus appMasterStatus;
 	private YARNClientMasterProtocol cmp;
 	private Object lock = new Object();
-	private List<Message> messages;
+	private List<Message> messages = new ArrayList<Message>();
 	private boolean running = true;
 
 	public ClientMasterControl(InetSocketAddress applicationMasterAddress) {
@@ -55,9 +55,13 @@ public class ClientMasterControl extends Thread {
 
 			while(running) {
 				synchronized (lock) {
-					appMasterStatus = cmp.getAppplicationMasterStatus();
-					if(messages != null && appMasterStatus != null &&
-							messages.size() != appMasterStatus.getMessageCount()) {
+					try {
+						appMasterStatus = cmp.getAppplicationMasterStatus();
+					} catch(Throwable e) {
+						// TODO: try to clean up as much as possible! (set to failed state? // kill app? // clean up files)
+						LOG.warn("Failed to get Application Master status", e);
+					}
+					if(appMasterStatus != null && messages.size() != appMasterStatus.getMessageCount()) {
 						messages = cmp.getMessages();
 					}
 				}
@@ -104,7 +108,8 @@ public class ClientMasterControl extends Thread {
 
 	public boolean shutdownAM() {
 		try {
-			return cmp.shutdownAM().getValue();
+			boolean result = cmp.shutdownAM().getValue();
+			return result;
 		} catch(Throwable e) {
 			LOG.warn("Error shutting down the application master", e);
 			return false;
@@ -112,9 +117,6 @@ public class ClientMasterControl extends Thread {
 	}
 
 	public List<Message> getMessages() {
-		if(this.messages == null) {
-			return new ArrayList<Message>();
-		}
 		return this.messages;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
index 2186fca..30d2f30 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/ApplicationMaster.java
@@ -36,6 +36,8 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.ipc.RPC;
@@ -81,7 +83,8 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 	private final String currDir;
 	private final String logDirs;
 	private final String ownHostname;
-	private final String appId;
+	private final String appId; // YARN style application id, for example: application_1406629969999_0002
+	private final int appNumber; // app number, for example 2 (see above)
 	private final String clientHomeDir;
 	private final String applicationMasterHost;
 	private final String remoteFlinkJarPath;
@@ -91,8 +94,10 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 	private final int taskManagerCount;
 	private final int memoryPerTaskManager;
 	private final int coresPerTaskManager;
+	private final int slots;
 	private final String localWebInterfaceDir;
-	private final Configuration conf;
+	private final Configuration conf; // Hadoop configuration (not the Flink configuration).
+	
 
 	/**
 	 * File system for interacting with Flink's files such as the jar
@@ -153,6 +158,21 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 	 * that the client can still retrieve the messages and then shut it down)
 	 */
 	private Boolean isFailed = false;
+	private boolean isClosed = false;
+
+	private String dynamicPropertiesEncodedString;
+	
+	/**
+	 * AM status that is sent to the Client periodically
+	 */
+	private ApplicationMasterStatus amStatus;
+
+	/**
+	 * The JobManager's IPC port, offset by the appNumber.
+	 */
+	private final int jobManagerPort;
+	private final int jobManagerWebPort;
+	
 
 	public ApplicationMaster(Configuration conf) throws IOException {
 		fs = FileSystem.get(conf);
@@ -161,15 +181,19 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 		logDirs =  envs.get(Environment.LOG_DIRS.key());
 		ownHostname = envs.get(Environment.NM_HOST.key());
 		appId = envs.get(Client.ENV_APP_ID);
+		appNumber = Integer.valueOf(envs.get(Client.ENV_APP_NUMBER));
 		clientHomeDir = envs.get(Client.ENV_CLIENT_HOME_DIR);
 		applicationMasterHost = envs.get(Environment.NM_HOST.key());
 		remoteFlinkJarPath = envs.get(Client.FLINK_JAR_PATH);
 		shipListString = envs.get(Client.ENV_CLIENT_SHIP_FILES);
 		yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
-		rpcPort = envs.get(Client.ENV_AM_PRC_PORT);
+		rpcPort = envs.get(Client.ENV_AM_PRC_PORT); // already offsetted
 		taskManagerCount = Integer.valueOf(envs.get(Client.ENV_TM_COUNT));
 		memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY));
 		coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES));
+		slots = Integer.valueOf(envs.get(Client.ENV_SLOTS));
+		dynamicPropertiesEncodedString = envs.get(Client.ENV_DYNAMIC_PROPERTIES); // might return null!
+		
 		localWebInterfaceDir = currDir+"/resources/"+ConfigConstants.DEFAULT_JOB_MANAGER_WEB_PATH_NAME;
 		this.conf = conf;
 
@@ -179,7 +203,7 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 		if(ownHostname == null) {
 			throw new RuntimeException("Own hostname ("+Environment.NM_HOST+") not set.");
 		}
-		LOG.info("Working directory "+currDir);
+		LOG.debug("Working directory "+currDir);
 
 		// load Flink configuration.
 		Utils.getFlinkConfiguration(currDir);
@@ -187,6 +211,16 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 		// start AM RPC service
 		amRpcServer = RPC.getServer(this, ownHostname, Integer.valueOf(rpcPort), 2);
 		amRpcServer.start();
+		
+		// determine JobManager port
+		int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+		if(port != -1) {
+			port += appNumber;
+		} else {
+			LOG.warn("JobManager port is unknown");
+		}
+		this.jobManagerPort = port;
+		this.jobManagerWebPort = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT)+appNumber;
 	}
 	
 	private void setFailed(boolean failed) {
@@ -210,8 +244,26 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 		}
 		// just to make sure.
 		output.append(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY+": "+ownHostname+"\n");
+		output.append(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY+": "+jobManagerPort+"\n"); // already offsetted here.
 		output.append(ConfigConstants.JOB_MANAGER_WEB_ROOT_PATH_KEY+": "+localWebInterfaceDir+"\n");
 		output.append(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY+": "+logDirs+"\n");
+		
+		output.append(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY+": "+ jobManagerWebPort +"\n");
+		
+		
+		if(slots != -1) {
+			// configure slots and default dop
+			output.append(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS+": "+slots+"\n");
+			output.append(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY+": "+slots*taskManagerCount+"\n");
+		}
+		// add dynamic properties
+		List<Tuple2<String, String>> dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString);
+		for(Tuple2<String, String> dynamicProperty : dynamicProperties) {
+			String propLine = dynamicProperty.f0+": "+dynamicProperty.f1;
+			output.append(propLine+"\n");
+			LOG.debug("Adding user-supplied configuration value to generated configuration file: "+propLine);
+		}
+		
 		output.close();
 		br.close();
 		File newConf = new File(currDir+"/flink-conf-modified.yaml");
@@ -245,10 +297,11 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 		nmClient.init(conf);
 		nmClient.start();
 		nmClient.cleanupRunningContainersOnStop(true);
-
+		
 		// Register with ResourceManager
-		LOG.info("Registering ApplicationMaster");
-		rmClient.registerApplicationMaster(applicationMasterHost, 0, "http://"+applicationMasterHost+":"+GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, "undefined"));
+		String url = "http://"+applicationMasterHost+":"+jobManagerWebPort;
+		LOG.info("Registering ApplicationMaster with tracking url "+url);
+		rmClient.registerApplicationMaster(applicationMasterHost, 0, url);
 
 		// Priority for worker containers - priorities are intra-application
 		Priority priority = Records.newRecord(Priority.class);
@@ -329,15 +382,17 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 			}
 			Thread.sleep(5000);
 		}
-		LOG.info("Shutting down JobManager");
-		jobManager.shutdown();
-
+		if(isClosed) {
+			return;
+		}
 		// Un-register with ResourceManager
 		final String diagnosticsMessage = "Application Master shut down after all "
 				+ "containers finished\n"+containerDiag.toString();
 		LOG.info("Diagnostics message: "+diagnosticsMessage);
 		rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, diagnosticsMessage, "");
 		this.close();
+		amRpcServer.stop(); // we need to manually stop the RPC service. Usually, the Client stops the RPC,
+		// but at this point, the AM has been shut down (for some reason).
 		LOG.info("Application Master shutdown completed.");
 	}
 
@@ -364,7 +419,7 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 				if(hasLog4j) {
 					tmCommand += " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/taskmanager-log4j.log\" -Dlog4j.configuration=file:log4j.properties";
 				}
-				tmCommand	+= " org.apache.flink.appMaster.YarnTaskManagerRunner -configDir . "
+				tmCommand	+= " "+YarnTaskManagerRunner.class.getName()+" -configDir . "
 						+ " 1>"
 						+ ApplicationConstants.LOG_DIR_EXPANSION_VAR
 						+ "/taskmanager-stdout.log" 
@@ -398,7 +453,6 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 
 				LOG.info("Launching container " + allocatedContainers);
 				nmClient.startContainer(container, ctx);
-				messages.add(new Message("Launching new container"));
 			}
 			for (ContainerStatus status : response.getCompletedContainersStatuses()) {
 				++completedContainers;
@@ -422,12 +476,16 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 	
 	@Override
 	public ApplicationMasterStatus getAppplicationMasterStatus() {
-		ApplicationMasterStatus amStatus;
+		if(amStatus == null) {
+			amStatus = new ApplicationMasterStatus();
+		}
 		if(jobManager == null) {
 			// JM not yet started
-			amStatus = new ApplicationMasterStatus(0, 0 );
+			amStatus.setNumTaskManagers(0);
+			amStatus.setNumSlots(0);
 		} else {
-			amStatus = new ApplicationMasterStatus(jobManager.getNumberOfTaskManagers(), jobManager.getAvailableSlots() );
+			amStatus.setNumTaskManagers(jobManager.getNumberOfTaskManagers());
+			amStatus.setNumSlots(jobManager.getAvailableSlots());
 		}
 		amStatus.setMessageCount(messages.size());
 		amStatus.setFailed(isFailed);
@@ -452,12 +510,17 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
 	}
 	
 	private void close() throws Exception {
-		nmClient.close();
-		rmClient.close();
-		if(!isFailed) {
-			LOG.warn("Can not close AM RPC connection since this the AM is in failed state");
-			amRpcServer.stop();
+		if(!isClosed) {
+			jobManager.shutdown();
+			nmClient.close();
+			rmClient.close();
+			if(!isFailed) {
+			//	amRpcServer.stop();
+			} else {
+				LOG.warn("Can not close AM RPC connection since the AM is in failed state");
+			}
 		}
+		this.isClosed = true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
index b2bdf6b..39ef31a 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/ApplicationMasterStatus.java
@@ -71,6 +71,15 @@ public class ApplicationMasterStatus implements IOReadableWritable {
 		this.failed = isFailed;
 	}
 
+	public void setNumTaskManagers(int num) { // replaces the TaskManager count stored in this status object
+		numTaskManagers = num;
+	}
+	
+	public void setNumSlots(int slots) { // replaces the slot count stored in this status object
+		numSlots = slots;
+	}
+	
+	
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		out.writeInt(numTaskManagers);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
index 560147c..670a931 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/rpc/YARNClientMasterProtocol.java
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn.rpc;
 
 import java.io.IOException;
+import java.util.Date;
 import java.util.List;
 
 import org.apache.flink.core.io.IOReadableWritable;
@@ -34,21 +35,37 @@ import org.apache.flink.types.BooleanValue;
  */
 public interface YARNClientMasterProtocol extends VersionedProtocol {
 
+	/**
+	 * Message sent from the ApplicationMaster (AM) to the Client.
+	 *
+	 */
 	public static class Message implements IOReadableWritable {
-		public String text;
-
+		private String text;
+		private Date date;
+		
+		public Message() {	
+			// for deserializability
+		}
+		
 		public Message(String msg) {
 			this.text = msg;
+			this.date = new Date();
+		}
+		
+		public String getMessage() {
+			return "["+date+"] "+text;
 		}
 
 		@Override
 		public void write(DataOutputView out) throws IOException {
 			out.writeUTF(text);
+			out.writeLong(date.getTime());
 		}
 
 		@Override
 		public void read(DataInputView in) throws IOException {
 			text = in.readUTF();
+			date = new Date(in.readLong());
 		}
 	}
 
@@ -59,5 +76,4 @@ public interface YARNClientMasterProtocol extends VersionedProtocol {
 	List<Message> getMessages();
 
 	void addTaskManagers(int n);
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index e516255..d15b0d0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -20,7 +20,9 @@
 package org.apache.flink.client;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -30,6 +32,7 @@ import java.util.Comparator;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -40,9 +43,9 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.cli.UnrecognizedOptionException;
-import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -109,7 +112,15 @@ public class CliFrontend {
 	private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
 	private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
 	
-	public static final String JOBMANAGER_ADDRESS_FILE = ".yarn-jobmanager";
+	/**
+	 * YARN-session related constants
+	 */
+	public static final String YARN_PROPERTIES_FILE = ".yarn-properties";
+	public static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager";
+	public static final String YARN_PROPERTIES_DOP = "degreeOfParallelism";
+	public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
+	// this has to be a regex for String.split()
+	public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@";
 	
 
 	private CommandLineParser parser;
@@ -118,6 +129,10 @@ public class CliFrontend {
 	private boolean printHelp;
 	
 	private boolean globalConfigurationLoaded;
+	
+	private boolean yarnPropertiesLoaded = false;
+	
+	private Properties yarnProperties;
 
 	/**
 	 * Initializes the class
@@ -723,24 +738,19 @@ public class CliFrontend {
 			}
 		}
 		else {
-			// second, search for a .yarn-jobmanager file
-			String loc = getConfigurationDirectory();
-			File jmAddressFile = new File(loc + '/' + JOBMANAGER_ADDRESS_FILE);
-			
-			if (jmAddressFile.exists()) {
+			Properties yarnProps = getYarnProperties();
+			if(yarnProps != null) {
 				try {
-					String address = FileUtils.readFileToString(jmAddressFile).trim();
-					System.out.println("Found a " + JOBMANAGER_ADDRESS_FILE + " file, using \""+address+"\" to connect to the JobManager");
-					
+					String address = yarnProps.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
+					System.out.println("Found a yarn properties file (" + YARN_PROPERTIES_FILE + ") file, "
+							+ "using \""+address+"\" to connect to the JobManager");
 					return RemoteExecutor.getInetFromHostport(address);
-				}
-				catch (Exception e) {
-					System.out.println("Found a " + JOBMANAGER_ADDRESS_FILE + " file, but could not read the JobManager address from the file. " 
+				} catch (Exception e) {
+					System.out.println("Found a yarn properties " + YARN_PROPERTIES_FILE + " file, but could not read the JobManager address from the file. " 
 								+ e.getMessage());
 					return null;
 				}
-			}
-			else {
+			} else {
 				// regular config file gives the address
 				String jobManagerAddress = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
 				
@@ -809,11 +819,72 @@ public class CliFrontend {
 		if (!globalConfigurationLoaded) {
 			String location = getConfigurationDirectory();
 			GlobalConfiguration.loadConfiguration(location);
+			// set default parallelization degree
+			Properties yarnProps;
+			try {
+				yarnProps = getYarnProperties();
+				if(yarnProps != null) {
+					String propDegree = yarnProps.getProperty(YARN_PROPERTIES_DOP);
+					int paraDegree = -1;
+					if(propDegree != null) { // maybe the property is not set
+						paraDegree = Integer.valueOf(propDegree);
+					}
+					Configuration c = GlobalConfiguration.getConfiguration();
+					if(paraDegree != -1) {
+						c.setInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, paraDegree);
+					}
+					// handle the YARN client's dynamic properties
+					String dynamicPropertiesEncoded = yarnProps.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
+					List<Tuple2<String, String>> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
+					for(Tuple2<String, String> dynamicProperty : dynamicProperties) {
+						c.setString(dynamicProperty.f0, dynamicProperty.f1);
+					}
+					GlobalConfiguration.includeConfiguration(c); // update config
+				}
+			} catch (IOException e) {
+				System.err.println("Error while loading YARN properties: "+e.getMessage());
+				e.printStackTrace();
+			}
+			
 			globalConfigurationLoaded = true;
 		}
 		return GlobalConfiguration.getConfiguration();
 	}
 	
+	public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) {
+		// Decodes "k1=v1@@k2=v2" into (key, value) pairs; null/empty input gives an empty list.
+		List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>();
+		if(dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
+			String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+			for(String propLine : propertyLines) {
+				// limit 2: a value that itself contains '=' is kept intact instead of being truncated
+				String[] kv = propLine.split("=", 2);
+				// entries without '=' (one element) or with an empty key/value are skipped, not an AIOOBE
+				if(kv.length == 2 && kv[0].length() > 0 && kv[1].length() > 0) {
+					ret.add(new Tuple2<String, String>(kv[0], kv[1]));
+				}
+			}
+		}
+		return ret;
+	}
+	
+	protected Properties getYarnProperties() throws IOException {
+		// Returns the contents of <configDir>/.yarn-properties, or null if that file does not exist.
+		if(!yarnPropertiesLoaded) { // read at most once per instance; the result (even null) is cached
+			File propertiesFile = new File(getConfigurationDirectory() + '/' + YARN_PROPERTIES_FILE);
+			Properties props = null;
+			if (propertiesFile.exists()) {
+				props = new Properties();
+				InputStream is = new FileInputStream(propertiesFile);
+				try { props.load(is); } finally { is.close(); } // close the stream even if load() fails
+			}
+			yarnProperties = props;
+			yarnPropertiesLoaded = true;
+		}
+		return yarnProperties;
+	}
+	}
+	
 	protected Client getClient(CommandLine line, ClassLoader classLoader) throws IOException {
 		return new Client(getJobManagerAddress(line), getGlobalConfiguration(), classLoader);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-jobmanager
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-jobmanager b/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-jobmanager
deleted file mode 100644
index 6dd40ef..0000000
--- a/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-jobmanager
+++ /dev/null
@@ -1 +0,0 @@
-some-invalid-string

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-properties
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-properties b/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-properties
new file mode 100644
index 0000000..6dd40ef
--- /dev/null
+++ b/flink-clients/src/test/resources/testconfigwithinvalidyarn/.yarn-properties
@@ -0,0 +1 @@
+some-invalid-string

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-clients/src/test/resources/testconfigwithyarn/.yarn-jobmanager
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/resources/testconfigwithyarn/.yarn-jobmanager b/flink-clients/src/test/resources/testconfigwithyarn/.yarn-jobmanager
deleted file mode 100644
index d2ce743..0000000
--- a/flink-clients/src/test/resources/testconfigwithyarn/.yarn-jobmanager
+++ /dev/null
@@ -1 +0,0 @@
-22.33.44.55:6655

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-clients/src/test/resources/testconfigwithyarn/.yarn-properties
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/resources/testconfigwithyarn/.yarn-properties b/flink-clients/src/test/resources/testconfigwithyarn/.yarn-properties
new file mode 100644
index 0000000..e2442b7
--- /dev/null
+++ b/flink-clients/src/test/resources/testconfigwithyarn/.yarn-properties
@@ -0,0 +1,3 @@
+#Generated YARN properties file
+#Tue Jul 29 11:40:48 CEST 2014
+jobManager=22.33.44.55\:6655

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/analyze.html
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/analyze.html b/flink-runtime/resources/web-docs-infoserver/analyze.html
index cf1b09e..c7d5fe4 100755
--- a/flink-runtime/resources/web-docs-infoserver/analyze.html
+++ b/flink-runtime/resources/web-docs-infoserver/analyze.html
@@ -6,7 +6,7 @@
     <meta name="description" content="">
     <meta name="author" content="">
 
-    <title>Dashboard - Flink</title>
+    <title>Dashboard - Apache Flink</title>
 
     <!-- Bootstrap core CSS -->
     <link href="css/bootstrap.css" rel="stylesheet">
@@ -61,7 +61,7 @@
             <span class="icon-bar"></span>
             <span class="icon-bar"></span>
           </button>
-          <a class="navbar-brand" href="index.html">Flink</a>
+          <a class="navbar-brand" href="index.html">Apache Flink</a>
         </div>
 	 
         <!-- Collect the nav links, forms, and other content for toggling -->

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/blank-page.html
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/blank-page.html b/flink-runtime/resources/web-docs-infoserver/blank-page.html
index ef03e42..4cddd14 100755
--- a/flink-runtime/resources/web-docs-infoserver/blank-page.html
+++ b/flink-runtime/resources/web-docs-infoserver/blank-page.html
@@ -6,7 +6,7 @@
     <meta name="description" content="">
     <meta name="author" content="">
 
-    <title>Dashboard - Flink</title>
+    <title>Dashboard - Apache Flink</title>
 
     <!-- Bootstrap core CSS -->
     <link href="css/bootstrap.css" rel="stylesheet">
@@ -53,7 +53,7 @@
             <span class="icon-bar"></span>
             <span class="icon-bar"></span>
           </button>
-          <a class="navbar-brand" href="index.html">Flink</a>
+          <a class="navbar-brand" href="index.html">Apache Flink</a>
         </div>
 	 
         <!-- Collect the nav links, forms, and other content for toggling -->

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/configuration.html
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/configuration.html b/flink-runtime/resources/web-docs-infoserver/configuration.html
index 06ea81a..84a4043 100755
--- a/flink-runtime/resources/web-docs-infoserver/configuration.html
+++ b/flink-runtime/resources/web-docs-infoserver/configuration.html
@@ -6,7 +6,7 @@
     <meta name="description" content="">
     <meta name="author" content="">
 
-    <title>Dashboard - Flink</title>
+    <title>Dashboard - Apache Flink</title>
 
     <!-- Bootstrap core CSS -->
     <link href="css/bootstrap.css" rel="stylesheet">
@@ -54,7 +54,7 @@
             <span class="icon-bar"></span>
             <span class="icon-bar"></span>
           </button>
-          <a class="navbar-brand" href="index.html">Flink</a>
+          <a class="navbar-brand" href="index.html">Apache Flink</a>
         </div>
 	 
         <!-- Collect the nav links, forms, and other content for toggling -->

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/history.html
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/history.html b/flink-runtime/resources/web-docs-infoserver/history.html
index 3041837..8e94317 100755
--- a/flink-runtime/resources/web-docs-infoserver/history.html
+++ b/flink-runtime/resources/web-docs-infoserver/history.html
@@ -6,7 +6,7 @@
     <meta name="description" content="">
     <meta name="author" content="">
 
-    <title>Dashboard - Flink</title>
+    <title>Dashboard - Apache Flink</title>
 
     <!-- Bootstrap core CSS -->
     <link href="css/bootstrap.css" rel="stylesheet">
@@ -54,7 +54,7 @@
             <span class="icon-bar"></span>
             <span class="icon-bar"></span>
           </button>
-          <a class="navbar-brand" href="index.html">Flink</a>
+          <a class="navbar-brand" href="index.html">Apache Flink</a>
         </div>
 	 
         <!-- Collect the nav links, forms, and other content for toggling -->
@@ -78,7 +78,7 @@
 
         <div class="row">
           <div class="col-lg-12">
-            <h1>History <small>Overview about recent jobs</small></h1>
+            <h1>History <small>Overview of recent jobs</small></h1>
             <ol class="breadcrumb">
               <li><a href="index.html"><i class="icon-dashboard"></i> Dashboard</a></li>
               <li class="active"><i class="fa fa-bar-chart-o"></i> History</li>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/index.html
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/index.html b/flink-runtime/resources/web-docs-infoserver/index.html
index be89945..f9d9e56 100755
--- a/flink-runtime/resources/web-docs-infoserver/index.html
+++ b/flink-runtime/resources/web-docs-infoserver/index.html
@@ -6,7 +6,7 @@
     <meta name="description" content="">
     <meta name="author" content="">
 
-    <title>Dashboard - Flink</title>
+    <title>Dashboard - Apache Flink</title>
 
     <!-- Bootstrap core CSS -->
     <link href="css/bootstrap.css" rel="stylesheet">
@@ -67,7 +67,7 @@
             <span class="icon-bar"></span>
             <span class="icon-bar"></span>
           </button>
-          <a class="navbar-brand" href="index.html">Flink</a>
+          <a class="navbar-brand" href="index.html">Apache Flink</a>
         </div>
 	 
         <!-- Collect the nav links, forms, and other content for toggling -->
@@ -91,7 +91,7 @@
 
         <div class="row">
           <div class="col-lg-12">
-            <h1>Dashboard Flink<small id="version"></small></h1>
+            <h1>Apache Flink Dashboard<small id="version"></small></h1>
             <ol class="breadcrumb">
               <li class="active"><i class="fa fa-dashboard"></i> Dashboard</li>
             </ol>
@@ -99,7 +99,7 @@
         </div><!-- /.row -->
 
         <div class="row">
-          <div class="col-lg-3">
+          <div class="col-lg-2">
             <div class="panel panel-info">
               <div class="panel-heading">
                 <div class="row">
@@ -114,7 +114,22 @@
               </div>
             </div>
           </div>
-          <div class="col-lg-3">
+           <div class="col-lg-2">
+            <div class="panel panel-primary">
+              <div class="panel-heading">
+                <div class="row">
+                  <div class="col-xs-6">
+                    <i class="fa fa-list-ol fa-5x"></i>
+                  </div>
+                  <div class="col-xs-6 text-right">
+                    <p class="announcement-heading"><span id="stat-slots" class="stats"></span></p>
+                    <p class="announcement-text">Processing Slots</p>
+                  </div>
+                </div>
+              </div>
+            </div>
+          </div>
+          <div class="col-lg-2">
             <div class="panel panel-success">
               <div class="panel-heading">
                 <div class="row">
@@ -129,7 +144,7 @@
               </div>
             </div>
           </div>
-          <div class="col-lg-3">
+          <div class="col-lg-2">
             <div class="panel panel-warning">
               <div class="panel-heading">
                 <div class="row">
@@ -144,7 +159,7 @@
               </div>
             </div>
           </div>
-          <div class="col-lg-3">
+          <div class="col-lg-2">
             <div class="panel panel-danger">
               <div class="panel-heading">
                 <div class="row">
@@ -159,6 +174,9 @@
               </div>
             </div>
           </div>
+          <div class="col-lg-2">
+            <!-- empty -->
+          </div>
         </div><!-- /.row -->
 
         <div class="row">

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/js/jobmanagerFrontend.js
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/js/jobmanagerFrontend.js b/flink-runtime/resources/web-docs-infoserver/js/jobmanagerFrontend.js
index 68d178b..5daabe8 100644
--- a/flink-runtime/resources/web-docs-infoserver/js/jobmanagerFrontend.js
+++ b/flink-runtime/resources/web-docs-infoserver/js/jobmanagerFrontend.js
@@ -86,6 +86,7 @@ function poll(jobId) {
 	$.ajax({ url : "jobsInfo?get=taskmanagers", cache: false, type : "GET",
 	    success : function(json) {
 		$("#stat-taskmanagers").html(json.taskmanagers);
+		$("#stat-slots").html(json.slots);
 	    }, dataType : "json",
 	});
 })();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/resources/web-docs-infoserver/taskmanagers.html
----------------------------------------------------------------------
diff --git a/flink-runtime/resources/web-docs-infoserver/taskmanagers.html b/flink-runtime/resources/web-docs-infoserver/taskmanagers.html
index ce7f7f0..0fd9f50 100755
--- a/flink-runtime/resources/web-docs-infoserver/taskmanagers.html
+++ b/flink-runtime/resources/web-docs-infoserver/taskmanagers.html
@@ -6,7 +6,7 @@
     <meta name="description" content="">
     <meta name="author" content="">
 
-    <title>Dashboard - Flink</title>
+    <title>Dashboard - Apache Flink</title>
 
     <!-- Bootstrap core CSS -->
     <link href="css/bootstrap.css" rel="stylesheet">
@@ -54,7 +54,7 @@
             <span class="icon-bar"></span>
             <span class="icon-bar"></span>
           </button>
-          <a class="navbar-brand" href="index.html">Flink</a>
+          <a class="navbar-brand" href="index.html">Apache Flink</a>
         </div>
 	 
         <!-- Collect the nav links, forms, and other content for toggling -->
@@ -78,7 +78,7 @@
 
         <div class="row">
           <div class="col-lg-12">
-            <h1>Task Managers <small>Overview about connected Task Managers</small></h1>
+            <h1>Task Managers <small>Overview of connected Task Managers</small></h1>
             <ol class="breadcrumb">
               <li><a href="index.html"><i class="icon-dashboard"></i> Dashboard</a></li>
               <li class="active"><i class="icon-file-alt"></i> Task Managers</li>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
index 8f537de..346b79a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
@@ -497,7 +497,6 @@ public class Client {
 				return;
 			}
 			touch();
-
 			try {
 				int id = in.readInt(); // try to read an id
 
@@ -521,8 +520,12 @@ public class Client {
 							LOG.error(e);
 						} catch (IllegalAccessException e) {
 							LOG.error(e);
+						} 
+						try {
+							value.read(new InputViewDataInputStreamWrapper(in)); // read value
+						} catch(Throwable e) {
+							LOG.error("Exception while receiving an RPC call", e);
 						}
-						value.read(new InputViewDataInputStreamWrapper(in)); // read value
 					}
 					call.setValue(value);
 				} else if (state == Status.ERROR.state) {
@@ -532,6 +535,9 @@ public class Client {
 					markClosed(new RemoteException(StringRecord.readString(in), StringRecord.readString(in)));
 				}
 			} catch (IOException e) {
+				if(LOG.isDebugEnabled()) {
+					LOG.debug("Closing RPC connection due to exception", e);
+				}
 				markClosed(e);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
index ea2fa1d..efeeadc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
@@ -420,9 +420,7 @@ public class RPC {
 				method.setAccessible(true);
 
 				final Object value = method.invoke((Object) instance, (Object[]) call.getParameters());
-
 				return (IOReadableWritable) value;
-
 			} catch (InvocationTargetException e) {
 				
 				final Throwable target = e.getTargetException();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
index 115722f..a8c970c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
@@ -956,7 +956,6 @@ public abstract class Server {
 					CurCall.set(call);
 
 					value = call(call.connection.protocol, call.param, call.timestamp);
-
 					CurCall.set(null);
 
 					setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 82f663c..71957ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -250,7 +250,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 	}
 
 	public void shutdown() {
-
+		LOG.debug("JobManager shutdown requested");
 		if (!this.isShutdownInProgress.compareAndSet(false, true)) {
 			return;
 		}
@@ -289,6 +289,14 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		if (this.scheduler != null) {
 			this.scheduler.shutdown();
 		}
+		
+		if(server != null) {
+			try {
+				server.stop();
+			} catch (Exception e) {
+				LOG.error("Error while shutting down the JobManager's webserver", e);
+			}
+		}
 
 		this.isShutDown = true;
 		LOG.debug("Shutdown of job manager completed");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index 3ffe5af..2f738a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -92,7 +92,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 				writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), ManagementGroupVertexID.fromHexString(groupvertexId));
 			}
 			else if("taskmanagers".equals(req.getParameter("get"))) {
-				resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +"}");
+				resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +", \"slots\": "+jobmanager.getAvailableSlots()+"}");
 			}
 			else if("cancel".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e8f2e9d0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 7856652..933e49d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -191,4 +191,11 @@ public class WebInfoServer {
 		server.start();
 	}
 
+	/**
+	 * Stop the webserver
+	 */
+	public void stop() throws Exception {
+		server.stop();
+	}
+
 }