You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/08/29 16:50:11 UTC
[2/3] flink git commit: [FLINK-4486] detached YarnSession: wait until cluster startup is complete
[FLINK-4486] detached YarnSession: wait until cluster startup is complete
This closes #2423
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cdeb118
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cdeb118
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cdeb118
Branch: refs/heads/release-1.1
Commit: 4cdeb11854956ac6cf1189d7cfa43628fb3be328
Parents: 28da995
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Aug 19 14:57:14 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Aug 29 18:20:00 2016 +0200
----------------------------------------------------------------------
.../flink/client/program/ClusterClient.java | 17 ++-
.../client/program/StandaloneClusterClient.java | 3 +
.../apache/flink/yarn/YarnClusterClient.java | 112 ++++++++++---------
.../flink/yarn/cli/FlinkYarnSessionCli.java | 1 +
4 files changed, 79 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index c3c666b..b8d3400 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -386,7 +386,10 @@ public abstract class ClusterClient {
* @throws ProgramInvocationException
*/
public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
- LeaderRetrievalService leaderRetrievalService;
+
+ waitForClusterToBeReady();
+
+ final LeaderRetrievalService leaderRetrievalService;
try {
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
} catch (Exception e) {
@@ -411,8 +414,10 @@ public abstract class ClusterClient {
* @throws ProgramInvocationException
*/
public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
- ActorGateway jobManagerGateway;
+ waitForClusterToBeReady();
+
+ final ActorGateway jobManagerGateway;
try {
jobManagerGateway = getJobManagerGateway();
} catch (Exception e) {
@@ -655,6 +660,14 @@ public abstract class ClusterClient {
// ------------------------------------------------------------------------
/**
+ * Blocks until the client has determined that the cluster is ready for job submission.
+ *
+ * <p>This check is deliberately delayed until right before job submission so that other
+ * errors (e.g. invalid job definitions or errors in the user jar) are reported first.
+ * It is invoked at the start of {@link #run(JobGraph, ClassLoader)} and
+ * {@link #runDetached(JobGraph, ClassLoader)}; implementations that have nothing to
+ * wait for may implement it as a no-op.
+ */
+ public abstract void waitForClusterToBeReady();
+
+ /**
* Returns an URL (as a string) to the JobManager web interface
*/
public abstract String getWebInterfaceURL();
http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index d25c9d1..3343b69 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -41,6 +41,9 @@ public class StandaloneClusterClient extends ClusterClient {
super(config);
}
+ /**
+ * Does nothing: this client performs no readiness check before job submission.
+ * NOTE(review): presumably a standalone cluster is expected to be running already — confirm.
+ */
+ @Override
+ public void waitForClusterToBeReady() {
+ // intentionally empty
+ }
+
@Override
public String getWebInterfaceURL() {
http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index e76b7e8..75bfeed 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -90,7 +90,8 @@ public class YarnClusterClient extends ClusterClient {
private boolean isConnected = true;
- private final boolean perJobCluster;
+ /** Indicator whether this cluster has just been created */
+ private final boolean newlyCreatedCluster;
/**
* Create a new Flink on YARN cluster.
@@ -100,7 +101,7 @@ public class YarnClusterClient extends ClusterClient {
* @param appReport the YARN application ID
* @param flinkConfig Flink configuration
* @param sessionFilesDir Location of files required for YARN session
- * @param perJobCluster Indicator whether this cluster is only created for a single job and then shutdown
+ * @param newlyCreatedCluster Indicator whether this cluster has just been created
* @throws IOException
* @throws YarnException
*/
@@ -110,7 +111,7 @@ public class YarnClusterClient extends ClusterClient {
final ApplicationReport appReport,
org.apache.flink.configuration.Configuration flinkConfig,
Path sessionFilesDir,
- boolean perJobCluster) throws IOException, YarnException {
+ boolean newlyCreatedCluster) throws IOException, YarnException {
super(flinkConfig);
@@ -123,13 +124,13 @@ public class YarnClusterClient extends ClusterClient {
this.appReport = appReport;
this.appId = appReport.getApplicationId();
this.trackingURL = appReport.getTrackingUrl();
- this.perJobCluster = perJobCluster;
+ this.newlyCreatedCluster = newlyCreatedCluster;
- this.applicationClient = new LazApplicationClientLoader();
+ this.applicationClient = new LazApplicationClientLoader(flinkConfig, actorSystemLoader);
- pollingRunner = new PollingThread(yarnClient, appId);
- pollingRunner.setDaemon(true);
- pollingRunner.start();
+ this.pollingRunner = new PollingThread(yarnClient, appId);
+ this.pollingRunner.setDaemon(true);
+ this.pollingRunner.start();
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
}
@@ -199,7 +200,7 @@ public class YarnClusterClient extends ClusterClient {
@Override
protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
if (isDetached()) {
- if (perJobCluster) {
+ if (newlyCreatedCluster) {
stopAfterJob(jobGraph.getJobID());
}
return super.runDetached(jobGraph, classLoader);
@@ -342,8 +343,7 @@ public class YarnClusterClient extends ClusterClient {
*/
@Override
public void finalizeCluster() {
- if (isDetached() || !perJobCluster) {
- // only disconnect if we are not running a per job cluster
+ if (isDetached() || !newlyCreatedCluster) {
disconnect();
} else {
shutdownCluster();
@@ -369,20 +369,16 @@ public class YarnClusterClient extends ClusterClient {
// we are already in the shutdown hook
}
- if(actorSystemLoader.isLoaded()){
- LOG.info("Sending shutdown request to the Application Master");
- if(applicationClient.get() != ActorRef.noSender()) {
- try {
- Future<Object> response =
- Patterns.ask(applicationClient.get(),
- new YarnMessages.LocalStopYarnSession(getApplicationStatus(),
- "Flink YARN Client requested shutdown"),
- new Timeout(akkaDuration));
- Await.ready(response, akkaDuration);
- } catch(Exception e) {
- LOG.warn("Error while stopping YARN Application Client", e);
- }
- }
+ LOG.info("Sending shutdown request to the Application Master");
+ try {
+ Future<Object> response =
+ Patterns.ask(applicationClient.get(),
+ new YarnMessages.LocalStopYarnSession(getApplicationStatus(),
+ "Flink YARN Client requested shutdown"),
+ new Timeout(akkaDuration));
+ Await.ready(response, akkaDuration);
+ } catch(Exception e) {
+ LOG.warn("Error while stopping YARN cluster.", e);
}
try {
@@ -518,14 +514,52 @@ public class YarnClusterClient extends ClusterClient {
return super.isDetached() || clusterDescriptor.isDetachedMode();
}
+ /**
+ * Blocks until all TaskManagers are connected to the JobManager.
+ *
+ * <p>Polls {@link #getClusterStatus()} every 250 ms and reports progress whenever the
+ * reported status changes. There is no timeout: the loop only exits normally once the
+ * number of registered TaskManagers reaches {@code clusterDescriptor.getTaskManagerCount()}.
+ *
+ * @throws RuntimeException if the calling thread is interrupted while waiting; the
+ * thread's interrupt flag is restored before the exception is thrown.
+ */
+ @Override
+ public void waitForClusterToBeReady() {
+ logAndSysout("Waiting until all TaskManagers have connected");
+
+ for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) {
+ currentStatus = getClusterStatus();
+ if (currentStatus != null && !currentStatus.equals(lastStatus)) {
+ logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
+ + clusterDescriptor.getTaskManagerCount() + ")");
+ if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) {
+ logAndSysout("All TaskManagers are connected");
+ break;
+ }
+ } else if (lastStatus == null) {
+ // NOTE(review): lastStatus stays null while getClusterStatus() keeps returning null,
+ // so this message is repeated on every poll until the first status arrives.
+ logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
+ }
+
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so callers further up the stack can still observe it.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while waiting for TaskManagers", e);
+ }
+ }
+ }
+
public ApplicationId getApplicationId() {
return appId;
}
- protected class LazApplicationClientLoader {
+ private static class LazApplicationClientLoader {
+
+ private final org.apache.flink.configuration.Configuration flinkConfig;
+ private final LazyActorSystemLoader actorSystemLoader;
private ActorRef applicationClient;
+ /**
+ * Captures what is needed to create the ApplicationClient actor on demand.
+ *
+ * @param config Flink configuration passed on when the ApplicationClient actor is created
+ * @param systemLoader loader for the ActorSystem (presumably used by get() to obtain the system — confirm)
+ */
+ private LazApplicationClientLoader(
+ org.apache.flink.configuration.Configuration config,
+ LazyActorSystemLoader systemLoader) {
+ actorSystemLoader = systemLoader;
+ flinkConfig = config;
+ }
+
/**
* Creates a new ApplicationClient actor or returns an existing one. May start an ActorSystem.
* @return ActorSystem
@@ -549,32 +583,6 @@ public class YarnClusterClient extends ClusterClient {
flinkConfig,
leaderRetrievalService),
"applicationClient");
-
- if (perJobCluster) {
-
- logAndSysout("Waiting until all TaskManagers have connected");
-
- for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) {
- currentStatus = getClusterStatus();
- if (currentStatus != null && !currentStatus.equals(lastStatus)) {
- logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
- + clusterDescriptor.getTaskManagerCount() + ")");
- if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) {
- logAndSysout("All TaskManagers are connected");
- break;
- }
- } else if (lastStatus == null) {
- logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
- }
-
- try {
- Thread.sleep(250);
- } catch (InterruptedException e) {
- LOG.error("Interrupted while waiting for TaskManagers");
- throw new RuntimeException("Interrupted while waiting for TaskManagers", e);
- }
- }
- }
}
return applicationClient;
http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index bee6a7a..28d8fb8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -628,6 +628,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
"yarn application -kill " + yarnCluster.getApplicationId() + System.lineSeparator() +
"Please also note that the temporary files of the YARN session in {} will not be removed.",
yarnDescriptor.getSessionFilesDir());
+ yarnCluster.waitForClusterToBeReady();
yarnCluster.disconnect();
} else {
runInteractiveCli(yarnCluster, acceptInteractiveInput);