You are viewing a plain-text version of this content. The canonical link to it was not preserved in this text version.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/06/24 16:56:06 UTC

[2/6] flink git commit: [FLINK-3667] additional cleanups in YarnClusterClient

[FLINK-3667] additional cleanups in YarnClusterClient

- remove ActorRunner thread, print status in finalizeCluster instead
- prevent premature shutdown of actor system in shutdown method
- prevent timeout exceptions due to poisoning the ApplicationClient


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b593632
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b593632
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b593632

Branch: refs/heads/master
Commit: 3b593632dd162d951281fab8a8ed8c6bc2b07b39
Parents: 6420c1c
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Jun 23 10:47:10 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Jun 24 17:00:34 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |   2 +-
 .../flink/client/program/ClusterClient.java     |  14 +-
 .../apache/flink/yarn/YarnClusterClient.java    | 135 ++++++-------------
 3 files changed, 51 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b593632/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index a01ab53..5c4791b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -153,7 +153,7 @@ public class CliFrontend {
 		// load the configuration
 		LOG.info("Trying to load configuration file");
 		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
-		System.setProperty("FLINK_CONF_DIR", configDirectory.getAbsolutePath());
+		System.setProperty(ENV_CONFIG_DIRECTORY, configDirectory.getAbsolutePath());
 
 		this.config = GlobalConfiguration.getConfiguration();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b593632/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index def9578..12a7a39 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -166,12 +166,14 @@ public abstract class ClusterClient {
 	 * Shuts down the client. This stops the internal actor system and actors.
 	 */
 	public void shutdown() {
-		try {
-			finalizeCluster();
-		} finally {
-			if (!this.actorSystem.isTerminated()) {
-				this.actorSystem.shutdown();
-				this.actorSystem.awaitTermination();
+		synchronized (this) {
+			try {
+				finalizeCluster();
+			} finally {
+				if (!this.actorSystem.isTerminated()) {
+					this.actorSystem.shutdown();
+					this.actorSystem.awaitTermination();
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b593632/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index b603294..9c77a8a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -21,7 +21,6 @@ import akka.actor.ActorRef;
 
 import static akka.pattern.Patterns.ask;
 
-import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
@@ -72,7 +71,7 @@ public class YarnClusterClient extends ClusterClient {
 	private static final int POLLING_THREAD_INTERVAL_MS = 1000;
 
 	private YarnClient yarnClient;
-	private Thread actorRunner;
+
 	private Thread clientShutdownHook = new ClientShutdownHook();
 	private PollingThread pollingRunner;
 	private final Configuration hadoopConfig;
@@ -144,36 +143,6 @@ public class YarnClusterClient extends ClusterClient {
 				leaderRetrievalService),
 			"applicationClient");
 
-		actorRunner = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				// blocks until ApplicationClient has been stopped
-				actorSystem.awaitTermination();
-
-				// get final application report
-				try {
-					ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-
-					LOG.info("Application " + appId + " finished with state " + appReport
-						.getYarnApplicationState() + " and final state " + appReport
-						.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
-
-					if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
-						== YarnApplicationState.KILLED) {
-						LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
-						LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
-							+ "the full application log using this command:\n"
-							+ "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n"
-							+ "(It sometimes takes a few seconds until the logs are aggregated)");
-					}
-				} catch (Exception e) {
-					LOG.warn("Error while getting final application report", e);
-				}
-			}
-		});
-		actorRunner.setDaemon(true);
-		actorRunner.start();
-
 		pollingRunner = new PollingThread(yarnClient, appId);
 		pollingRunner.setDaemon(true);
 		pollingRunner.start();
@@ -211,10 +180,19 @@ public class YarnClusterClient extends ClusterClient {
 		}
 	}
 
+	/**
+	 * Disconnect from the Yarn cluster
+	 */
 	public void disconnect() {
+
+		if (hasBeenShutDown.getAndSet(true)) {
+			return;
+		}
+
 		if(!isConnected) {
 			throw new IllegalStateException("Can not disconnect from an unconnected cluster.");
 		}
+
 		LOG.info("Disconnecting YarnClusterClient from ApplicationMaster");
 
 		try {
@@ -223,15 +201,6 @@ public class YarnClusterClient extends ClusterClient {
 			// we are already in the shutdown hook
 		}
 
-		// tell the actor to shut down.
-		applicationClient.tell(PoisonPill.getInstance(), applicationClient);
-
-		try {
-			actorRunner.join(1000); // wait for 1 second
-		} catch (InterruptedException e) {
-			LOG.warn("Shutdown of the actor runner was interrupted", e);
-			Thread.currentThread().interrupt();
-		}
 		try {
 			pollingRunner.stopRunner();
 			pollingRunner.join(1000);
@@ -239,6 +208,7 @@ public class YarnClusterClient extends ClusterClient {
 			LOG.warn("Shutdown of the polling runner was interrupted", e);
 			Thread.currentThread().interrupt();
 		}
+
 		isConnected = false;
 	}
 
@@ -278,23 +248,7 @@ public class YarnClusterClient extends ClusterClient {
 		if (isDetached()) {
 			return super.runDetached(jobGraph, classLoader);
 		} else {
-			try {
-				return super.run(jobGraph, classLoader);
-			} finally {
-				// show cluster status
-				List<String> msgs = getNewMessages();
-				if (msgs != null && msgs.size() > 1) {
-
-					logAndSysout("The following messages were created by the YARN cluster while running the Job:");
-					for (String msg : msgs) {
-						logAndSysout(msg);
-					}
-				}
-				if (getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
-					logAndSysout("YARN cluster is in non-successful state " + getApplicationStatus());
-					logAndSysout("YARN Diagnostics: " + getDiagnostics());
-				}
-			}
+			return super.run(jobGraph, classLoader);
 		}
 	}
 
@@ -369,35 +323,18 @@ public class YarnClusterClient extends ClusterClient {
 		}
 	}
 
+	@Override
+	public List<String> getNewMessages() {
 
-	private String getDiagnostics() {
-		if(!isConnected) {
-			throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
-		}
-
-		if (getApplicationStatus() == ApplicationStatus.SUCCEEDED) {
-			LOG.warn("getDiagnostics() called for cluster which is not in failed state");
-		}
-		ApplicationReport lastReport = pollingRunner.getLastReport();
-		if (lastReport == null) {
-			LOG.warn("Last report is null");
-			return null;
-		} else {
-			return lastReport.getDiagnostics();
+		if(hasBeenShutdown()) {
+			throw new RuntimeException("The YarnClusterClient has already been stopped");
 		}
-	}
 
-	@Override
-	public List<String> getNewMessages() {
 		if(!isConnected) {
 			throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
 		}
 
-		if(hasBeenShutdown()) {
-			throw new RuntimeException("The YarnClusterClient has already been stopped");
-		}
 		List<String> ret = new ArrayList<String>();
-
 		// get messages from ApplicationClient (locally)
 		while(true) {
 			Object result;
@@ -443,7 +380,6 @@ public class YarnClusterClient extends ClusterClient {
 	 */
 	@Override
 	public void finalizeCluster() {
-
 		if (isDetached() || !perJobCluster) {
 			// only disconnect if we are not running a per job cluster
 			disconnect();
@@ -452,14 +388,17 @@ public class YarnClusterClient extends ClusterClient {
 		}
 	}
 
+	/**
+	 * Shuts down the Yarn application
+	 */
 	public void shutdownCluster() {
 
-		if (!isConnected) {
-			throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
+		if (hasBeenShutDown.getAndSet(true)) {
+			return;
 		}
 
-		if(hasBeenShutDown.getAndSet(true)) {
-			return;
+		if (!isConnected) {
+			throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
 		}
 
 		try {
@@ -481,9 +420,6 @@ public class YarnClusterClient extends ClusterClient {
 					LOG.warn("Error while stopping YARN Application Client", e);
 				}
 			}
-
-			actorSystem.shutdown();
-			actorSystem.awaitTermination();
 		}
 
 		try {
@@ -513,12 +449,6 @@ public class YarnClusterClient extends ClusterClient {
 		}
 
 		try {
-			actorRunner.join(1000); // wait for 1 second
-		} catch (InterruptedException e) {
-			LOG.warn("Shutdown of the actor runner was interrupted", e);
-			Thread.currentThread().interrupt();
-		}
-		try {
 			pollingRunner.stopRunner();
 			pollingRunner.join(1000);
 		} catch(InterruptedException e) {
@@ -526,6 +456,25 @@ public class YarnClusterClient extends ClusterClient {
 			Thread.currentThread().interrupt();
 		}
 
+		try {
+			ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+
+			LOG.info("Application " + appId + " finished with state " + appReport
+				.getYarnApplicationState() + " and final state " + appReport
+				.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
+
+			if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
+				== YarnApplicationState.KILLED) {
+				LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
+				LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
+					+ "the full application log using this command:\n"
+					+ "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n"
+					+ "(It sometimes takes a few seconds until the logs are aggregated)");
+			}
+		} catch (Exception e) {
+			LOG.warn("Couldn't get final report", e);
+		}
+
 		LOG.info("YARN Client is shutting down");
 		yarnClient.stop(); // actorRunner is using the yarnClient.
 		yarnClient = null; // set null to clearly see if somebody wants to access it afterwards.