You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/08/29 16:50:11 UTC

[2/3] flink git commit: [FLINK-4486] detached YarnSession: wait until cluster startup is complete

[FLINK-4486] detached YarnSession: wait until cluster startup is complete

This closes #2423


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cdeb118
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cdeb118
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cdeb118

Branch: refs/heads/release-1.1
Commit: 4cdeb11854956ac6cf1189d7cfa43628fb3be328
Parents: 28da995
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Aug 19 14:57:14 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Aug 29 18:20:00 2016 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     |  17 ++-
 .../client/program/StandaloneClusterClient.java |   3 +
 .../apache/flink/yarn/YarnClusterClient.java    | 112 ++++++++++---------
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   1 +
 4 files changed, 79 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index c3c666b..b8d3400 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -386,7 +386,10 @@ public abstract class ClusterClient {
 	 * @throws ProgramInvocationException
 	 */
 	public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-		LeaderRetrievalService leaderRetrievalService;
+
+		waitForClusterToBeReady();
+
+		final LeaderRetrievalService leaderRetrievalService;
 		try {
 			leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
 		} catch (Exception e) {
@@ -411,8 +414,10 @@ public abstract class ClusterClient {
 	 * @throws ProgramInvocationException
 	 */
 	public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-		ActorGateway jobManagerGateway;
 
+		waitForClusterToBeReady();
+
+		final ActorGateway jobManagerGateway;
 		try {
 			jobManagerGateway = getJobManagerGateway();
 		} catch (Exception e) {
@@ -655,6 +660,14 @@ public abstract class ClusterClient {
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Blocks until the client has determined that the cluster is ready for Job submission.
+	 *
+	 * The wait is deferred until right before job submission so that any other errors
+	 * (e.g. invalid job definitions or errors in the user jar) are reported first.
+	 */
+	public abstract void waitForClusterToBeReady();
+
+	/**
 	 * Returns an URL (as a string) to the JobManager web interface
 	 */
 	public abstract String getWebInterfaceURL();

http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index d25c9d1..3343b69 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -41,6 +41,9 @@ public class StandaloneClusterClient extends ClusterClient {
 		super(config);
 	}
 
+	@Override
+	public void waitForClusterToBeReady() {}
+
 
 	@Override
 	public String getWebInterfaceURL() {

http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index e76b7e8..75bfeed 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -90,7 +90,8 @@ public class YarnClusterClient extends ClusterClient {
 
 	private boolean isConnected = true;
 
-	private final boolean perJobCluster;
+	/** Indicator whether this cluster has just been created */
+	private final boolean newlyCreatedCluster;
 
 	/**
 	 * Create a new Flink on YARN cluster.
@@ -100,7 +101,7 @@ public class YarnClusterClient extends ClusterClient {
 	 * @param appReport the YARN application ID
 	 * @param flinkConfig Flink configuration
 	 * @param sessionFilesDir Location of files required for YARN session
-	 * @param perJobCluster Indicator whether this cluster is only created for a single job and then shutdown
+	 * @param newlyCreatedCluster Indicator whether this cluster has just been created
 	 * @throws IOException
 	 * @throws YarnException
 	 */
@@ -110,7 +111,7 @@ public class YarnClusterClient extends ClusterClient {
 		final ApplicationReport appReport,
 		org.apache.flink.configuration.Configuration flinkConfig,
 		Path sessionFilesDir,
-		boolean perJobCluster) throws IOException, YarnException {
+		boolean newlyCreatedCluster) throws IOException, YarnException {
 
 		super(flinkConfig);
 
@@ -123,13 +124,13 @@ public class YarnClusterClient extends ClusterClient {
 		this.appReport = appReport;
 		this.appId = appReport.getApplicationId();
 		this.trackingURL = appReport.getTrackingUrl();
-		this.perJobCluster = perJobCluster;
+		this.newlyCreatedCluster = newlyCreatedCluster;
 
-		this.applicationClient = new LazApplicationClientLoader();
+		this.applicationClient = new LazApplicationClientLoader(flinkConfig, actorSystemLoader);
 
-		pollingRunner = new PollingThread(yarnClient, appId);
-		pollingRunner.setDaemon(true);
-		pollingRunner.start();
+		this.pollingRunner = new PollingThread(yarnClient, appId);
+		this.pollingRunner.setDaemon(true);
+		this.pollingRunner.start();
 
 		Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 	}
@@ -199,7 +200,7 @@ public class YarnClusterClient extends ClusterClient {
 	@Override
 	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
 		if (isDetached()) {
-			if (perJobCluster) {
+			if (newlyCreatedCluster) {
 				stopAfterJob(jobGraph.getJobID());
 			}
 			return super.runDetached(jobGraph, classLoader);
@@ -342,8 +343,7 @@ public class YarnClusterClient extends ClusterClient {
 	 */
 	@Override
 	public void finalizeCluster() {
-		if (isDetached() || !perJobCluster) {
-			// only disconnect if we are not running a per job cluster
+		if (isDetached() || !newlyCreatedCluster) {
 			disconnect();
 		} else {
 			shutdownCluster();
@@ -369,20 +369,16 @@ public class YarnClusterClient extends ClusterClient {
 			// we are already in the shutdown hook
 		}
 
-		if(actorSystemLoader.isLoaded()){
-			LOG.info("Sending shutdown request to the Application Master");
-			if(applicationClient.get() != ActorRef.noSender()) {
-				try {
-					Future<Object> response =
-						Patterns.ask(applicationClient.get(),
-							new YarnMessages.LocalStopYarnSession(getApplicationStatus(),
-									"Flink YARN Client requested shutdown"),
-							new Timeout(akkaDuration));
-					Await.ready(response, akkaDuration);
-				} catch(Exception e) {
-					LOG.warn("Error while stopping YARN Application Client", e);
-				}
-			}
+		LOG.info("Sending shutdown request to the Application Master");
+		try {
+			Future<Object> response =
+				Patterns.ask(applicationClient.get(),
+					new YarnMessages.LocalStopYarnSession(getApplicationStatus(),
+							"Flink YARN Client requested shutdown"),
+					new Timeout(akkaDuration));
+			Await.ready(response, akkaDuration);
+		} catch(Exception e) {
+			LOG.warn("Error while stopping YARN cluster.", e);
 		}
 
 		try {
@@ -518,14 +514,52 @@ public class YarnClusterClient extends ClusterClient {
 		return super.isDetached() || clusterDescriptor.isDetachedMode();
 	}
 
+	/**
+	 * Blocks until all TaskManagers are connected to the JobManager.
+	 */
+	@Override
+	public void waitForClusterToBeReady() {
+		logAndSysout("Waiting until all TaskManagers have connected");
+
+		for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) {
+			currentStatus = getClusterStatus();
+			if (currentStatus != null && !currentStatus.equals(lastStatus)) {
+				logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
+					+ clusterDescriptor.getTaskManagerCount() + ")");
+				if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) {
+					logAndSysout("All TaskManagers are connected");
+					break;
+				}
+			} else if (lastStatus == null) {
+				logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
+			}
+
+			try {
+				Thread.sleep(250);
+			} catch (InterruptedException e) {
+				throw new RuntimeException("Interrupted while waiting for TaskManagers", e);
+			}
+		}
+	}
+
 	public ApplicationId getApplicationId() {
 		return appId;
 	}
 
-	protected class LazApplicationClientLoader {
+	private static class LazApplicationClientLoader {
+
+		private final org.apache.flink.configuration.Configuration flinkConfig;
+		private final LazyActorSystemLoader actorSystemLoader;
 
 		private ActorRef applicationClient;
 
+		private LazApplicationClientLoader(
+				org.apache.flink.configuration.Configuration flinkConfig,
+				LazyActorSystemLoader actorSystemLoader) {
+			this.flinkConfig = flinkConfig;
+			this.actorSystemLoader = actorSystemLoader;
+		}
+
 		/**
 		 * Creates a new ApplicationClient actor or returns an existing one. May start an ActorSystem.
 		 * @return ActorSystem
@@ -549,32 +583,6 @@ public class YarnClusterClient extends ClusterClient {
 						flinkConfig,
 						leaderRetrievalService),
 					"applicationClient");
-
-				if (perJobCluster) {
-
-					logAndSysout("Waiting until all TaskManagers have connected");
-
-					for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) {
-						currentStatus = getClusterStatus();
-						if (currentStatus != null && !currentStatus.equals(lastStatus)) {
-							logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
-								+ clusterDescriptor.getTaskManagerCount() + ")");
-							if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) {
-								logAndSysout("All TaskManagers are connected");
-								break;
-							}
-						} else if (lastStatus == null) {
-							logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
-						}
-
-						try {
-							Thread.sleep(250);
-						} catch (InterruptedException e) {
-							LOG.error("Interrupted while waiting for TaskManagers");
-							throw new RuntimeException("Interrupted while waiting for TaskManagers", e);
-						}
-					}
-				}
 			}
 
 			return applicationClient;

http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index bee6a7a..28d8fb8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -628,6 +628,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 						"yarn application -kill " + yarnCluster.getApplicationId() + System.lineSeparator() +
 						"Please also note that the temporary files of the YARN session in {} will not be removed.",
 						yarnDescriptor.getSessionFilesDir());
+				yarnCluster.waitForClusterToBeReady();
 				yarnCluster.disconnect();
 			} else {
 				runInteractiveCli(yarnCluster, acceptInteractiveInput);