You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/12 22:25:31 UTC

[10/14] flink git commit: [FLINK-8119] [flip6] Wire correct Flip6 components in Flip6YarnClusterDescriptor

[FLINK-8119] [flip6] Wire correct Flip6 components in Flip6YarnClusterDescriptor

Let the Flip6YarnClusterDescriptor create a RestClusterClient as ClusterClient.
Moreover, this commit makes the YarnResourceManager register under the REST port
at Yarn.

This closes #5234.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbe0e828
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbe0e828
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbe0e828

Branch: refs/heads/master
Commit: dbe0e8286d76a5facdb49589b638b87dbde80178
Parents: 7d986ce
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jan 3 20:38:21 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 12 16:14:05 2018 +0100

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  | 12 ++--
 .../yarn/TestingYarnClusterDescriptor.java      |  2 +-
 .../java/org/apache/flink/yarn/YARNITCase.java  | 10 +--
 .../yarn/AbstractYarnClusterDescriptor.java     | 37 ++++++-----
 .../flink/yarn/Flip6YarnClusterDescriptor.java  | 66 ++++++++++++++++++++
 .../flink/yarn/YarnClusterDescriptor.java       | 14 +++++
 .../flink/yarn/YarnClusterDescriptorV2.java     | 49 ---------------
 .../apache/flink/yarn/YarnResourceManager.java  | 16 ++++-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 17 +++--
 .../yarn/entrypoint/YarnEntrypointUtils.java    |  7 +++
 .../flink/yarn/AbstractYarnClusterTest.java     |  6 ++
 11 files changed, 150 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 3e8b136..4d8656b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -318,28 +318,24 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 		return false;
 	}
 
-	// ======================================
-	// Legacy stuff we ignore
-	// ======================================
-
 	@Override
 	public void waitForClusterToBeReady() {
-		throw new UnsupportedOperationException();
+		// no op
 	}
 
 	@Override
 	public String getWebInterfaceURL() {
-		throw new UnsupportedOperationException();
+		return "http://" + restClusterClientConfiguration.getRestServerAddress() + ':' + restClusterClientConfiguration.getRestServerPort();
 	}
 
 	@Override
 	public GetClusterStatusResponse getClusterStatus() {
-		throw new UnsupportedOperationException();
+		return null;
 	}
 
 	@Override
 	public List<String> getNewMessages() {
-		throw new UnsupportedOperationException();
+		return Collections.emptyList();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index e66d2e0..95a31be 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -35,7 +35,7 @@ import java.util.List;
  * flink-yarn-tests-X-tests.jar and the flink-runtime-X-tests.jar to the set of files which
  * are shipped to the yarn cluster. This is necessary to load the testing classes.
  */
-public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor {
+public class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
 
 	public TestingYarnClusterDescriptor(Configuration configuration, String configurationDirectory) {
 		super(

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 069f68a..bea9001 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -28,6 +29,7 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -55,13 +57,13 @@ public class YARNITCase extends YarnTestBase {
 		configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
 		final YarnClient yarnClient = YarnClient.createYarnClient();
 
-		try (final YarnClusterDescriptorV2 yarnClusterDescriptorV2 = new YarnClusterDescriptorV2(
+		try (final Flip6YarnClusterDescriptor flip6YarnClusterDescriptor = new Flip6YarnClusterDescriptor(
 			configuration,
 			System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
 			yarnClient)) {
 
-			yarnClusterDescriptorV2.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-			yarnClusterDescriptorV2.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+			flip6YarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+			flip6YarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 
 			final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
 				.setMasterMemoryMB(768)
@@ -83,7 +85,7 @@ public class YARNITCase extends YarnTestBase {
 
 			jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
 
-			YarnClusterClient clusterClient = yarnClusterDescriptorV2.deployJobCluster(clusterSpecification, jobGraph);
+			ClusterClient<ApplicationId> clusterClient = flip6YarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph);
 
 			clusterClient.shutdown();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index a7e8c36..f015235 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -23,6 +23,7 @@ import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterRetrieveException;
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -30,6 +31,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -332,7 +334,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	// -------------------------------------------------------------
 
 	@Override
-	public YarnClusterClient retrieve(ApplicationId applicationId) throws ClusterRetrieveException {
+	public ClusterClient<ApplicationId> retrieve(ApplicationId applicationId) throws ClusterRetrieveException {
 
 		try {
 			// check if required Hadoop environment variables are set. If not, warn user
@@ -352,11 +354,17 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 				throw new RuntimeException("The Yarn application " + applicationId + " doesn't run anymore.");
 			}
 
+			final String host = appReport.getHost();
+			final int rpcPort = appReport.getRpcPort();
+
 			LOG.info("Found application JobManager host name '{}' and port '{}' from supplied application id '{}'",
-				appReport.getHost(), appReport.getRpcPort(), applicationId);
+				host, rpcPort, applicationId);
+
+			flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
+			flinkConfiguration.setInteger(JobManagerOptions.PORT, rpcPort);
 
-			flinkConfiguration.setString(JobManagerOptions.ADDRESS, appReport.getHost());
-			flinkConfiguration.setInteger(JobManagerOptions.PORT, appReport.getRpcPort());
+			flinkConfiguration.setString(RestOptions.REST_ADDRESS, host);
+			flinkConfiguration.setInteger(RestOptions.REST_PORT, rpcPort);
 
 			return createYarnClusterClient(
 				this,
@@ -371,7 +379,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	@Override
-	public YarnClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
+	public ClusterClient<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
 		try {
 			return deployInternal(
 				clusterSpecification,
@@ -383,7 +391,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	@Override
-	public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) throws ClusterDeploymentException {
+	public ClusterClient<ApplicationId> deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) throws ClusterDeploymentException {
 		try {
 			return deployInternal(
 				clusterSpecification,
@@ -410,7 +418,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	 * @param clusterSpecification Initial cluster specification for the to be deployed Flink cluster
 	 * @param jobGraph A job graph which is deployed with the Flink cluster, null if none
 	 */
-	protected YarnClusterClient deployInternal(
+	protected ClusterClient<ApplicationId> deployInternal(
 			ClusterSpecification clusterSpecification,
 			String yarnClusterEntrypoint,
 			@Nullable JobGraph jobGraph) throws Exception {
@@ -488,6 +496,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
 		flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
 
+		flinkConfiguration.setString(RestOptions.REST_ADDRESS, host);
+		flinkConfiguration.setInteger(RestOptions.REST_PORT, port);
+
 		// the Flink cluster is deployed in YARN. Represent cluster
 		return createYarnClusterClient(
 			this,
@@ -1534,20 +1545,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	/**
 	 * Creates a YarnClusterClient; may be overriden in tests.
 	 */
-	protected YarnClusterClient createYarnClusterClient(
+	protected abstract ClusterClient<ApplicationId> createYarnClusterClient(
 			AbstractYarnClusterDescriptor descriptor,
 			int numberTaskManagers,
 			int slotsPerTaskManager,
 			ApplicationReport report,
 			org.apache.flink.configuration.Configuration flinkConfiguration,
-			boolean perJobCluster) throws Exception {
-		return new YarnClusterClient(
-			descriptor,
-			numberTaskManagers,
-			slotsPerTaskManager,
-			report,
-			flinkConfiguration,
-			perJobCluster);
-	}
+			boolean perJobCluster) throws Exception;
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
new file mode 100644
index 0000000..e3f9646
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
+import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+
+/**
+ * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the
+ * new application master for a job under flip-6.
+ */
+public class Flip6YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
+
+	public Flip6YarnClusterDescriptor(
+			Configuration flinkConfiguration,
+			String configurationDirectory,
+			YarnClient yarnCLient) {
+		super(flinkConfiguration, configurationDirectory, yarnCLient);
+	}
+
+	@Override
+	protected String getYarnSessionClusterEntrypoint() {
+		return YarnSessionClusterEntrypoint.class.getName();
+	}
+
+	@Override
+	protected String getYarnJobClusterEntrypoint() {
+		return YarnJobClusterEntrypoint.class.getName();
+	}
+
+	@Override
+	protected ClusterClient<ApplicationId> createYarnClusterClient(
+			AbstractYarnClusterDescriptor descriptor,
+			int numberTaskManagers,
+			int slotsPerTaskManager,
+			ApplicationReport report,
+			Configuration flinkConfiguration,
+			boolean perJobCluster) throws Exception {
+		return new RestClusterClient<>(
+			flinkConfiguration,
+			report.getApplicationId());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 76f9154..9467326 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -19,9 +19,12 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 
 /**
@@ -50,4 +53,15 @@ public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 	public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
 		throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet.");
 	}
+
+	@Override
+	protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
+		return new YarnClusterClient(
+			descriptor,
+			numberTaskManagers,
+			slotsPerTaskManager,
+			report,
+			flinkConfiguration,
+			perJobCluster);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
deleted file mode 100644
index 6ce192c..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
-import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
-
-import org.apache.hadoop.yarn.client.api.YarnClient;
-
-/**
- * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the
- * new application master for a job under flip-6.
- */
-public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
-
-	public YarnClusterDescriptorV2(
-			Configuration flinkConfiguration,
-			String configurationDirectory,
-			YarnClient yarnCLient) {
-		super(flinkConfiguration, configurationDirectory, yarnCLient);
-	}
-
-	@Override
-	protected String getYarnSessionClusterEntrypoint() {
-		return YarnSessionClusterEntrypoint.class.getName();
-	}
-
-	@Override
-	protected String getYarnJobClusterEntrypoint() {
-		return YarnJobClusterEntrypoint.class.getName();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 5a71f41..0fa0dda 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -177,7 +177,21 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		//TODO: change akka address to tcp host and port, the getAddress() interface should return a standard tcp address
 		Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
 
-		resourceManagerClient.registerApplicationMaster(hostPort.f0, hostPort.f1, webInterfaceUrl);
+		final int restPort;
+
+		if (webInterfaceUrl != null) {
+			final int lastColon = webInterfaceUrl.lastIndexOf(':');
+
+			if (lastColon == -1) {
+				restPort = -1;
+			} else {
+				restPort = Integer.valueOf(webInterfaceUrl.substring(lastColon + 1));
+			}
+		} else {
+			restPort = -1;
+		}
+
+		resourceManagerClient.registerApplicationMaster(hostPort.f0, restPort, webInterfaceUrl);
 
 		return resourceManagerClient;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index fea8a5c..d4cf9ad 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -38,8 +38,8 @@ import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
+import org.apache.flink.yarn.Flip6YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
-import org.apache.flink.yarn.YarnClusterDescriptorV2;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.commons.cli.CommandLine;
@@ -522,7 +522,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 		for (Option option : commandLine.getOptions()) {
 			if (allOptions.hasOption(option.getOpt())) {
-				if (!option.getOpt().equals(detached.getOpt())) {
+				if (!option.getOpt().equals(detached.getOpt()) && !option.getOpt().equals(flip6.getOpt())) {
 					// don't resume from properties file if yarn options have been specified
 					canApplyYarnProperties = false;
 					break;
@@ -653,14 +653,19 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 
 						deleteYarnPropertiesFile();
 
+						ApplicationReport applicationReport;
+
 						try {
-							final ApplicationReport applicationReport = yarnClusterDescriptor
+							applicationReport = yarnClusterDescriptor
 								.getYarnClient()
 								.getApplicationReport(yarnApplicationId);
-
-							logFinalApplicationReport(applicationReport);
 						} catch (YarnException | IOException e) {
 							LOG.info("Could not log the final application report.", e);
+							applicationReport = null;
+						}
+
+						if (applicationReport != null) {
+							logFinalApplicationReport(applicationReport);
 						}
 					}
 				}
@@ -950,7 +955,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 	private static AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory, boolean flip6) {
 		final YarnClient yarnClient = YarnClient.createYarnClient();
 		if (flip6) {
-			return new YarnClusterDescriptorV2(configuration, configurationDirectory, yarnClient);
+			return new Flip6YarnClusterDescriptor(configuration, configurationDirectory, yarnClient);
 		} else {
 			return new YarnClusterDescriptor(configuration, configurationDirectory, yarnClient);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 256056c..a03ed08 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -93,6 +94,7 @@ public class YarnEntrypointUtils {
 			ApplicationConstants.Environment.NM_HOST.key());
 
 		configuration.setString(JobManagerOptions.ADDRESS, hostname);
+		configuration.setString(RestOptions.REST_ADDRESS, hostname);
 
 		// TODO: Support port ranges for the AM
 //		final String portRange = configuration.getString(
@@ -112,6 +114,11 @@ public class YarnEntrypointUtils {
 			configuration.setInteger(WebOptions.PORT, 0);
 		}
 
+		if (configuration.getInteger(RestOptions.REST_PORT) >= 0) {
+			// set the REST port to 0 to select it randomly
+			configuration.setInteger(RestOptions.REST_PORT, 0);
+		}
+
 		// if the user has set the deprecated YARN-specific config keys, we add the
 		// corresponding generic config keys instead. that way, later code needs not
 		// deal with deprecated config keys

http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
index 4e68612..0859f03 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.yarn;
 
 import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -131,5 +132,10 @@ public class AbstractYarnClusterTest extends TestLogger {
 		protected String getYarnJobClusterEntrypoint() {
 			throw new UnsupportedOperationException("Not needed for testing");
 		}
+
+		@Override
+		protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
+			throw new UnsupportedOperationException("Not needed for testing");
+		}
 	}
 }