You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/12 22:25:31 UTC
[10/14] flink git commit: [FLINK-8119] [flip6] Wire correct Flip6
components in Flip6YarnClusterDescriptor
[FLINK-8119] [flip6] Wire correct Flip6 components in Flip6YarnClusterDescriptor
Let the Flip6YarnClusterDescriptor create a RestClusterClient as ClusterClient.
Moreover, this commit makes the YarnResourceManager register under the REST port
at Yarn.
This closes #5234.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dbe0e828
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dbe0e828
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dbe0e828
Branch: refs/heads/master
Commit: dbe0e8286d76a5facdb49589b638b87dbde80178
Parents: 7d986ce
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jan 3 20:38:21 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 12 16:14:05 2018 +0100
----------------------------------------------------------------------
.../client/program/rest/RestClusterClient.java | 12 ++--
.../yarn/TestingYarnClusterDescriptor.java | 2 +-
.../java/org/apache/flink/yarn/YARNITCase.java | 10 +--
.../yarn/AbstractYarnClusterDescriptor.java | 37 ++++++-----
.../flink/yarn/Flip6YarnClusterDescriptor.java | 66 ++++++++++++++++++++
.../flink/yarn/YarnClusterDescriptor.java | 14 +++++
.../flink/yarn/YarnClusterDescriptorV2.java | 49 ---------------
.../apache/flink/yarn/YarnResourceManager.java | 16 ++++-
.../flink/yarn/cli/FlinkYarnSessionCli.java | 17 +++--
.../yarn/entrypoint/YarnEntrypointUtils.java | 7 +++
.../flink/yarn/AbstractYarnClusterTest.java | 6 ++
11 files changed, 150 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 3e8b136..4d8656b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -318,28 +318,24 @@ public class RestClusterClient<T> extends ClusterClient<T> {
return false;
}
- // ======================================
- // Legacy stuff we ignore
- // ======================================
-
@Override
public void waitForClusterToBeReady() {
- throw new UnsupportedOperationException();
+ // no op
}
@Override
public String getWebInterfaceURL() {
- throw new UnsupportedOperationException();
+ return "http://" + restClusterClientConfiguration.getRestServerAddress() + ':' + restClusterClientConfiguration.getRestServerPort();
}
@Override
public GetClusterStatusResponse getClusterStatus() {
- throw new UnsupportedOperationException();
+ return null;
}
@Override
public List<String> getNewMessages() {
- throw new UnsupportedOperationException();
+ return Collections.emptyList();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index e66d2e0..95a31be 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -35,7 +35,7 @@ import java.util.List;
* flink-yarn-tests-X-tests.jar and the flink-runtime-X-tests.jar to the set of files which
* are shipped to the yarn cluster. This is necessary to load the testing classes.
*/
-public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor {
+public class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
public TestingYarnClusterDescriptor(Configuration configuration, String configurationDirectory) {
super(
http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 069f68a..bea9001 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.yarn;
import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -28,6 +29,7 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -55,13 +57,13 @@ public class YARNITCase extends YarnTestBase {
configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
final YarnClient yarnClient = YarnClient.createYarnClient();
- try (final YarnClusterDescriptorV2 yarnClusterDescriptorV2 = new YarnClusterDescriptorV2(
+ try (final Flip6YarnClusterDescriptor flip6YarnClusterDescriptor = new Flip6YarnClusterDescriptor(
configuration,
System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
yarnClient)) {
- yarnClusterDescriptorV2.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
- yarnClusterDescriptorV2.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+ flip6YarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+ flip6YarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(768)
@@ -83,7 +85,7 @@ public class YARNITCase extends YarnTestBase {
jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
- YarnClusterClient clusterClient = yarnClusterDescriptorV2.deployJobCluster(clusterSpecification, jobGraph);
+ ClusterClient<ApplicationId> clusterClient = flip6YarnClusterDescriptor.deployJobCluster(clusterSpecification, jobGraph);
clusterClient.shutdown();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index a7e8c36..f015235 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -23,6 +23,7 @@ import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
@@ -30,6 +31,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -332,7 +334,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// -------------------------------------------------------------
@Override
- public YarnClusterClient retrieve(ApplicationId applicationId) throws ClusterRetrieveException {
+ public ClusterClient<ApplicationId> retrieve(ApplicationId applicationId) throws ClusterRetrieveException {
try {
// check if required Hadoop environment variables are set. If not, warn user
@@ -352,11 +354,17 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
throw new RuntimeException("The Yarn application " + applicationId + " doesn't run anymore.");
}
+ final String host = appReport.getHost();
+ final int rpcPort = appReport.getRpcPort();
+
LOG.info("Found application JobManager host name '{}' and port '{}' from supplied application id '{}'",
- appReport.getHost(), appReport.getRpcPort(), applicationId);
+ host, rpcPort, applicationId);
+
+ flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
+ flinkConfiguration.setInteger(JobManagerOptions.PORT, rpcPort);
- flinkConfiguration.setString(JobManagerOptions.ADDRESS, appReport.getHost());
- flinkConfiguration.setInteger(JobManagerOptions.PORT, appReport.getRpcPort());
+ flinkConfiguration.setString(RestOptions.REST_ADDRESS, host);
+ flinkConfiguration.setInteger(RestOptions.REST_PORT, rpcPort);
return createYarnClusterClient(
this,
@@ -371,7 +379,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
@Override
- public YarnClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
+ public ClusterClient<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
@@ -383,7 +391,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
@Override
- public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) throws ClusterDeploymentException {
+ public ClusterClient<ApplicationId> deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) throws ClusterDeploymentException {
try {
return deployInternal(
clusterSpecification,
@@ -410,7 +418,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
* @param clusterSpecification Initial cluster specification for the to be deployed Flink cluster
* @param jobGraph A job graph which is deployed with the Flink cluster, null if none
*/
- protected YarnClusterClient deployInternal(
+ protected ClusterClient<ApplicationId> deployInternal(
ClusterSpecification clusterSpecification,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph) throws Exception {
@@ -488,6 +496,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
+ flinkConfiguration.setString(RestOptions.REST_ADDRESS, host);
+ flinkConfiguration.setInteger(RestOptions.REST_PORT, port);
+
// the Flink cluster is deployed in YARN. Represent cluster
return createYarnClusterClient(
this,
@@ -1534,20 +1545,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
/**
* Creates a YarnClusterClient; may be overriden in tests.
*/
- protected YarnClusterClient createYarnClusterClient(
+ protected abstract ClusterClient<ApplicationId> createYarnClusterClient(
AbstractYarnClusterDescriptor descriptor,
int numberTaskManagers,
int slotsPerTaskManager,
ApplicationReport report,
org.apache.flink.configuration.Configuration flinkConfiguration,
- boolean perJobCluster) throws Exception {
- return new YarnClusterClient(
- descriptor,
- numberTaskManagers,
- slotsPerTaskManager,
- report,
- flinkConfiguration,
- perJobCluster);
- }
+ boolean perJobCluster) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
new file mode 100644
index 0000000..e3f9646
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Flip6YarnClusterDescriptor.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
+import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+
+/**
+ * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the
+ * new application master for a job under flip-6.
+ */
+public class Flip6YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
+
+ public Flip6YarnClusterDescriptor(
+ Configuration flinkConfiguration,
+ String configurationDirectory,
+ YarnClient yarnCLient) {
+ super(flinkConfiguration, configurationDirectory, yarnCLient);
+ }
+
+ @Override
+ protected String getYarnSessionClusterEntrypoint() {
+ return YarnSessionClusterEntrypoint.class.getName();
+ }
+
+ @Override
+ protected String getYarnJobClusterEntrypoint() {
+ return YarnJobClusterEntrypoint.class.getName();
+ }
+
+ @Override
+ protected ClusterClient<ApplicationId> createYarnClusterClient(
+ AbstractYarnClusterDescriptor descriptor,
+ int numberTaskManagers,
+ int slotsPerTaskManager,
+ ApplicationReport report,
+ Configuration flinkConfiguration,
+ boolean perJobCluster) throws Exception {
+ return new RestClusterClient<>(
+ flinkConfiguration,
+ report.getApplicationId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 76f9154..9467326 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -19,9 +19,12 @@
package org.apache.flink.yarn;
import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
/**
@@ -50,4 +53,15 @@ public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
public YarnClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet.");
}
+
+ @Override
+ protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
+ return new YarnClusterClient(
+ descriptor,
+ numberTaskManagers,
+ slotsPerTaskManager,
+ report,
+ flinkConfiguration,
+ perJobCluster);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
deleted file mode 100644
index 6ce192c..0000000
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.yarn;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
-import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
-
-import org.apache.hadoop.yarn.client.api.YarnClient;
-
-/**
- * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the
- * new application master for a job under flip-6.
- */
-public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
-
- public YarnClusterDescriptorV2(
- Configuration flinkConfiguration,
- String configurationDirectory,
- YarnClient yarnCLient) {
- super(flinkConfiguration, configurationDirectory, yarnCLient);
- }
-
- @Override
- protected String getYarnSessionClusterEntrypoint() {
- return YarnSessionClusterEntrypoint.class.getName();
- }
-
- @Override
- protected String getYarnJobClusterEntrypoint() {
- return YarnJobClusterEntrypoint.class.getName();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 5a71f41..0fa0dda 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -177,7 +177,21 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
//TODO: change akka address to tcp host and port, the getAddress() interface should return a standard tcp address
Tuple2<String, Integer> hostPort = parseHostPort(getAddress());
- resourceManagerClient.registerApplicationMaster(hostPort.f0, hostPort.f1, webInterfaceUrl);
+ final int restPort;
+
+ if (webInterfaceUrl != null) {
+ final int lastColon = webInterfaceUrl.lastIndexOf(':');
+
+ if (lastColon == -1) {
+ restPort = -1;
+ } else {
+ restPort = Integer.valueOf(webInterfaceUrl.substring(lastColon + 1));
+ }
+ } else {
+ restPort = -1;
+ }
+
+ resourceManagerClient.registerApplicationMaster(hostPort.f0, restPort, webInterfaceUrl);
return resourceManagerClient;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index fea8a5c..d4cf9ad 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -38,8 +38,8 @@ import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
+import org.apache.flink.yarn.Flip6YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
-import org.apache.flink.yarn.YarnClusterDescriptorV2;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.commons.cli.CommandLine;
@@ -522,7 +522,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
for (Option option : commandLine.getOptions()) {
if (allOptions.hasOption(option.getOpt())) {
- if (!option.getOpt().equals(detached.getOpt())) {
+ if (!option.getOpt().equals(detached.getOpt()) && !option.getOpt().equals(flip6.getOpt())) {
// don't resume from properties file if yarn options have been specified
canApplyYarnProperties = false;
break;
@@ -653,14 +653,19 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
deleteYarnPropertiesFile();
+ ApplicationReport applicationReport;
+
try {
- final ApplicationReport applicationReport = yarnClusterDescriptor
+ applicationReport = yarnClusterDescriptor
.getYarnClient()
.getApplicationReport(yarnApplicationId);
-
- logFinalApplicationReport(applicationReport);
} catch (YarnException | IOException e) {
LOG.info("Could not log the final application report.", e);
+ applicationReport = null;
+ }
+
+ if (applicationReport != null) {
+ logFinalApplicationReport(applicationReport);
}
}
}
@@ -950,7 +955,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
private static AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory, boolean flip6) {
final YarnClient yarnClient = YarnClient.createYarnClient();
if (flip6) {
- return new YarnClusterDescriptorV2(configuration, configurationDirectory, yarnClient);
+ return new Flip6YarnClusterDescriptor(configuration, configurationDirectory, yarnClient);
} else {
return new YarnClusterDescriptor(configuration, configurationDirectory, yarnClient);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 256056c..a03ed08 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
@@ -93,6 +94,7 @@ public class YarnEntrypointUtils {
ApplicationConstants.Environment.NM_HOST.key());
configuration.setString(JobManagerOptions.ADDRESS, hostname);
+ configuration.setString(RestOptions.REST_ADDRESS, hostname);
// TODO: Support port ranges for the AM
// final String portRange = configuration.getString(
@@ -112,6 +114,11 @@ public class YarnEntrypointUtils {
configuration.setInteger(WebOptions.PORT, 0);
}
+ if (configuration.getInteger(RestOptions.REST_PORT) >= 0) {
+ // set the REST port to 0 to select it randomly
+ configuration.setInteger(RestOptions.REST_PORT, 0);
+ }
+
// if the user has set the deprecated YARN-specific config keys, we add the
// corresponding generic config keys instead. that way, later code needs not
// deal with deprecated config keys
http://git-wip-us.apache.org/repos/asf/flink/blob/dbe0e828/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
index 4e68612..0859f03 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.yarn;
import org.apache.flink.client.deployment.ClusterRetrieveException;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
@@ -131,5 +132,10 @@ public class AbstractYarnClusterTest extends TestLogger {
protected String getYarnJobClusterEntrypoint() {
throw new UnsupportedOperationException("Not needed for testing");
}
+
+ @Override
+ protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
+ throw new UnsupportedOperationException("Not needed for testing");
+ }
}
}