You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/10/08 14:23:59 UTC

[3/4] flink git commit: [FLINK-2790] [yarn] [ha] Add high availability support for Yarn

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 5c5a465..c7f7698 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -100,36 +100,14 @@ class ForkableFlinkMiniCluster(
       config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
     }
 
-    val (executionContext,
-      instanceManager,
-      scheduler,
-      libraryCacheManager,
-      _,
-      executionRetries,
-      delayBetweenRetries,
-      timeout,
-      archiveCount,
-      leaderElectionService) = JobManager.createJobManagerComponents(config)
-      val testArchiveProps = Props(
-        new TestingMemoryArchivist(archiveCount))
-
-    val archiver = actorSystem.actorOf(testArchiveProps, archiveName)
-
-    val jobManagerProps = Props(
-      new TestingJobManager(
-        configuration,
-        executionContext,
-        instanceManager,
-        scheduler,
-        libraryCacheManager,
-        archiver,
-        executionRetries,
-        delayBetweenRetries,
-        timeout,
-        streamingMode,
-        leaderElectionService))
-
-    val jobManager = actorSystem.actorOf(jobManagerProps, jobManagerName)
+    val (jobManager, _) = JobManager.startJobManagerActors(
+      config,
+      actorSystem,
+      Some(jobManagerName),
+      Some(archiveName),
+      streamingMode,
+      classOf[TestingJobManager],
+      classOf[TestingMemoryArchivist])
 
     jobManager
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
index affe134..7e16baf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -125,7 +126,12 @@ public abstract class AbstractProcessFailureRecoveryTest extends TestLogger {
 			jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
 
 			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem, StreamingMode.STREAMING)._1();
+			ActorRef jmActor = JobManager.startJobManagerActors(
+				jmConfig,
+				jmActorSystem,
+				StreamingMode.STREAMING,
+				JobManager.class,
+				MemoryArchivist.class)._1();
 
 			// the TaskManager java command
 			String[] command = new String[] {

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index ac308dd..945a78c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.util.NetUtils;
@@ -100,7 +101,12 @@ public class ProcessFailureCancelingITCase {
 			jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10 s");
 
 			jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
-			ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem, StreamingMode.BATCH_ONLY)._1();
+			ActorRef jmActor = JobManager.startJobManagerActors(
+				jmConfig,
+				jmActorSystem,
+				StreamingMode.BATCH_ONLY,
+				JobManager.class,
+				MemoryArchivist.class)._1();
 
 			// the TaskManager java command
 			String[] command = new String[] {

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 2444ee4..2d251d9 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -101,6 +101,11 @@ under the License.
 			<version>${guava.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<scope>compile</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
@@ -177,6 +182,140 @@ under the License.
 				</executions>
 			</plugin>
 
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
+
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+
 		</plugins>
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
new file mode 100644
index 0000000..fb644c3
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FlinkYarnSessionCliTest {
+
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
+	@Test
+	public void testDynamicProperties() throws IOException {
+
+		Map<String, String> map = new HashMap<String, String>(System.getenv());
+		File tmpFolder = tmp.newFolder();
+		File fakeConf = new File(tmpFolder, "flink-conf.yaml");
+		fakeConf.createNewFile();
+		map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
+		TestBaseUtils.setEnv(map);
+		Options options = new Options();
+		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "");
+		cli.getYARNSessionCLIOptions(options);
+
+		CommandLineParser parser = new PosixParser();
+		CommandLine cmd = null;
+		try {
+			cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"});
+		} catch(Exception e) {
+			e.printStackTrace();
+			Assert.fail("Parsing failed with " + e.getMessage());
+		}
+
+		AbstractFlinkYarnClient flinkYarnClient = cli.createFlinkYarnClient(cmd);
+
+		Assert.assertNotNull(flinkYarnClient);
+
+		List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
+		Assert.assertEquals(1, dynProperties.size());
+		Assert.assertEquals("akka.ask.timeout", dynProperties.get(0).f0);
+		Assert.assertEquals("5 min", dynProperties.get(0).f1);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
new file mode 100644
index 0000000..6671eb4
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
+
+/**
+ * Yarn application master which starts the {@link TestingYarnJobManager} and the
+ * {@link TestingMemoryArchivist}.
+ */
+public class TestingApplicationMaster extends ApplicationMasterBase {
+	@Override
+	public Class<? extends JobManager> getJobManagerClass() {
+		return TestingYarnJobManager.class;
+	}
+
+	@Override
+	public Class<? extends MemoryArchivist> getArchivistClass() {
+		return TestingMemoryArchivist.class;
+	}
+
+	public static void main(String[] args) {
+		TestingApplicationMaster applicationMaster = new TestingApplicationMaster();
+
+		applicationMaster.run(args);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
new file mode 100644
index 0000000..1efc336
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the
+ * flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the set of files which
+ * are shipped to the yarn cluster. This is necessary to load the testing classes.
+ */
+public class TestingFlinkYarnClient extends FlinkYarnClientBase {
+
+	public TestingFlinkYarnClient() {
+		List<File> filesToShip = new ArrayList<>();
+
+		File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));
+		Preconditions.checkNotNull(testingJar, "Could not find the flink-yarn-tests tests jar. " +
+			"Make sure to package the flink-yarn-tests module.");
+
+		File testingRuntimeJar = YarnTestBase.findFile("..", new TestJarFinder("flink-runtime"));
+		Preconditions.checkNotNull(testingRuntimeJar, "Could not find the flink-runtime tests " +
+			"jar. Make sure to package the flink-runtime module.");
+
+		filesToShip.add(testingJar);
+		filesToShip.add(testingRuntimeJar);
+
+		setShipFiles(filesToShip);
+	}
+
+	@Override
+	protected Class<?> getApplicationMasterClass() {
+		return TestingApplicationMaster.class;
+	}
+
+	public static class TestJarFinder implements FilenameFilter {
+
+		private final String jarName;
+
+		public TestJarFinder(final String jarName) {
+			this.jarName = jarName;
+		}
+
+		@Override
+		public boolean accept(File dir, String name) {
+			return name.startsWith(jarName) && name.endsWith("-tests.jar") &&
+				dir.getAbsolutePath().contains(dir.separator + jarName + dir.separator);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
new file mode 100644
index 0000000..8586a77
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import java.io.IOException;
+
+/**
+ * Yarn TaskManager runner which starts a {@link TestingYarnTaskManager}.
+ */
+public class TestingYarnTaskManagerRunner {
+	public static void main(String[] args) throws IOException {
+		YarnTaskManagerRunner.runYarnTaskManager(args, TestingYarnTaskManager.class);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
index 1e9e4fa..5c709b0 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.yarn;
 
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Level;
 import org.apache.log4j.spi.LoggingEvent;
@@ -35,7 +37,7 @@ public class UtilsTest {
 
 	@Test
 	public void testUberjarLocator() {
-		File dir = YarnTestBase.findFile(".", new YarnTestBase.RootDirFilenameFilter());
+		File dir = YarnTestBase.findFile("..", new YarnTestBase.RootDirFilenameFilter());
 		Assert.assertNotNull(dir);
 		Assert.assertTrue(dir.getName().endsWith(".jar"));
 		dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root
@@ -47,6 +49,54 @@ public class UtilsTest {
 		Assert.assertTrue(files.contains("conf"));
 	}
 
+	/**
+	 * Tests the heap cutoff calculation: 15% of the heap is removed, but at least 384 MB.
+	 *
+	 */
+	@Test
+	public void testHeapCutoff() {
+		Configuration conf = new Configuration();
+		conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
+		conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);
+
+		Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf) );
+		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf) );
+
+		// test different configuration
+		Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf) );
+
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "1000");
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.1");
+		Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
+
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.5");
+		Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
+
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1");
+		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void illegalArgument() {
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1.1");
+		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void illegalArgumentNegative() {
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "-0.01");
+		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void tooMuchCutoff() {
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "6000");
+		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+	}
+
 
 	//
 	// --------------- Tools to test if a certain string has been logged with Log4j. -------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
new file mode 100644
index 0000000..94d0a81
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.testkit.JavaTestKit;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+public class YARNHighAvailabilityITCase extends YarnTestBase {
+
+	private static TestingServer zkServer;
+
+	private static ActorSystem actorSystem;
+
+	private static final int numberApplicationAttempts = 10;
+
+	@BeforeClass
+	public static void setup() {
+		actorSystem = AkkaUtils.createDefaultActorSystem();
+
+		try {
+			zkServer = new TestingServer();
+			zkServer.start();
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Could not start ZooKeeper testing cluster.");
+		}
+
+		yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha");
+		yarnConfiguration.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" + numberApplicationAttempts);
+
+		startYARNWithConfig(yarnConfiguration);
+	}
+
+	@AfterClass
+	public static void teardown() throws IOException {
+		if(zkServer != null) {
+			zkServer.stop();
+		}
+
+		JavaTestKit.shutdownActorSystem(actorSystem);
+		actorSystem = null;
+	}
+
+	/**
+	 * Tests that the application master can be killed multiple times and that the surviving
+	 * TaskManager successfully reconnects to the newly started JobManager.
+	 * @throws Exception
+	 */
+	@Test
+	public void testMultipleAMKill() throws Exception {
+		final int numberKillingAttempts = numberApplicationAttempts - 1;
+
+		TestingFlinkYarnClient flinkYarnClient = new TestingFlinkYarnClient();
+
+		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
+		flinkYarnClient.setTaskManagerCount(1);
+		flinkYarnClient.setJobManagerMemory(768);
+		flinkYarnClient.setTaskManagerMemory(1024);
+		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+		flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+
+		String confDirPath = System.getenv("FLINK_CONF_DIR");
+		flinkYarnClient.setConfigurationDirectory(confDirPath);
+
+		flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
+		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@ha.zookeeper.quorum=" +
+			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts);
+		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
+
+		AbstractFlinkYarnCluster yarnCluster = null;
+
+		final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
+
+		try {
+			yarnCluster = flinkYarnClient.deploy();
+			yarnCluster.connectToCluster();
+			final Configuration config = yarnCluster.getFlinkConfiguration();
+
+			new JavaTestKit(actorSystem) {{
+				for (int attempt = 0; attempt < numberKillingAttempts; attempt++) {
+					new Within(timeout) {
+						@Override
+						protected void run() {
+							try {
+								LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+								ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
+								ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
+
+								gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
+
+								expectMsgEquals(Messages.getAcknowledge());
+
+								gateway.tell(PoisonPill.getInstance());
+							} catch (Exception e) {
+								throw new AssertionError("Could not complete test.", e);
+							}
+						}
+					};
+				}
+
+				new Within(timeout) {
+					@Override
+					protected void run() {
+						try {
+							LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+							ActorGateway gateway2 = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
+							ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway2.leaderSessionID());
+							gateway2.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
+
+							expectMsgEquals(Messages.getAcknowledge());
+						} catch (Exception e) {
+							throw new AssertionError("Could not complete test.", e);
+						}
+					}
+				};
+
+			}};
+		} finally {
+			if (yarnCluster != null) {
+				yarnCluster.shutdown(false);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 9d72c90..c2e9a45 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index b7d0b33..5ce528e 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -356,9 +356,12 @@ public abstract class YarnTestBase extends TestLogger {
 			}
 
 			Map<String, String> map = new HashMap<String, String>(System.getenv());
-			File flinkConfFilePath = findFile(flinkDistRootDir, new ContainsName(new String[] {"flink-conf.yaml"}));
-			Assert.assertNotNull(flinkConfFilePath);
-			map.put("FLINK_CONF_DIR", flinkConfFilePath.getParent());
+
+			File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"}));
+			Assert.assertNotNull(flinkConfDirPath);
+
+			map.put("FLINK_CONF_DIR", flinkConfDirPath.getParent());
+
 			File yarnConfFile = writeYarnSiteConfigXML(conf);
 			map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
 			map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more infos
@@ -580,11 +583,7 @@ public abstract class YarnTestBase extends TestLogger {
 	// -------------------------- Tear down -------------------------- //
 
 	@AfterClass
-	public static void tearDown() {
-		/*
-			We don't shut down the MiniCluster, as it is prone to blocking infinitely.
-		*/
-		
+	public static void copyOnTravis() {
 		// When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files)
 		// to <flinkRoot>/target/flink-yarn-tests-*.
 		// The files from there are picked up by the ./tools/travis_watchdog.sh script

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/resources/log4j-test.properties b/flink-yarn-tests/src/main/resources/log4j-test.properties
index dc02575..ebe0d37 100644
--- a/flink-yarn-tests/src/main/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/main/resources/log4j-test.properties
@@ -27,5 +27,9 @@ log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x -
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
 
 # log whats going on between the tests
-log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO, console
-log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO, console
+log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO
+log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO
+log4j.logger.org.apache.flink.yarn.YarnHighAvailability=INFO
+log4j.logger.org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch=INFO
+log4j.logger.org.apache.flink.runtime.leaderelection=INFO
+log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
new file mode 100644
index 0000000..83d1f3c
--- /dev/null
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import akka.actor.ActorRef
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+
+/** [[YarnJobManager]] implementation which mixes in the [[TestingJobManagerLike]] mixin.
+  *
+  * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition
+  * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
+  *
+  * @param flinkConfiguration Configuration object for the actor
+  * @param executionContext Execution context which is used to execute concurrent tasks in the
+  *                         [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+  * @param instanceManager Instance manager to manage the registered
+  *                        [[org.apache.flink.runtime.taskmanager.TaskManager]]
+  * @param scheduler Scheduler to schedule Flink jobs
+  * @param libraryCacheManager Manager to manage uploaded jar files
+  * @param archive Archive for finished Flink jobs
+  * @param defaultExecutionRetries Number of default execution retries
+  * @param delayBetweenRetries Delay between retries
+  * @param timeout Timeout for futures
+  * @param mode StreamingMode in which the system shall be started
+  * @param leaderElectionService LeaderElectionService to participate in the leader election
+  */
+class TestingYarnJobManager(
+    flinkConfiguration: Configuration,
+    executionContext: ExecutionContext,
+    instanceManager: InstanceManager,
+    scheduler: Scheduler,
+    libraryCacheManager: BlobLibraryCacheManager,
+    archive: ActorRef,
+    defaultExecutionRetries: Int,
+    delayBetweenRetries: Long,
+    timeout: FiniteDuration,
+    mode: StreamingMode,
+    leaderElectionService: LeaderElectionService)
+  extends YarnJobManager(
+    flinkConfiguration,
+    executionContext,
+    instanceManager,
+    scheduler,
+    libraryCacheManager,
+    archive,
+    defaultExecutionRetries,
+    delayBetweenRetries,
+    timeout,
+    mode,
+    leaderElectionService)
+  with TestingJobManagerLike {
+
+  override val taskManagerRunnerClass = classOf[TestingYarnTaskManagerRunner]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
new file mode 100644
index 0000000..11c7f88
--- /dev/null
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import org.apache.flink.runtime.instance.InstanceConnectionInfo
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
+
+/** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin.
+  *
+  * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition
+  * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
+  *
+  * @param config Configuration object for the actor
+  * @param connectionInfo Connection information of this actor
+  * @param memoryManager MemoryManager which is responsible for Flink's managed memory allocation
+  * @param ioManager IOManager responsible for I/O
+  * @param network NetworkEnvironment for this actor
+  * @param numberOfSlots Number of slots for this TaskManager
+  * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the current leading
+  *                              JobManager
+  */
+class TestingYarnTaskManager(
+    config: TaskManagerConfiguration,
+    connectionInfo: InstanceConnectionInfo,
+    memoryManager: MemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int,
+    leaderRetrievalService: LeaderRetrievalService)
+  extends YarnTaskManager(
+    config,
+    connectionInfo,
+    memoryManager,
+    ioManager,
+    network,
+    numberOfSlots,
+    leaderRetrievalService)
+  with TestingTaskManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 44eca0d..4225e68 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -17,746 +17,12 @@
  */
 package org.apache.flink.yarn;
 
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.Records;
-
 /**
- * All classes in this package contain code taken from
- * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
- * and
- * https://github.com/hortonworks/simple-yarn-app
- * and
- * https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
- *
- * The Flink jar is uploaded to HDFS by this client.
- * The application master and all the TaskManager containers get the jar file downloaded
- * by YARN into their local fs.
- *
+ * Default implementation of {@link FlinkYarnClientBase} which starts an {@link ApplicationMaster}.
  */
-public class FlinkYarnClient extends AbstractFlinkYarnClient {
-	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
-
-	/**
-	 * Constants,
-	 * all starting with ENV_ are used as environment variables to pass values from the Client
-	 * to the Application Master.
-	 */
-	public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
-	public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
-	public final static String ENV_APP_ID = "_APP_ID";
-	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
-	public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
-	public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
-	public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
-	public static final String ENV_SLOTS = "_SLOTS";
-	public static final String ENV_DETACHED = "_DETACHED";
-	public static final String ENV_STREAMING_MODE = "_STREAMING_MODE";
-	public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
-
-
-	/**
-	 * Minimum memory requirements, checked by the Client.
-	 */
-	private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
-	private static final int MIN_TM_MEMORY = 768;
-
-	private Configuration conf;
-	private YarnClient yarnClient;
-	private YarnClientApplication yarnApplication;
-
-
-	/**
-	 * Files (usually in a distributed file system) used for the YARN session of Flink.
-	 * Contains configuration files and jar files.
-	 */
-	private Path sessionFilesDir;
-
-	/**
-	 * If the user has specified a different number of slots, we store them here
-	 */
-	private int slots = -1;
-
-	private int jobManagerMemoryMb = 1024;
-
-	private int taskManagerMemoryMb = 1024;
-
-	private int taskManagerCount = 1;
-
-	private String yarnQueue = null;
-
-	private String configurationDirectory;
-
-	private Path flinkConfigurationPath;
-
-	private Path flinkLoggingConfigurationPath; // optional
-
-	private Path flinkJarPath;
-
-	private String dynamicPropertiesEncoded;
-
-	private List<File> shipFiles = new ArrayList<File>();
-	private org.apache.flink.configuration.Configuration flinkConfiguration;
-
-	private boolean detached;
-	private boolean streamingMode;
-
-	private String customName = null;
-
-	public FlinkYarnClient() {
-		conf = new YarnConfiguration();
-		if(this.yarnClient == null) {
-			// Create yarnClient
-			yarnClient = YarnClient.createYarnClient();
-			yarnClient.init(conf);
-			yarnClient.start();
-		}
-
-		// for unit tests only
-		if(System.getenv("IN_TESTS") != null) {
-			try {
-				conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
-			} catch (Throwable t) {
-				throw new RuntimeException("Error",t);
-			}
-		}
-	}
-
-	@Override
-	public void setJobManagerMemory(int memoryMb) {
-		if(memoryMb < MIN_JM_MEMORY) {
-			throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount "
-					+ "of " + MIN_JM_MEMORY+ " MB");
-		}
-		this.jobManagerMemoryMb = memoryMb;
-	}
-
-	@Override
-	public void setTaskManagerMemory(int memoryMb) {
-		if(memoryMb < MIN_TM_MEMORY) {
-			throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount "
-					+ "of " + MIN_TM_MEMORY+ " MB");
-		}
-		this.taskManagerMemoryMb = memoryMb;
-	}
-
-	@Override
-	public void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf) {
-		this.flinkConfiguration = conf;
-	}
-
-	@Override
-	public void setTaskManagerSlots(int slots) {
-		if(slots <= 0) {
-			throw new IllegalArgumentException("Number of TaskManager slots must be positive");
-		}
-		this.slots = slots;
-	}
-
-	@Override
-	public int getTaskManagerSlots() {
-		return this.slots;
-	}
-
+public class FlinkYarnClient extends FlinkYarnClientBase {
 	@Override
-	public void setQueue(String queue) {
-		this.yarnQueue = queue;
+	protected Class<?> getApplicationMasterClass() {
+		return ApplicationMaster.class;
 	}
-
-	@Override
-	public void setLocalJarPath(Path localJarPath) {
-		if(!localJarPath.toString().endsWith("jar")) {
-			throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
-		}
-		this.flinkJarPath = localJarPath;
-	}
-
-	@Override
-	public void setConfigurationFilePath(Path confPath) {
-		flinkConfigurationPath = confPath;
-	}
-
-	public void setConfigurationDirectory(String configurationDirectory) {
-		this.configurationDirectory = configurationDirectory;
-	}
-
-	@Override
-	public void setFlinkLoggingConfigurationPath(Path logConfPath) {
-		flinkLoggingConfigurationPath = logConfPath;
-	}
-
-	@Override
-	public Path getFlinkLoggingConfigurationPath() {
-		return flinkLoggingConfigurationPath;
-	}
-
-	@Override
-	public void setTaskManagerCount(int tmCount) {
-		if(tmCount < 1) {
-			throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
-		}
-		this.taskManagerCount = tmCount;
-	}
-
-	@Override
-	public int getTaskManagerCount() {
-		return this.taskManagerCount;
-	}
-
-	@Override
-	public void setShipFiles(List<File> shipFiles) {
-		File shipFile;
-		for (File shipFile1 : shipFiles) {
-			shipFile = shipFile1;
-			// remove uberjar from ship list (by default everything in the lib/ folder is added to
-			// the list of files to ship, but we handle the uberjar separately.
-			if (!(shipFile.getName().startsWith("flink-dist-") && shipFile.getName().endsWith("jar"))) {
-				this.shipFiles.add(shipFile);
-			}
-		}
-	}
-
-	public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
-		this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
-	}
-
-	@Override
-	public String getDynamicPropertiesEncoded() {
-		return this.dynamicPropertiesEncoded;
-	}
-
-
-	public void isReadyForDepoyment() throws YarnDeploymentException {
-		if(taskManagerCount <= 0) {
-			throw new YarnDeploymentException("Taskmanager count must be positive");
-		}
-		if(this.flinkJarPath == null) {
-			throw new YarnDeploymentException("The Flink jar path is null");
-		}
-		if(this.configurationDirectory == null) {
-			throw new YarnDeploymentException("Configuration directory not set");
-		}
-		if(this.flinkConfigurationPath == null) {
-			throw new YarnDeploymentException("Configuration path not set");
-		}
-		if(this.flinkConfiguration == null) {
-			throw new YarnDeploymentException("Flink configuration object has not been set");
-		}
-
-		// check if required Hadoop environment variables are set. If not, warn user
-		if(System.getenv("HADOOP_CONF_DIR") == null &&
-				System.getenv("YARN_CONF_DIR") == null) {
-			LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." +
-					"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
-					"configuration for accessing YARN.");
-		}
-	}
-
-	public static boolean allocateResource(int[] nodeManagers, int toAllocate) {
-		for(int i = 0; i < nodeManagers.length; i++) {
-			if(nodeManagers[i] >= toAllocate) {
-				nodeManagers[i] -= toAllocate;
-				return true;
-			}
-		}
-		return false;
-	}
-
-	@Override
-	public void setDetachedMode(boolean detachedMode) {
-		this.detached = detachedMode;
-	}
-
-	@Override
-	public boolean isDetached() {
-		return detached;
-	}
-
-	public AbstractFlinkYarnCluster deploy() throws Exception {
-
-		UserGroupInformation.setConfiguration(conf);
-		UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
-		if (UserGroupInformation.isSecurityEnabled()) {
-			if (!ugi.hasKerberosCredentials()) {
-				throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
-						"You may use kinit to authenticate and request a TGT from the Kerberos server.");
-			}
-			return ugi.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() {
-				@Override
-				public AbstractFlinkYarnCluster run() throws Exception {
-					return deployInternal();
-				}
-			});
-		} else {
-			return deployInternal();
-		}
-	}
-
-
-
-	/**
-	 * This method will block until the ApplicationMaster/JobManager have been
-	 * deployed on YARN.
-	 */
-	protected AbstractFlinkYarnCluster deployInternal() throws Exception {
-		isReadyForDepoyment();
-
-		LOG.info("Using values:");
-		LOG.info("\tTaskManager count = {}", taskManagerCount);
-		LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
-		LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
-
-		// Create application via yarnClient
-		yarnApplication = yarnClient.createApplication();
-		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
-
-		// ------------------ Add dynamic properties to local flinkConfiguraton ------
-
-		List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded);
-		for (Tuple2<String, String> dynProperty : dynProperties) {
-			flinkConfiguration.setString(dynProperty.f0, dynProperty.f1);
-		}
-
-		// ------------------ Check if the specified queue exists --------------
-
-		try {
-			List<QueueInfo> queues = yarnClient.getAllQueues();
-			if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
-				boolean queueFound = false;
-				for (QueueInfo queue : queues) {
-					if (queue.getQueueName().equals(this.yarnQueue)) {
-						queueFound = true;
-						break;
-					}
-				}
-				if (!queueFound) {
-					String queueNames = "";
-					for (QueueInfo queue : queues) {
-						queueNames += queue.getQueueName() + ", ";
-					}
-					LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
-							"Available queues: " + queueNames);
-				}
-			} else {
-				LOG.debug("The YARN cluster does not have any queues configured");
-			}
-		} catch(Throwable e) {
-			LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
-			if(LOG.isDebugEnabled()) {
-				LOG.debug("Error details", e);
-			}
-		}
-
-		// ------------------ Check if the YARN Cluster has the requested resources --------------
-
-		// the yarnMinAllocationMB specifies the smallest possible container allocation size.
-		// all allocations below this value are automatically set to this value.
-		final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
-		if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
-			LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
-					+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
-					"YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
-					"you requested will start.");
-		}
-
-		// set the memory to minAllocationMB to do the next checks correctly
-		if(jobManagerMemoryMb < yarnMinAllocationMB) {
-			jobManagerMemoryMb =  yarnMinAllocationMB;
-		}
-		if(taskManagerMemoryMb < yarnMinAllocationMB) {
-			taskManagerMemoryMb =  yarnMinAllocationMB;
-		}
-
-		Resource maxRes = appResponse.getMaximumResourceCapability();
-		final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
-		if(jobManagerMemoryMb > maxRes.getMemory() ) {
-			failSessionDuringDeployment();
-			throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
-					+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
-		}
-
-		if(taskManagerMemoryMb > maxRes.getMemory() ) {
-			failSessionDuringDeployment();
-			throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
-					+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + NOTE);
-		}
-
-		final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
-				"connecting from the beginning because the resources are currently not available in the cluster. " +
-				"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
-				"the resources become available.";
-		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
-		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
-		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
-			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
-					+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
-
-		}
-		if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
-			LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
-					+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
-		}
-		if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
-			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
-					+ "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
-		}
-
-		// ----------------- check if the requested containers fit into the cluster.
-
-		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
-		// first, allocate the jobManager somewhere.
-		if(!allocateResource(nmFree, jobManagerMemoryMb)) {
-			LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
-					"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
-					Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC);
-		}
-		// allocate TaskManagers
-		for(int i = 0; i < taskManagerCount; i++) {
-			if(!allocateResource(nmFree, taskManagerMemoryMb)) {
-				LOG.warn("There is not enough memory available in the YARN cluster. " +
-						"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
-						"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
-						"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
-						"the following NodeManagers are available: " + Arrays.toString(nmFree)  + NOTE_RSC );
-			}
-		}
-
-		// ------------------ Prepare Application Master Container  ------------------------------
-
-		// respect custom JVM options in the YAML file
-		final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-
-		String logbackFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
-		boolean hasLogback = new File(logbackFile).exists();
-		String log4jFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
-
-		boolean hasLog4j = new File(log4jFile).exists();
-		if(hasLogback) {
-			shipFiles.add(new File(logbackFile));
-		}
-		if(hasLog4j) {
-			shipFiles.add(new File(log4jFile));
-		}
-
-		// Set up the container launch context for the application master
-		ContainerLaunchContext amContainer = Records
-				.newRecord(ContainerLaunchContext.class);
-
-		String amCommand = "$JAVA_HOME/bin/java"
-					+ " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration) + "M " +javaOpts;
-
-		if(hasLogback || hasLog4j) {
-			amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-main.log\"";
-		}
-
-		if(hasLogback) {
-			amCommand += " -Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
-		}
-		if(hasLog4j) {
-			amCommand += " -Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
-		}
-
-		amCommand 	+= " " + ApplicationMaster.class.getName() + " "
-					+ " 1>"
-					+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
-					+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
-		amContainer.setCommands(Collections.singletonList(amCommand));
-
-		LOG.debug("Application Master start command: " + amCommand);
-
-		// intialize HDFS
-		// Copy the application master jar to the filesystem
-		// Create a local resource to point to the destination jar path
-		final FileSystem fs = FileSystem.get(conf);
-
-		// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
-		if( !fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
-				fs.getScheme().startsWith("file")) {
-			LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
-					+ "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
-					+ "The Flink YARN client needs to store its files in a distributed file system");
-		}
-
-		// Set-up ApplicationSubmissionContext for the application
-		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
-		appContext.setMaxAppAttempts(flinkConfiguration.getInteger(ConfigConstants.YARN_APPLICATION_ATTEMPTS, 1));
-
-		final ApplicationId appId = appContext.getApplicationId();
-
-		// Setup jar for ApplicationMaster
-		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
-		LocalResource flinkConf = Records.newRecord(LocalResource.class);
-		Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
-		Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
-		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
-		localResources.put("flink.jar", appMasterJar);
-		localResources.put("flink-conf.yaml", flinkConf);
-
-
-		// setup security tokens (code from apache storm)
-		final Path[] paths = new Path[2 + shipFiles.size()];
-		StringBuilder envShipFileList = new StringBuilder();
-		// upload ship files
-		for (int i = 0; i < shipFiles.size(); i++) {
-			File shipFile = shipFiles.get(i);
-			LocalResource shipResources = Records.newRecord(LocalResource.class);
-			Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
-			paths[2 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
-					shipLocalPath, shipResources, fs.getHomeDirectory());
-			localResources.put(shipFile.getName(), shipResources);
-
-			envShipFileList.append(paths[2 + i]);
-			if(i+1 < shipFiles.size()) {
-				envShipFileList.append(',');
-			}
-		}
-
-		paths[0] = remotePathJar;
-		paths[1] = remotePathConf;
-		sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
-
-		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-		fs.setPermission(sessionFilesDir, permission); // set permission for path.
-
-		Utils.setTokensFor(amContainer, paths, conf);
-
-		amContainer.setLocalResources(localResources);
-		fs.close();
-
-		// Setup CLASSPATH for ApplicationMaster
-		Map<String, String> appMasterEnv = new HashMap<String, String>();
-		Utils.setupEnv(conf, appMasterEnv);
-		// set configuration values
-		appMasterEnv.put(FlinkYarnClient.ENV_TM_COUNT, String.valueOf(taskManagerCount));
-		appMasterEnv.put(FlinkYarnClient.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
-		appMasterEnv.put(FlinkYarnClient.FLINK_JAR_PATH, remotePathJar.toString() );
-		appMasterEnv.put(FlinkYarnClient.ENV_APP_ID, appId.toString());
-		appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
-		appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
-		appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
-		appMasterEnv.put(FlinkYarnClient.ENV_SLOTS, String.valueOf(slots));
-		appMasterEnv.put(FlinkYarnClient.ENV_DETACHED, String.valueOf(detached));
-		appMasterEnv.put(FlinkYarnClient.ENV_STREAMING_MODE, String.valueOf(streamingMode));
-
-		if(dynamicPropertiesEncoded != null) {
-			appMasterEnv.put(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
-		}
-
-		amContainer.setEnvironment(appMasterEnv);
-
-		// Set up resource type requirements for ApplicationMaster
-		Resource capability = Records.newRecord(Resource.class);
-		capability.setMemory(jobManagerMemoryMb);
-		capability.setVirtualCores(1);
-
-		String name;
-		if(customName == null) {
-			name = "Flink session with " + taskManagerCount + " TaskManagers";
-			if(detached) {
-				name += " (detached)";
-			}
-		} else {
-			name = customName;
-		}
-
-		appContext.setApplicationName(name); // application name
-		appContext.setApplicationType("Apache Flink");
-		appContext.setAMContainerSpec(amContainer);
-		appContext.setResource(capability);
-		if(yarnQueue != null) {
-			appContext.setQueue(yarnQueue);
-		}
-
-		LOG.info("Submitting application master " + appId);
-		yarnClient.submitApplication(appContext);
-
-		LOG.info("Waiting for the cluster to be allocated");
-		int waittime = 0;
-		loop: while( true ) {
-			ApplicationReport report = yarnClient.getApplicationReport(appId);
-			YarnApplicationState appState = report.getYarnApplicationState();
-			switch(appState) {
-				case FAILED:
-				case FINISHED:
-				case KILLED:
-					throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
-							+ appState + " during deployment. \n" +
-							"Diagnostics from YARN: " + report.getDiagnostics() + "\n" +
-							"If log aggregation is enabled on your cluster, use this command to further invesitage the issue:\n" +
-							"yarn logs -applicationId " + appId);
-					//break ..
-				case RUNNING:
-					LOG.info("YARN application has been deployed successfully.");
-					break loop;
-				default:
-					LOG.info("Deploying cluster, current state " + appState);
-					if(waittime > 60000) {
-						LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
-					}
-
-			}
-			waittime += 1000;
-			Thread.sleep(1000);
-		}
-		// the Flink cluster is deployed in YARN. Represent cluster
-		return new FlinkYarnCluster(yarnClient, appId, conf, flinkConfiguration, sessionFilesDir, detached);
-	}
-
-	/**
-	 * Kills YARN application and stops YARN client.
-	 *
-	 * Use this method to kill the App before it has been properly deployed
-	 */
-	private void failSessionDuringDeployment() {
-		LOG.info("Killing YARN application");
-
-		try {
-			yarnClient.killApplication(yarnApplication.getNewApplicationResponse().getApplicationId());
-		} catch (Exception e) {
-			// we only log a debug message here because the "killApplication" call is a best-effort
-			// call (we don't know if the application has been deployed when the error occured).
-			LOG.debug("Error while killing YARN application", e);
-		}
-		yarnClient.stop();
-	}
-
-
-	private static class ClusterResourceDescription {
-		final public int totalFreeMemory;
-		final public int containerLimit;
-		final public int[] nodeManagersFree;
-
-		public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
-			this.totalFreeMemory = totalFreeMemory;
-			this.containerLimit = containerLimit;
-			this.nodeManagersFree = nodeManagersFree;
-		}
-	}
-
-	private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
-		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-
-		int totalFreeMemory = 0;
-		int containerLimit = 0;
-		int[] nodeManagersFree = new int[nodes.size()];
-
-		for(int i = 0; i < nodes.size(); i++) {
-			NodeReport rep = nodes.get(i);
-			int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
-			nodeManagersFree[i] = free;
-			totalFreeMemory += free;
-			if(free > containerLimit) {
-				containerLimit = free;
-			}
-		}
-		return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
-	}
-
-	public String getClusterDescription() throws Exception {
-
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		PrintStream ps = new PrintStream(baos);
-
-		YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
-
-		ps.append("NodeManagers in the Cluster " + metrics.getNumNodeManagers());
-		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-		final String format = "|%-16s |%-16s %n";
-		ps.printf("|Property         |Value          %n");
-		ps.println("+---------------------------------------+");
-		int totalMemory = 0;
-		int totalCores = 0;
-		for(NodeReport rep : nodes) {
-			final Resource res = rep.getCapability();
-			totalMemory += res.getMemory();
-			totalCores += res.getVirtualCores();
-			ps.format(format, "NodeID", rep.getNodeId());
-			ps.format(format, "Memory", res.getMemory() + " MB");
-			ps.format(format, "vCores", res.getVirtualCores());
-			ps.format(format, "HealthReport", rep.getHealthReport());
-			ps.format(format, "Containers", rep.getNumContainers());
-			ps.println("+---------------------------------------+");
-		}
-		ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
-		List<QueueInfo> qInfo = yarnClient.getAllQueues();
-		for(QueueInfo q : qInfo) {
-			ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
-					q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
-		}
-		yarnClient.stop();
-		return baos.toString();
-	}
-
-	public String getSessionFilesDir() {
-		return sessionFilesDir.toString();
-	}
-
-	@Override
-	public void setStreamingMode(boolean streamingMode) {
-		this.streamingMode = streamingMode;
-	}
-
-	@Override
-	public void setName(String name) {
-		if(name == null) {
-			throw new IllegalArgumentException("The passed name is null");
-		}
-		customName = name;
-	}
-
-	public static class YarnDeploymentException extends RuntimeException {
-		public YarnDeploymentException() {
-		}
-
-		public YarnDeploymentException(String message) {
-			super(message);
-		}
-
-		public YarnDeploymentException(String message, Throwable cause) {
-			super(message, cause);
-		}
-	}
-
 }